#!/usr/bin/env python
#
# wget https://halldweb.jlab.org/dist/rcdb.sqlite
#
# ccdb mkdir /ELECTRON_BEAM
# ccdb mktbl /ELECTRON_BEAM/current_map_epics -r 1 map=string
# ccdb mktbl /ELECTRON_BEAM/timestamp_to_unix -r 1 tics_per_sec=double rcdb_250MHz_offset_tics=int rcdb_start_time=int
#
#
"""Extract beam current vs. time for one or more runs and optionally write to CCDB.

For each run in the requested range the script:
  * reads the run start/finish times, event count and is_valid_run_end
    flag from RCDB (MySQL server or SQLite file),
  * optionally runs ``mySampler`` to sample the beam current once per
    second and reduces the samples to a list of "boundaries" (times at
    which the current changed by more than a threshold),
  * optionally runs ``hd_ana`` with the syncskim plugin and parses the
    "timestamp to unix time conversion:" line from its output,
  * optionally writes the results to /ELECTRON_BEAM/current_map_epics and
    /ELECTRON_BEAM/timestamp_to_unix in CCDB.

Run with -h for the full usage text.
"""

import sqlite3 as lite
import sys
import os
import re
import io
import datetime
import pytz
import time
import subprocess
import glob

# The mysql.connector shipped with RCDB is old and breaks this script.
# Make sure no RCDB directories are in the PYTHONPATH
filtered_path = [x for x in sys.path if 'rcdb' not in x]
sys.path = filtered_path
import mysql.connector

import ccdb


def Usage():
    """Print the help text and exit the process with status 0."""
    print(' ')
    print(' Usage:')
    print(' hd_beam_current.py [options] [run[-run] [rcdb.sqlite]]')
    print(' ')
    print(' options:')
    print(' -ccdb write current map and calib params to CCDB')
    print(' -f force writing new paramters to CCDB even if')
    print(' ones already exist (ignored unless -ccdb set)')
    print(' also ignores is_valid_run_end flag')
    print(' -noevio do not run hd_ana on files on gluonraid2 to')
    print(' get timestamp_to_unix calibration')
    print(' -noepics do not run the mySampler command to extract')
    print(' beam current info from the EPICS archive')
    print(' -tac TAC run(s). This will use the nA BPM')
    print(' IPMAD00C.VAL instead of IBCAD00CRCUR6 for the')
    print(' beam current. It also reduces the threshold')
    print(' used to determine "beam on" and "boundary"')
    print(' from 3nA down to 0.25nA')
    print(' -rcdb_host=host set the RCDB DB host (default is gluondb1)')
    print(' -t test mode. No entries will be written to ccdb')
    print(' (even if -f is specified!)')
    print(' ')
    print(' --- This is run via cron job from the ---')
    print(' --- hdsys account on gluonraid2 every hour ---')
    print(' ')
    print('Get beam current as a function of time during the specified')
    print('run(s). If -ccdb is specified then an entry is made in the CCDB')
    print('pointed to by the CCDB_CONNECTION environment variable.')
    print(' ')
    print('A single run or run range may be specified. A run range should be')
    print('specified using a dash with no spaces (e.g. 30600-30700 ). If no')
    print('run range is specified, then the RCDB will be searched for runs')
    print('that have completed in the past 100 hours.')
    print(' ')
    print('This will run both mySampler and hd_ana in order to extract')
    print('the values needed. Thus, it needs to be run on a gluon')
    print('computer with infiniband connectivity (e.g. gluon112). Running')
    print('hd_ana will typically take 1 to 2 minutes. To skip running')
    print('hd_ana (and therefore not get the 250MHz clock to unix time')
    print('calibration) use the -noevio option.')
    print(' ')
    print('Run start and stop times are obtained from RCDB. By default it')
    print('connects to the MySQL server on gluondb1. A different host may')
    print('be specified using the -rcdb_host=host option. (Other connection')
    print('parameters cannot be changed without editing this script.) If')
    print('a second argument after the run range is given, it is taken to')
    print('be the name of an SQLite DB file to use instead. If an SQLite')
    print('filename is given the MySQL host is ignored. n.b. one can obtain')
    print('the most recent rcdb.sqlite file (generated nightly) via:')
    print(' ')
    print(' wget https://halldweb.jlab.org/dist/rcdb.sqlite')
    print(' ')
    print('When adding to the ccdb a check is first made if there is already')
    print('an entry for each run in the specified range. Runs for which an')
    print('entry already exists are ignored with a warning printed to the')
    print('screen unless the -f option is given. Entries are made into two')
    print('constant sets:')
    print(' ')
    print(' /ELECTRON_BEAM/current_map_epics')
    print(' /ELECTRON_BEAM/timestamp_to_unix')
    print(' ')
    print('The first is the map of beam current as archived in EPICS for the')
    print('IBCAD00CRCUR6 current monitor at times relative to the start')
    print('time of the run as recorded in RCDB. The second are the conversion')
    print('parameters for converting the 250MHz clock read out every event')
    print('into seconds relative to that same RCDB start time.')

    sys.exit(0)


# Parse command line arguments
RCDB_FILENAME = ''
RCDB_HOST = 'gluondb1'
RCDB_USER = 'rcdb'
# NOTE(review): credential is hard-coded in source; consider moving it to
# the environment or a protected config file.
RCDB_PASS = 'GlueX_2come'
RUNS = ''
WRITE_TO_CCDB = False
FORCE_CCDB_WRITE = False
RUN_HD_ANA = True
RUN_MYSAMPLER = True
TAC_RUN = False
TEST_MODE = False

if len(sys.argv) < 2:
    Usage()

for arg in sys.argv[1:]:
    if arg == '-h':
        Usage()
    elif arg == '-t':
        TEST_MODE = True
    elif arg == '-ccdb':
        WRITE_TO_CCDB = True
    elif arg == '-f':
        FORCE_CCDB_WRITE = True
    elif arg == '-noevio':
        RUN_HD_ANA = False
    elif arg == '-noepics':
        RUN_MYSAMPLER = False
    elif arg == '-tac':
        TAC_RUN = True
    elif arg.startswith('-rcdb_host='):
        RCDB_HOST = arg[11:]
    elif RUNS == '':
        # First positional argument: run or run range
        RUNS = arg
    elif RCDB_FILENAME == '':
        # Second positional argument: SQLite RCDB file
        RCDB_FILENAME = arg
    else:
        # Too many positional arguments. Usage() exits by itself; the
        # explicit exit below is kept as a safety net.
        Usage()
        sys.exit(-1)

# Connect to RCDB
try:
    if RCDB_FILENAME == '':
        # MySQL RCDB server
        RCDB = 'mysql://' + RCDB_USER + ':' + RCDB_PASS + '@' + RCDB_HOST + '/rcdb'
        cnx = mysql.connector.connect(user=RCDB_USER, password=RCDB_PASS, host=RCDB_HOST, database='rcdb')
        cur = cnx.cursor(dictionary=True)  # make dictionary style cursor
    else:
        # SQLite RCDB file
        RCDB = RCDB_FILENAME
        con = lite.connect(RCDB_FILENAME)
        con.row_factory = lite.Row  # make next cursor dictionary style
        cur = con.cursor()
except Exception as e:
    print('Error connectiong to RCDB: ' + RCDB)
    print(str(e))
    sys.exit(-1)

# Get run range to process
if '-' in RUNS:
    pos = RUNS.find('-')
    RUN_MIN = RUNS[0:pos]
    RUN_MAX = RUNS[pos+1:]
elif RUNS != '':
    RUN_MIN = RUNS
    RUN_MAX = RUNS
else:
    # No run range given. Find it for last 100 hours
    sql = 'SELECT min(number) AS RUN_MIN,max(number) AS RUN_MAX FROM runs WHERE '
    if RCDB.startswith('mysql'):
        sql += 'UNIX_TIMESTAMP(NOW())-UNIX_TIMESTAMP(finished) <100*3600'
    else:
        sql += 'strftime("%s","now")-strftime("%s",finished) <100*3600'
    cur.execute(sql)
    c_rows = cur.fetchall()
    # An aggregate query returns a single row of NULLs when no run
    # matches, so the values must be checked in addition to the row count.
    if len(c_rows) == 0 or c_rows[0]['RUN_MIN'] is None or c_rows[0]['RUN_MAX'] is None:
        print('No runs specified and unable to find any in DB for last 100 hours')
        sys.exit(0)
    RUN_MIN = c_rows[0]['RUN_MIN']
    RUN_MAX = c_rows[0]['RUN_MAX']
    print('No run range specified. Processing runs completed in last 100 hours: ' + str(RUN_MIN) + '-' + str(RUN_MAX))

# Normalize the run limits to integers once so that everything below
# compares like with like.
try:
    RUN_MIN = int(RUN_MIN)
    RUN_MAX = int(RUN_MAX)
except ValueError:
    print('Invalid run or run range: ' + RUNS)
    sys.exit(-1)

# Optionally create CCDB api class and get all run ranges for
# default variation, but only if we plan to write to the CCDB.
# EXISTING_RUNS holds integer run numbers (restricted to the range being
# processed) that already have an entry.
EXISTING_RUNS = set()
if WRITE_TO_CCDB:
    url = os.getenv('CCDB_CONNECTION')
    if url is None:
        print('CCDB_CONNECTION environment variable is not set. Unable to write to CCDB.')
        sys.exit(-1)

    # In April 2018 we switched to using gluondb1 as our default CCDB
    # server. This broke the updates which need to go to the hallddb
    # server. Check if this is set to use gluondb1 and if so, replace
    # it with hallddb.
    if 'gluondb' in url:
        url = 'mysql://ccdb_user@hallddb.jlab.org/ccdb'

    provider = ccdb.AlchemyProvider()  # this class has all CCDB manipulation functions
    provider.connect(url)              # use usual connection string to connect to database
    provider.authentication.current_user_name = os.getenv('USER')  # to have a name in logs

    tablename = "/ELECTRON_BEAM/current_map_epics"
    if not RUN_MYSAMPLER:
        tablename = "/ELECTRON_BEAM/timestamp_to_unix"
    table = provider.get_type_table(tablename)
    for constant_set in table.constant_sets:
        if constant_set.assignment is None:
            continue
        if constant_set.assignment.variation.name != 'default':
            continue
        minrun = int(constant_set.assignment.run_range.min)
        maxrun = int(constant_set.assignment.run_range.max)
        if minrun != maxrun:
            print('ERROR: existing entry in /ELECTRON_BEAM/current_map_epics covers')
            print('more than one run! ' + str(minrun) + ' - ' + str(maxrun))
            print('Excluding all for list of runs to process')
            # Only record the part of the range that overlaps the runs
            # being processed so a very wide range cannot exhaust memory.
            EXISTING_RUNS.update(range(max(minrun, RUN_MIN), min(maxrun, RUN_MAX) + 1))
        else:
            EXISTING_RUNS.add(minrun)

# Loop over all specified runs
for RUN in range(RUN_MIN, RUN_MAX + 1):

    # Check if this run already has an entry in CCDB
    if RUN in EXISTING_RUNS:
        action = 'Skipping'
        if FORCE_CCDB_WRITE:
            action = 'Overwriting'
        print('Run ' + str(RUN) + ' already in CCDB for default variation. ' + action + ' ...')
        if not FORCE_CCDB_WRITE:
            continue

    # Get start and end times of run
    sql = 'SELECT started,finished FROM runs WHERE number=%s' % RUN
    cur.execute(sql)
    c_rows = cur.fetchall()
    if len(c_rows) == 0:
        print('Run %s not in RCDB!' % RUN)
        continue
    # Check for NULL before converting to string; str(None) is 'None'
    # and would otherwise slip past the test and break strptime below.
    if c_rows[0]['started'] is None or c_rows[0]['finished'] is None:
        print('Run %s not in RCDB!' % RUN)
        continue
    start = str(c_rows[0]['started'])
    end = str(c_rows[0]['finished'])

    # Get number of events
    sql = 'SELECT int_value FROM conditions,condition_types WHERE condition_type_id=condition_types.id'
    sql += ' AND condition_types.name="event_count" AND run_number=' + str(RUN)
    cur.execute(sql)
    c_rows = cur.fetchall()
    event_count = None
    if len(c_rows) > 0:
        event_count = c_rows[0]['int_value']
    if event_count is None:
        print('Run ' + str(RUN) + ' has no event_count in RCDB! Skipping.')
        continue
    if event_count < 500000:
        print('Run ' + str(RUN) + ' has only ' + str(event_count) + ' events. Skipping.')
        continue

    # Only consider runs that ended cleanly. This is mainly to avoid making entries
    # for runs that are still in progress.
    sql = 'SELECT bool_value FROM conditions,condition_types WHERE condition_type_id=condition_types.id'
    sql += ' AND condition_types.name="is_valid_run_end" AND run_number=' + str(RUN)
    cur.execute(sql)
    c_rows = cur.fetchall()
    is_valid_run_end = None
    if len(c_rows) > 0:
        is_valid_run_end = c_rows[0]['bool_value']
    if is_valid_run_end is None:
        print('Run ' + str(RUN) + ' has no is_valid_run_end flag in RCDB! Skipping.')
        continue
    if int(is_valid_run_end) == 0:
        print('Run ' + str(RUN) + ' has is_valid_run_end set to zero. Run may still be in progress.')
        if not FORCE_CCDB_WRITE:
            continue
        else:
            print('(will process anyway since -f flag set)')

    # Calculate total duration of run in seconds (add 2 for safety)
    rcdb_start_t = datetime.datetime.strptime(start, '%Y-%m-%d %H:%M:%S')
    rcdb_end_t = datetime.datetime.strptime(end, '%Y-%m-%d %H:%M:%S')
    tdiff = (rcdb_end_t - rcdb_start_t).total_seconds() + 2

    # Form mySampler command
    cmd = None
    if RUN_MYSAMPLER:
        cmd = ['mySampler']
        cmd += ['-b', str(start)]
        cmd += ['-s', '1s']
        cmd += ['-n', str(int(tdiff))]
        if TAC_RUN:
            cmd += ['IPMAD00C.VAL']
        else:
            cmd += ['IBCAD00CRCUR6']

    # Form hd_ana command iff the some files are present on gluonraid2 or the cache disk
    cmd2 = None
    if RUN_HD_ANA:
        RUN_PERIOD = os.getenv('RUN_PERIOD')
        if RUN_PERIOD is None:
            print('RUN_PERIOD environment variable is not set. Unable to locate EVIO files (use -noevio to skip).')
            sys.exit(-1)
        evio_subpath = '/rawdata/Run%06d/hd_rawdata_%06d_00[0-2].evio' % (int(RUN), int(RUN))
        evio_fnames = glob.glob('/gluonraid2/rawdata/volatile/' + RUN_PERIOD + evio_subpath)
        if len(evio_fnames) == 0:
            evio_fnames = glob.glob('/cache/halld/' + RUN_PERIOD + evio_subpath)
        if len(evio_fnames) > 0:
            cmd2 = ['hd_ana', '-PPLUGINS=syncskim', '-PNTHREADS=16', '-PEVIO:NTHREADS=16'] + evio_fnames
        else:
            print('Run ' + str(RUN) + ' has no EVIO files on gluonraid2. Skipping.')
            continue

    # Print summary of what we're about to do
    # NOTE(review): in MySQL mode RCDB contains the password, so it ends
    # up in the log output.
    print('----------------------------------')
    print(' RCDB: ' + RCDB)
    print(' run: ' + str(RUN))
    print(' started: ' + start)
    print(' finished: ' + end)
    print(' duration: ' + str(int(tdiff)) + ' sec')
    print(' events: ' + '{:,}'.format(event_count))
    if cmd:
        print(' command: ' + ' '.join(cmd))
    if cmd2:
        print(' command: ' + ' '.join(cmd2))

    # Start hd_ana command (if EVIO file exists) but don't block
    # here waiting for it. It may take 25-30 seconds
    proc_hd_ana = None
    if cmd2:
        proc_hd_ana = subprocess.Popen(cmd2, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # This is used both for the "beam on" condition and the
    # current change threshold indicating a current boundary
    current_thresh = 3.0
    if TAC_RUN:
        current_thresh = 0.25

    # Execute command and process output
    if cmd:
        vals = []
        sum_non_zero = 0.0
        cnt_non_zero = 0.0
        lines = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0]
        for line in lines.decode().split('\n'):
            if '#' in line:
                continue
            # The last space-separated field is taken as the value.
            # Lines that do not end in a number (blank lines, error
            # text) are ignored.
            try:
                v = float(line.split(' ')[-1])
            except ValueError:
                continue
            vals.append(v)
            if v > current_thresh:
                sum_non_zero += v
                cnt_non_zero += 1.0

        # Find boundaries: one sample per second, so the sample index is
        # the time in seconds relative to the run start.
        t_sec = 0
        t_beam_on = 0
        last_v = -1000.0  # guarantees the first sample is a boundary
        boundaries = []
        for v in vals:
            if v > current_thresh:
                t_beam_on += 1
            if abs(v - last_v) > current_thresh:
                boundaries.append([t_sec, v])
            t_sec += 1
            last_v = v

        # Add final point as final boundary
        boundaries.append([t_sec, last_v])

        avg = 0.0  # protect against div by zero next line
        if cnt_non_zero > 0:
            avg = sum_non_zero/cnt_non_zero
        if t_sec == 0:
            t_sec = 1  # similarly

    # Wait for hd_ana process to finish and parse results
    tics_per_sec = None
    if proc_hd_ana:
        print('waiting for hd_ana process to finish ...')
        lines = proc_hd_ana.communicate()[0]
        for line in lines.decode().split('\n'):
            if 'timestamp to unix time conversion:' not in line:
                continue
            print(line)
            fields = line.split(' ')
            tics_per_sec = fields[5][len('tics_per_sec='):]
            unix_start_time = fields[6][len("unix_start_time="):]

    # Print results
    if cmd:
        print('Avg beam current: ' + str(avg) + ' nA')
        print('Found %d current boundaries in run %s' % (len(boundaries), RUN))
        print('Beam was on for %d/%d sec = %3.1f%% of time' % (t_beam_on, t_sec, 100.0*float(t_beam_on)/float(t_sec)))
    if tics_per_sec:
        print('hi res clock to unix time conversion: tics_per_sec=' + tics_per_sec + ' unix_start_time=' + unix_start_time)
        if 'nan' in tics_per_sec:
            print('--- BAD calibration values for run ' + str(RUN) + ' skipping writing to CCDB ---')
            continue

    # Optionally write values to CCDB
    if WRITE_TO_CCDB:

        if tics_per_sec and int(round(float(unix_start_time))) < 1000:
            print('Writing to CCDB specified including the tics_per_sec but')
            print('the unix_start_time is not valid (' + unix_start_time + ' < 1000)')
            print('This may happen if the file is only partially copied to ')
            print('gluonraid2. Skipping writing for now.')
            sys.exit(0)

        print('Writing to CCDB: ' + url)

        if cmd:
            # Create string with full table: one "time current" pair per line
            content = ''.join('%f %f\n' % (t, Ibeam) for (t, Ibeam) in boundaries)

            # prepare content
            # create_assignment accepts tabled data
            # rows and columns number must correspond to table definition
            tabled_data = [[content]]

            # add data to database
            print('Writing %d boundaries to /ELECTRON_BEAM/current_map_epics' % len(boundaries))
            if not TEST_MODE:
                provider.create_assignment(
                    data=tabled_data,
                    path="/ELECTRON_BEAM/current_map_epics",
                    variation_name="default",
                    min_run=RUN,
                    max_run=RUN,
                    comment="Auto created from EPICS archive via hd_beam_current.py")
            else:
                print('')

        if tics_per_sec:
            # It is not entirely clear why the following works since I thought
            # both unix epoch and rcdb time should be cast as the local time zone.
            # Empirically, this seems correct through
            EDT = pytz.timezone('US/Eastern')
            UTC = pytz.utc
            rcdb_start_t_EDT = EDT.localize(rcdb_start_t)
            unix_epoch_UTC = UTC.localize(datetime.datetime(1970, 1, 1))
            rcdb_start_t_unix = (rcdb_start_t_EDT - unix_epoch_UTC).total_seconds()
            rcdb_250MHz_offset_tics = (float(rcdb_start_t_unix) - float(unix_start_time))*float(tics_per_sec)
            print('Writing the following to /ELECTRON_BEAM/timestamp_to_unix: %f %d %d' % (float(tics_per_sec), int(rcdb_250MHz_offset_tics), int(rcdb_start_t_unix)))
            print('(offset is %f seconds)' % (rcdb_250MHz_offset_tics/float(tics_per_sec)))
            if not TEST_MODE:
                provider.create_assignment(
                    data=[[float(tics_per_sec), int(rcdb_250MHz_offset_tics), int(rcdb_start_t_unix)]],
                    path="/ELECTRON_BEAM/timestamp_to_unix",
                    variation_name="default",
                    min_run=RUN,
                    max_run=RUN,
                    comment="Auto created from EPICS archive via hd_beam_current.py")
            else:
                print('')