#!/usr/bin/env python
#
# This script is responsible for moving rawdata files coming from
# the DAQ through various compute nodes and directories. This is
# done in order to process and store them in a distributed way in
# the counting house computers. The exact configuration is driven
# by an external configuration file.
#
# The configuration file allows a lot of flexibility in how complex
# the system is. The system was designed for the following scenario
# which gives some idea of the level of complexity it can handle.
#
# 1. Distribute newly generated files to multiple RAID servers
#    so that each server only needs to handle a portion of the
#    overall bandwidth.
#
# 2. Distribute the files sent to a server among multiple RAID
#    partitions so each partition only needs to handle a portion
#    of the overall bandwidth.
#
# 3. Transfer a copy of each raw data file a RAID server receives
#    to a farm node so that a skim can be done and the files
#    transferred back to the RAID server
#
# 4. Make hard links to each file on the RAID disk in both a staging
#    and volatile directory so they can be copied to tape and
#    kept around on the RAID until the space is needed.
#
#
# Configuration File Format
# ===============================================================
# The configuration file is made of any number of sections where
# each section is either type "stage:" or "distribute:". A stage
# section is used to link a file in all directories in a list.
# A distribute section is used to send a file to one of several
# destinations in a list.
#
# Another way to view it is that stage sections are things that
# can be done immediately on the same filesystem while distribute
# sections are things that require an external process.
#
# stage:
# --------------------
# "stage:" sections may contain one or more "source" lines and
# one or more "destination" lines. The source lines are strings
# compatible with python's glob.glob(). Files matched to any
# source line AND are not opened by any process for writing (as
# indicated by /usr/sbin/lsof) are linked in all destination
# directories. Each destination line should indicate an existing
# directory (no wildcards). Note that the source files and
# destination directories must exist on the same local filesystem
# (i.e. not on different disks). The links in the destination
# directories are all hard links so all must be removed to actually
# remove the file from the disk. The original source file
# will be unlinked once all of the links in the destination
# directories have been created.
#
# distribute:
# --------------------
# "distribute:" sections may contain one or more "source" lines
# and either a single "process" line or one or more "destination"
# lines. If a "process" line is present it specifies an external
# process to run on a source file. "destination" lines indicate
# that the file is to be transferred. Transfers are done using
# either hdrdmacp for remote nodes or rsync for other filesystems
# on the same node. (Use a stage section if transferring within
# the same file system.)
#
# In the case that a "destination" is used in a distribute: section
# multiple destination lines may exist and the script will transfer
# the source file to just one of them.
#
# For the special case of a distribution section where the file
# is to be copied to a different file system on the local host
# AND one also wishes to make a hard link to that file in a different
# directory on the same file system, then multiple destination
# directories may be specified. Multiple destination directories
# on the same line are not supported in any other context.
#
#
# This will support multiple source directories being scanned to
# multiple destination directories. Moreover, it also supports
# multiple sets of src/dest lists. The configuration is set via
# an input configuration file.
# An example of the format is given here:
#
#  stage:
#    source /media/ramdisk/rawdata/*.evio
#    dest /media/ramdisk/stage_to_disk
#    dest /media/ramdisk/stage_to_skim
#
#  distribute:
#    source /media/ramdisk/stage_to_skim/*.evio
#    dest gluon112:/media/ramdisk/stage_to_skim
#    dest gluon113:10471:/media/ramdisk/stage_to_skim
#
#
# Format of data structures holding staging and distribution sets
#  staging_set = [{sources:[string], destinations:[[string,string,...]]]
#  distribution_set = [{sources:[string], destinations:[[string,string,...]], processes:[[string]], idx:int]
#

import sys
import os
import re
import subprocess
import signal
import threading
import time
import glob
import socket
import shutil
import psutil
import zmq
import json
import traceback
import inspect
from inspect import currentframe, getframeinfo
from epics import caput,caget

MAX_PROCS = 6
TIMEOUT_PROCESS = 180  # timeout in seconds to allow external process to run before killing it
TIMEOUT_LSOF = 6       # timeout in seconds to allow the /usr/sbin/lsof command to execute before throwing exception
MIN_TIME_SINCE_MODIFICATION = 1.5  # seconds (see SelectFilesNotOpenForWriting)
KILLALL_PATS = []      # command-line patterns to pkill at startup (set from config)
FILE_TIMEOUT = {}      # key=directory, value=timeout in seconds (set from config)
ALARMS = {}            # key=directory, value=min free disk space in GB (set from config)
TEST_MODE = False
DONE = False
Ntransfers = 0
Nprocesses_started = 0
HEARTBEAT = 0          # Increased once per iteration of main loop
STATS_PORT = 10473
COMMAND_PORT = 10474
DB_INFO = {'update':False, 'host':'', 'user':'', 'passwd':'', 'database':'', 'conn':None, 'errors':0}

hostname = socket.gethostname()
hostname_short = hostname.split('.jlab.org')[0].split('-daq')[0]

zmq_context = None
zmq_req_socket = {}    # one REQ socket per remote host (see SendCommand)

# These are used to help with debugging
LINE_MAIN = getframeinfo(currentframe()).lineno
LINE_SERVER = 0

#-----------------------------------------------------
# SelectFilesNotOpenForWriting
#
# Returns list of files from the given list that are
# not currently open for writing.
# n.b. this will NOT detect if a file is being written
# to via NSF.
#-----------------------------------------------------
def SelectFilesNotOpenForWriting( fnames ):
    """Return the subset of fnames that is safe to operate on.

    A file is kept only if /usr/sbin/lsof reports no process has it open
    for writing AND its modification time is at least
    MIN_TIME_SINCE_MODIFICATION seconds in the past. On any lsof failure
    or timeout an empty list is returned so the caller does nothing.
    """
    global MIN_TIME_SINCE_MODIFICATION, LINE_MAIN
    LINE_MAIN = getframeinfo(currentframe()).lineno

    if not fnames : return []  # don't run lsof with no arguments

    # n.b. we must use sudo to run lsof so that it will see root processes (e.g. hdrdmacp)
    # This requires a line in the file /etc/sudoers.d/10_halld-cmds
    cmd = ['sudo', '/usr/sbin/lsof'] + fnames
    try:
        LINE_MAIN = getframeinfo(currentframe()).lineno
        lines = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf8').communicate(timeout=TIMEOUT_LSOF)[0]
        LINE_MAIN = getframeinfo(currentframe()).lineno
    except subprocess.TimeoutExpired as e:
        LINE_MAIN = getframeinfo(currentframe()).lineno
        print('=========================================================================')
        print('Timeout exception while processing cmd: ' + ' '.join(cmd))
        print( e )
        print(' This can happen if one of the network mounted disks (e.g. gluonraid3) becomes')
        print(' unresponsive. Check that all partitions of all mounts are still sctive. If')
        print(' a server is down (or went down and is up, but this is still timing out), ')
        print(' you may need to umount it manually with something like:')
        print('    sudo umount -lazy -force /gluonraid3/data1')
        print(' It should then automatically get re-mounted by the system.')
        print('')
        return []  # tell caller all files are being written to so it does nothing
    except Exception as e:
        LINE_MAIN = getframeinfo(currentframe()).lineno
        print('Exception while processing cmd: ' + ' '.join(cmd))
        print( e )
        return []  # tell caller all files are being written to so it does nothing

    # Parse lsof output: last word on each line is the file name, 4th word is
    # the file descriptor field (a 'w' in it means open for writing).
    LINE_MAIN = getframeinfo(currentframe()).lineno
    files_open_for_writing = []
    for line in lines.split('\n'):
        words = line.split()
        if len(words) < 5 : continue
        fname = words[-1]
        if fname in fnames and 'w' in words[3] : files_open_for_writing.append( fname )
    LINE_MAIN = getframeinfo(currentframe()).lineno
    selected = [fname for fname in fnames if fname not in files_open_for_writing ]

    # hdrdmacp checks to see if a file transferred properly by sending a zmq message
    # asking about the file size when it is done. If HOSS moves the file to another
    # directory too quickly then it will not report the correct file size and hdrdmacp
    # will report the file transfer failed. To avoid that situation, we check that a
    # file's modification time is at least MIN_TIME_SINCE_MODIFICATION seconds old
    # before adding it to the returned list.
    LINE_MAIN = getframeinfo(currentframe()).lineno
    mature = []
    tnow = time.time()
    for fname in selected:
        if os.path.exists(fname):
            try:
                age = tnow - os.path.getmtime(fname)
                if age >= MIN_TIME_SINCE_MODIFICATION: mature.append( fname )
            except:
                pass  # file may have vanished between glob and stat

    LINE_MAIN = getframeinfo(currentframe()).lineno
    return mature


#===================================================================================
# ParseConfigurationFile
#===================================================================================
def ParseConfigurationFile( configfile ):
    """Parse configfile, filling the staging_sets and distribute_sets globals.

    Only "stage:"/"distribute:" sections whose host list matches this host
    are kept. Global settings (TEST_MODE, MAX_PROCS, TIMEOUT_PROCESS,
    FILE_TIMEOUT, ALARMS, KILLALL_PATS, DB_INFO) may also be modified.
    Source directories without wildcards are created if missing.
    """
    global staging_sets, distribute_sets, TEST_MODE, hostname, MAX_PROCS, errors
    global TIMEOUT_PROCESS

    print('Reading configuration from: ' + configfile)
    try:
        link = os.readlink( configfile )
        print('configuration file is link: ' + configfile + ' -> ' + link)
    except:
        pass  # not a symlink; nothing to report

    staging_sets = []
    distribute_sets = []
    aliases = {}
    set_type = 'nada'
    set_hosts = ()
    iline = 0
    with open(configfile) as f:
        cur_set = {'sources':[], 'destinations':[], 'processes':[]}
        for line in f.readlines():
            iline += 1
            if '#' in line: line = line[:line.find('#')]  # Remove comments
            for name,value in aliases.items(): line = line.replace('%'+name, value)  # Replace aliases

            if line.startswith('ALIAS:') :
                vals = line.split(maxsplit=2)
                aliases[vals[1]] = vals[2].strip()
                continue
            if line.startswith('TESTMODE:') :
                TEST_MODE = True
                continue
            if line.startswith('REMOVETREE:') :
                vals = line.split(maxsplit=2)
                if len(vals)<3:
                    print('ERROR in REMOVETREE line. Needs 2 arguments: directory and node list')
                else:
                    for host in vals[2].split(','):
                        if hostname.startswith(host.strip()):
                            dirname = ReplaceKeywords(vals[1])
                            # n.b. loop variable renamed from `f` so the open config
                            # file handle is not shadowed
                            for fpath in glob.glob( dirname + '/*' ):
                                try:
                                    if os.path.isdir( fpath ):
                                        print('-rmdir: ' + fpath )
                                        shutil.rmtree( fpath, True)
                                    else:
                                        print('-rm: ' + fpath )
                                        os.unlink( fpath )
                                except Exception as e:
                                    print('ERROR removing ' + fpath )
                                    print( e )
                continue
            if line.startswith('KILLALL:'):
                vals = line.split(':')[1]           # grab part of string after "KILLALL:"
                pat = vals.split()[0]               # first item is string to search for in command lines to kill
                hosts_str = ''.join(vals.split()[1:])  # all other items are hosts (allows spaces with commas)
                hosts = hosts_str.split(',')        # split host string on commas into individual hosts
                for host in hosts:
                    if hostname.startswith(host): KILLALL_PATS.append(pat)
                continue
            if line.startswith('MAX_PROCS:'):
                vals = line.split(':')[1]           # grab part of string after "MAX_PROCS:"
                max_procs = vals.split()[0]         # first item is max number of processes
                hosts_str = ''.join(vals.split()[1:])  # all other items are hosts (allows spaces with commas)
                hosts = hosts_str.split(',')        # split host string on commas into individual hosts
                for host in hosts:
                    if hostname.startswith(host): MAX_PROCS = int(max_procs)
                continue
            if line.startswith('FILE_TIMEOUT:'):
                vals = line.split(maxsplit=3)
                for host in vals[3].split(','):
                    if hostname.startswith(host.strip()):
                        dir = ReplaceKeywords(vals[2])
                        FILE_TIMEOUT[dir] = vals[1].split('s')[0]  # key = directory, value=timeout in seconds
                continue
            if line.startswith('PROCESS_TIMEOUT:'):
                vals = line.split(maxsplit=2)
                if len(vals)<2:
                    print('ERROR in PROCESS_TIMEOUT line. Needs 1 argument: timeout (in seconds)')
                else:
                    TIMEOUT_PROCESS = int(vals[1])
                    print('TIMEOUT_PROCESS set to %d secs' % TIMEOUT_PROCESS)
                continue
            if line.startswith('ALARM:'):
                vals = line.split(maxsplit=3)
                for host in vals[3].split(','):
                    if hostname.startswith(host.strip()):
                        dir = ReplaceKeywords(vals[2])
                        ALARMS[dir] = vals[1].split('GB')[0]  # key = directory, value=min disk space in GB
                continue
            if line.startswith('DB_HOST:'):
                DB_INFO['host'] = line.split()[-1]
                continue
            if line.startswith('DB_USER:'):
                DB_INFO['user'] = line.split()[-1]
                continue
            if line.startswith('DB_PASS:'):
                vals = line.split()
                if len(vals) > 1: DB_INFO['passwd'] = vals[1]
                continue
            if line.startswith('DB_DATABASE:'):
                DB_INFO['database'] = line.split()[-1]
                continue
            if line.startswith('DB_NODES:'):
                vals = line.split(maxsplit=2)
                # BUG FIX: original looped over the *characters* of vals[1]
                # ("for host in vals[1]:") so the host comparison could never
                # match a real host name. Split the comma-separated list instead.
                for host in vals[1].split(','):
                    if hostname.startswith(host.strip()): DB_INFO['update'] = True
                continue

            if line.startswith('stage:') or line.startswith('distribute:'):
                # Starting new section. Finalize previous section (if any)
                if hostname.startswith( set_hosts ):  # only add if section applies to this host
                    if set_type.startswith('stage:'     ): staging_sets.append(cur_set)
                    if set_type.startswith('distribute:'): distribute_sets.append(cur_set)
                # Initialize variables for new section
                set_type = line.strip()
                set_hosts = tuple([x.strip() for x in set_type.split(':')[1].strip().split(',')])
                cur_set = {'sources':[], 'destinations':[], 'processes':[]}
            else:
                words = line.split()
                if len(words)<2: continue
                if words[0] == 'source'     : cur_set['sources'    ].append( ReplaceKeywords(words[1]) )
                if words[0] == 'process'    : cur_set['processes'  ].append( words[1:] )
                if words[0] == 'process_out': cur_set['process_out'] = words[1:]
                # Destination lines may have a comma-separated list of hosts.
                if words[0] == 'destination':
                    dest = line.split(maxsplit=1)[1]
                    if ':' in dest:
                        destpath = dest.split(':')[1].strip()
                        hosts = dest.split(':')[0].split(',')
                        for h in hosts:
                            cur_set['destinations'].append( [h.strip()+':'+destpath] )
                    else:
                        cur_set['destinations'].append( words[1:] )  # local directory (no hosts)

                # Check for configuration inconsistencies
                if len(cur_set['processes']) > 1:
                    print('ERROR at line ' + str(iline) + ': Only one process line permitted in a section')
                    sys.exit(-1)
                if (words[0] == 'process') and (set_type.startswith('stage:')):
                    print('ERROR at line ' + str(iline) + ': "process" lines not allowed in "stage:" sections')
                    sys.exit(-1)
                if (words[0] == 'process') and (len(cur_set['destinations']) > 0):
                    print('ERROR at line ' + str(iline) + ': Cannot have both "process" and "destination" lines in same section')
                    sys.exit(-1)
                if (words[0] == 'source') and (len(words) > 2):
                    print('ERROR at line ' + str(iline) + ': "source" lines may only specify one expression')
                    sys.exit(-1)
                if (words[0] == 'destination') and (len(words) > 2) and (set_type.startswith('stage:')):
                    print('ERROR at line ' + str(iline) + ': "destination" lines in "stage:" sections may only specify one expression')
                    sys.exit(-1)

    # Add last set
    if hostname.startswith( set_hosts ):  # only add if section applies to this host
        if set_type.startswith('stage:'     ): staging_sets.append(cur_set)
        if set_type.startswith('distribute:'): distribute_sets.append(cur_set)

    # Add idx to all distribute sets (round-robin index over destinations)
    for dset in distribute_sets:
        if len(dset['destinations']) > 0 : dset['idx'] = 0

    # Make directories for all sources if they don't already exist
    for aset in staging_sets + distribute_sets:
        for source in aset['sources']:
            srcdir = ReplaceKeywords( os.path.dirname(source) )
            # This should maybe be modified in the future to make as much of the
            # directory tree as possible that does not include a wildcard.
            if '*' in srcdir: continue
            if not os.path.exists( srcdir ):
                print('+mkdir: ' + srcdir)
                try:
                    os.makedirs( srcdir )
                except:
                    print('ERROR making directory: ' + srcdir)
                    errors['MKDIR_FAILED'] = errors.get('MKDIR_FAILED',0) + 1


#===================================================================================
# SetEPICSvar
#
# This just wraps the caput call in a try except block so if there is a problem
# with setting the EPICS variable, it does not cause the whole script to fail
#===================================================================================
def SetEPICSvar( name, val):
    global LINE_MAIN
    try:
        LINE_MAIN = getframeinfo(currentframe()).lineno
        caput(name, val)
        LINE_MAIN = getframeinfo(currentframe()).lineno
    except Exception as e:
        print('Unable to set EPICS variable "' + name + '" to: ' + str(val))
        print(str(e))


#===================================================================================
# GetEPICSvar
#
# This just wraps the caget call in a try except block so if there is a problem
# with getting the EPICS variable, it does not cause the whole script to fail
#===================================================================================
def GetEPICSvar( name):
    global LINE_MAIN
    try:
        LINE_MAIN = getframeinfo(currentframe()).lineno
        return caget(name)
    except Exception as e:
        LINE_MAIN = getframeinfo(currentframe()).lineno
        print('Unable to get EPICS variable "' + name + '" ')
        print(str(e))
        return ''


#===================================================================================
# ReplaceKeywords
#
# This is called to replace any keywords in commands to be executed.
# Keywords all start with "@". No check is made for invalid keywords
# so typos will be silently ignored.
#
# Valid keywords:
#
#   @INFILE    - input filename (copied from sfile)
#   @RUNPERIOD - Run Period (from RUNPERIOD envar.)
#   @RUNNUMBER - Run number extracted from sfile.
#                zero padded to 6 digits
#   @FILENUMBER - File number extracted from sfile, zero padded to 3 digits
#   @SKIMDIR   - skim file directory name (looked up in map based on sfile)
#   @TESTDIR/  - replaced by 'TEST' when in test mode and empty string otherwise
#===================================================================================
def ReplaceKeywords(cmd, sfile='dummy000000_000.evio'):
    """Replace @-keywords in cmd using values derived from sfile.

    cmd may be a single string or a list of strings; the return value has
    the same type as the input. Run and file numbers are the first two
    integers found in the basename of sfile.
    """
    global TEST_MODE

    RUNPERIOD = os.getenv('RUN_PERIOD', 'RunPeriod000000-000')

    nums = re.findall(r'\d+', os.path.basename(sfile) )
    if len(nums) < 1:
        print('ERROR Cannot extract run number from: ' + sfile)
        RUNNUMBER = '000000'
    else:
        RUNNUMBER = '%06d' % int(nums[0])  # first integer from file, formatted as 6 digit string
    # BUG FIX: FILENUMBER was previously left as the *integer* 0 when the
    # filename contained fewer than two numbers, which made the
    # str.replace('@FILENUMBER', FILENUMBER) call below raise TypeError.
    if len(nums) > 1:
        FILENUMBER = '%03d' % int(nums[1])
    else:
        FILENUMBER = '000'

    # Map special filename suffixes/patterns to a skim directory name
    SKIMDIR = 'skims'
    if sfile.endswith('.BCAL-LED.evio' ): SKIMDIR = 'BCAL-LED'
    if sfile.endswith('.FCAL-LED.evio' ): SKIMDIR = 'FCAL-LED'
    if sfile.endswith('.CCAL-LED.evio' ): SKIMDIR = 'CCAL-LED'
    if sfile.endswith('.DIRC-LED.evio' ): SKIMDIR = 'DIRC-LED'
    if sfile.endswith('.ps.evio'       ): SKIMDIR = 'PS'
    if sfile.endswith('.random.evio'   ): SKIMDIR = 'random'
    if sfile.endswith('.sync.evio'     ): SKIMDIR = 'sync'
    if 'hd_root_tofcalib_' in sfile     : SKIMDIR = 'TOF'
    if 'hd_calib_final_' in sfile       : SKIMDIR = 'hists'

    TESTDIR = ''
    if TEST_MODE: TESTDIR = 'TEST/'

    # Allow cmd to be either single string or list of strings
    if isinstance( cmd, str ):
        mycmd = [cmd]
    else:
        mycmd = cmd
    mycmd = [x.replace('@INFILE'    , sfile     ) for x in mycmd]
    mycmd = [x.replace('@RUNPERIOD' , RUNPERIOD ) for x in mycmd]
    mycmd = [x.replace('@RUNNUMBER' , RUNNUMBER ) for x in mycmd]
    mycmd = [x.replace('@FILENUMBER', FILENUMBER) for x in mycmd]
    mycmd = [x.replace('@SKIMDIR'   , SKIMDIR   ) for x in mycmd]
    mycmd = [x.replace('@TESTDIR/'  , TESTDIR   ) for x in mycmd]

    if isinstance( cmd, str ):
        return mycmd[0]  # user gave us single string so return single string
    else:
        return mycmd     # user gave us list so return a list


#===================================================================================
# KillProc
#
# This kills the process with the given pid and all children of that process.
# If a filename is given, then that file is unlinked.
#===================================================================================
def KillProc(pid, sfile=None):
    """SIGKILL the process `pid` and all of its descendants.

    Returns the list of psutil.Process objects that were signalled
    (children plus the parent), or [] if the process could not be found
    or accessed. If sfile is given, that file is also unlinked.
    """
    global LINE_MAIN
    try:
        LINE_MAIN = getframeinfo(currentframe()).lineno
        parent = psutil.Process( pid )
    # BUG FIX: the exception names must be qualified with the psutil module
    # (bare NoSuchProcess/AccessDenied were NameErrors) and the messages
    # referenced `p`, a variable only defined in the loop further below.
    except psutil.NoSuchProcess:
        LINE_MAIN = getframeinfo(currentframe()).lineno
        print('   pid=%d is no longer around so no need to kill' % pid)
        return []
    except psutil.AccessDenied:
        LINE_MAIN = getframeinfo(currentframe()).lineno
        print('   access denied for pid=%d. Unable to kill' % pid)
        return []

    LINE_MAIN = getframeinfo(currentframe()).lineno
    children = parent.children(recursive=True)
    LINE_MAIN = getframeinfo(currentframe()).lineno
    children.append(parent)  # kill the parent last
    for p in children:
        print('   killing pid=%d (ppid=%d)' % (p.pid, p.ppid()))
        try:
            p.send_signal(signal.SIGKILL)
        except:
            print('   exception killing child process. Ignoring.')

    # Optionally remove the file associated with the killed process
    if sfile:
        if os.path.exists( sfile ):
            print('unlinking: ' + sfile )
            try:
                os.unlink( sfile )
            except Exception as e:
                print('ERROR unlinking: ' + sfile )
                print( e )

    return children


#===================================================================================
# RaiseDiskSpaceAlarm
#
# This is called when the free disk space falls below a minimum. Alarms levels
# are associated with specific directories in the config file.
#
# This will automatically kill any running procs and delete any files in
# directories listed in FILE_TIMEOUT in an attempt to reset to a workable state.
#===================================================================================
def RaiseDiskSpaceAlarm(topdir, free_GB, min_GB):
    """Handle a low-free-space condition on the filesystem holding topdir.

    Raises EPICS alarms, kills every running subprocess that (directly or
    via a child) has a file open for writing on that filesystem, and then
    deletes all files under the FILE_TIMEOUT directories to recover space.
    """
    global procs, errors, last_cmd, FILE_TIMEOUT, LINE_MAIN
    LINE_MAIN = getframeinfo(currentframe()).lineno
    errstr = 'ALARM: free space fell below minumim (%3.1fGB < %3.1fGB) for: %s' % (float(free_GB), float(min_GB), topdir)
    print(errstr)
    SetEPICSvar('HD:coda:daq:hoss_status', bytes(errstr, 'utf-8'))
    SetEPICSvar('HD:coda:daq:hoss_error', 1)
    errors['DISK_SPACE'] = errors.get('DISK_SPACE',0) + 1

    #=============== Kill all running sub processes trying to write to the full disk =================
    # Make dictionary of pids and all children pids indexed by sfile
    sfile_pids = {}
    for sfile,proc in procs.items():
        pids = [proc['proc'].pid]
        # BUG FIX: extend with the child *pids* (ints). The original extended
        # with psutil.Process objects so the membership test below never matched.
        pids.extend( [c.pid for c in psutil.Process( proc['proc'].pid ).children(recursive=True)] )
        sfile_pids[sfile] = pids

    # Put pids of all subprocesses and their children into a list
    all_pids = []
    for s,pids in sfile_pids.items(): all_pids.extend( pids )

    # Run lsof for our pid's
    LINE_MAIN = getframeinfo(currentframe()).lineno
    cmd = ['sudo','/usr/sbin/lsof', '-p', ','.join([str(x) for x in all_pids])]
    try:
        LINE_MAIN = getframeinfo(currentframe()).lineno
        lines = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf8').communicate()[0]
        LINE_MAIN = getframeinfo(currentframe()).lineno
    except Exception as e:
        print('Exception while processing cmd: ' + ' '.join(cmd))
        print( e )
        lines = ''  # BUG FIX: was [], which has no .split() for the loop below

    # Find any files open for writing and put the sfile corresponding to that pid (possibly child) into a list
    LINE_MAIN = getframeinfo(currentframe()).lineno
    sfiles_to_remove = []
    st_dev1 = os.stat(topdir).st_dev
    for line in lines.split('\n'):
        words = line.split()
        if len(words) < 9 : continue
        if words[3].endswith('w'):
            pid = words[1]
            fname = words[8]
            try:
                # Only care about writes to the same filesystem as topdir
                st_dev2 = os.stat(fname).st_dev
                if st_dev2 == st_dev1:
                    for sfile,pids in sfile_pids.items():
                        # BUG FIX: lsof reports the pid as a string; compare as int
                        if int(pid) in pids:
                            sfiles_to_remove.append(sfile)
            except:
                print('Error getting st_dev for ' + fname)

    # Kill all subprocesses which are writing or have a child writing to the full disk
    LINE_MAIN = getframeinfo(currentframe()).lineno
    all_children = []
    sfiles_to_remove = list(set(sfiles_to_remove))  # avoid double-kill and double-del below
    for s in sfiles_to_remove:
        proc = procs[s]  # BUG FIX: was procs[sfile] (stale loop variable)
        children = KillProc(proc['proc'].pid)  # Do not delete the input file to the process since this can delete raw data files.
        all_children.extend( children )

    LINE_MAIN = getframeinfo(currentframe()).lineno
    gone, alive = psutil.wait_procs(all_children, timeout=5)
    LINE_MAIN = getframeinfo(currentframe()).lineno
    print('freeing space: killed %d processes (%d still lingering)' % (len(gone), len(alive)))
    for s in sfiles_to_remove:
        print('Killed processes for ' + s)
        del procs[s]
    transition_times[time.time()] = 'idle'
    last_cmd = []

    #========== Remove any files in directories targeted for file timeouts =========
    LINE_MAIN = getframeinfo(currentframe()).lineno
    # n.b. loop variable renamed so it no longer shadows the topdir parameter
    for tdir,timeout in FILE_TIMEOUT.items():
        for root, dirs, files in os.walk(tdir):
            for fname in files:
                mypath = os.path.join(root, fname)
                delta_t = time.time() - os.stat(mypath).st_mtime
                print('freeing space: unlinking file last modified %3.1f seconds ago: %s' % (delta_t, mypath) )
                os.unlink( mypath )


#===================================================================================
# RaiseProcessAlarm
#
# This is called when an external process (run by Popen) returns a non-zero
# error code.
# hdrdmacp processes are treated more seriously since they may represent
# a failure in the flow of raw data.
#===================================================================================
def RaiseProcessAlarm(cmd, returncode):
    """Record and publish an alarm for a failed external process.

    cmd is the argument list that was run; returncode its exit status.
    """
    global procs, errors, last_cmd, FILE_TIMEOUT

    if cmd[0] == 'hdrdmacp':
        errstr = 'ALARM: RDMA file copy failed: ' + ' '.join(cmd)
        SetEPICSvar('HD:coda:daq:hoss_error', 2)
        errors['HDRDMACP_FAILED'] = errors.get('HDRDMACP_FAILED',0) + 1
    else:
        errstr = 'ALARM: Subprocess failed: ' + ' '.join(cmd)
        SetEPICSvar('HD:coda:daq:hoss_error', 3)
        errors['PROCESS_FAILED'] = errors.get('PROCESS_FAILED',0) + 1
    print(errstr)
    SetEPICSvar('HD:coda:daq:hoss_status', bytes(errstr, 'utf-8'))


#===================================================================================
# UpdateDB
#
# This will insert a row into the HOSS_transfers table of the DB specified in the
# configuration file for the specified source and destination.
#===================================================================================
def UpdateDB(src, dest):
    global DB_INFO

    # Only try inserting if "update" flag was set
    if DB_INFO['update']:

        # Issues arose in Oct. 2021 PrimEx run where the connection
        # was being dropped. If the connection has been set, then
        # check that it is still active. If not, re-establish it along
        # with a new cursor.
        if DB_INFO['conn']:
            if not DB_INFO['conn'].is_connected():
                print('NOTICE: MySQL connection seems to have dropped. Reconnecting ...')
                DB_INFO['conn'] = None

        if not DB_INFO['conn']:
            try:
                # We need to import mysql.connector but the RCDB version is
                # not compatible with what is installed on the gluons. Thus,
                # we need to make sure it is not in our path
                mypath = sys.path.copy()  # loop over copy so we can change real list within loop
                for p in mypath:
                    if '/rcdb/' in p: sys.path.remove(p)
                import mysql.connector as mysql
                conn = mysql.connect( host=DB_INFO['host'], user=DB_INFO['user'], passwd=DB_INFO['passwd'], database=DB_INFO['database'])
                print(conn)
                DB_INFO['conn'] = conn
                DB_INFO['cursor'] = conn.cursor()
            except:
                traceback.print_exc()
                print('Unable to create DB connection. Writing to DB will be disabled.')
                DB_INFO['update'] = False
                return

        try:
            # Get all numbers from string into a list so we can assume last two are run and file
            nums = re.findall(r'\d+', src)
            while len(nums)<2 : nums.append(0)

            # For ease of reading, put all values in dictionary and then form SQL from that
            c = {}
            c['run'        ] = str(int(nums[-2]))
            c['file'       ] = str(int(nums[-1]))
            c['source'     ] = src
            c['destination'] = dest
            c['size'       ] = str( os.path.getsize(src) )
            c['host'       ] = hostname_short
            vals = ['"{0}"'.format(w) for w in c.values()]  # create list with each value in quotes
            sql  = 'INSERT INTO HOSS_transfers '
            sql += '(' + ','.join(c.keys()) + ') '
            sql += 'VALUES (' + ','.join(vals) + ') '
            # print(sql)
            DB_INFO['cursor'].execute(sql)
            DB_INFO['conn'].commit()
        except:
            traceback.print_exc()
            DB_INFO['errors'] += 1
            if DB_INFO['errors'] >= 10:
                print('Too many DB errors. Disabling DB updates!')
                DB_INFO['update'] = False


#===================================================================================
# PublishStats
#
# This runs in a separate thread to periodically publish statistics
# in the form of a zeroMQ message on tcp port 10473
#
# This is also responsible for monitoring the HEARTBEAT variable updated
# by the main thread.
#===================================================================================
def PublishStats():
    """Publish a JSON stats message on STATS_PORT every 3 seconds.

    Runs in its own thread until DONE is set. Any exception disables
    publishing for the remainder of the run (the thread exits quietly).
    """
    global DONE, zmq_context, hostname, procs, tbusy_last, tidle_last, errors, STATS_PORT, Nprocesses_started
    global HEARTBEAT, LINE_SERVER

    try:
        if not zmq_context : zmq_context = zmq.Context()
        socket = zmq_context.socket(zmq.PUB)  # n.b. local name shadows the socket module inside this function
        socket.bind('tcp://*:' + str(STATS_PORT))
        print('Publishing stats for ' + hostname + ' to port ' + str(STATS_PORT))

        missed_heartbeats = 0
        last_heartbeat = HEARTBEAT
        while not DONE:
            LINE_SERVER = getframeinfo(currentframe()).lineno
            # Verify main loop is still running
            if HEARTBEAT == last_heartbeat:
                missed_heartbeats += 1
                if missed_heartbeats > 5:
                    LINE_SERVER = getframeinfo(currentframe()).lineno
                    print('ERROR: Heartbeat stopped for main loop. Quitting ...')
                    errors['HEARTBEAT_STOPPED'] = errors.get('HEARTBEAT_STOPPED',0) + 1
                    # Temporarily disable ending of loop just because heartbeat stops. 8/11/2021 DL
                    #DONE = True
                    missed_heartbeats = 0
            else:
                LINE_SERVER = getframeinfo(currentframe()).lineno
                missed_heartbeats = 0
                last_heartbeat = HEARTBEAT

            vals = {}
            vals['host'           ] = hostname
            vals['program'        ] = 'hd_data_flow.py'
            vals['status'         ] = 'idle'
            if len(procs)>0 : vals['status'] = 'busy'
            vals['tbusy'          ] = tbusy_last
            vals['tidle'          ] = tidle_last
            vals['last_cmd'       ] = ' '.join(last_cmd)
            vals['respawns'       ] = os.getenv('HDLOG_NUM_RESPAWNS','0')
            vals['num_procs'      ] = len(procs)
            vals['num_procs_total'] = Nprocesses_started
            vals['max_procs'      ] = MAX_PROCS
            vals['errors'         ] = str(errors)

            LINE_SERVER = getframeinfo(currentframe()).lineno
            socket.send_string( json.dumps(vals) )
            LINE_SERVER = getframeinfo(currentframe()).lineno
            time.sleep(3)
    except:
        LINE_SERVER = getframeinfo(currentframe()).lineno
        print(sys.exc_info()[0])
        print('Problem publishing stats. Disabled from here on out.')
        LINE_SERVER = getframeinfo(currentframe()).lineno


#===================================================================================
# CommandServer
#
# This runs in a separate thread listening for commands from remote processes
# in the form of zeroMQ REQ-REP messages on tcp port 10474.
#===================================================================================
def CommandServer():
    """Serve zeroMQ REQ-REP commands (quit, get Nprocs, get free space,
    get debug, clear errors, clear counters) on COMMAND_PORT until DONE."""
    global DONE, zmq_context, hostname, procs, errors, COMMAND_PORT, Nprocesses_started
    global LINE_MAIN, LINE_SERVER

    try:
        if not zmq_context : zmq_context = zmq.Context()
        socket = zmq_context.socket(zmq.REP)
        socket.bind('tcp://*:' + str(COMMAND_PORT))
        print('Listening for commands on port ' + str(COMMAND_PORT))
    except zmq.ZMQError as zerr:
        print(sys.exc_info()[0])
        print(zerr.msg)
        print('Problem setting up command server. Will be unable to accept commands.')
        return  # BUG FIX: without this the loop below spun forever on an unbound socket

    while not DONE:
        try:
            cmd = socket.recv().decode()
            #print('received command: ' + cmd)
            response = 'OK'

            #-------- quit
            if cmd.startswith('quit'):
                DONE = True
            #-------- get Nprocs
            elif cmd.startswith('get Nprocs'):
                response = str(len(procs))
            #-------- get free space
            elif cmd.startswith('get free space'):
                dir = ''  # BUG FIX: pre-define so the except clause cannot hit an unbound name
                try:
                    dir = cmd.split('get free space', maxsplit=1)[1].strip()
                    s = os.statvfs(dir)
                    free_bytes = s.f_frsize*s.f_bfree
                    response = str(free_bytes)
                except:
                    print('ERROR getting free space for requested directory: ' + dir)
                    print(sys.exc_info()[0])
                    response = '0 unknown directory: ' + dir
            #-------- get debug
            elif cmd.startswith('get debug'):
                response = 'LINE_MAIN: %d\nLINE_SERVER: %d' % (LINE_MAIN, LINE_SERVER)
            #-------- clear errors
            elif cmd.startswith('clear errors'):
                errors = {}
            elif cmd.startswith('clear counters'):
                errors = {}
                Nprocesses_started = 0
            else:
                response = 'Unknown command: ' + cmd

            socket.send(response.encode())
        except:
            print(sys.exc_info()[0])
            print('Problem receiving command.')


#===================================================================================
# SendCommand
#
# Send a command in the form of a zeroMQ REQ-REP message on tcp port 10474.
# This allows communication with an instance of this script running on a remote host
#===================================================================================
def SendCommand(host, cmd):
    """Send cmd to the hd_data_flow instance on host; return its reply
    as a string, or 'ERROR' on any failure."""
    global zmq_context, zmq_req_socket, COMMAND_PORT, LINE_MAIN

    try:
        LINE_MAIN = getframeinfo(currentframe()).lineno
        if not zmq_context: zmq_context = zmq.Context()
        # Keep one socket connection for each host
        if not host in zmq_req_socket.keys() :
            req_socket = zmq_context.socket(zmq.REQ)
            req_socket.connect('tcp://'+host+':' + str(COMMAND_PORT))
            zmq_req_socket[host] = req_socket
        else:
            req_socket = zmq_req_socket[host]
        LINE_MAIN = getframeinfo(currentframe()).lineno
        print('sending command to ' + host + ': ' + cmd)
        req_socket.send( cmd.encode() )
        LINE_MAIN = getframeinfo(currentframe()).lineno
        response = req_socket.recv()
        LINE_MAIN = getframeinfo(currentframe()).lineno
        return response.decode()
    except:
        print(sys.exc_info()[0])
        print('Problem sending command to host ' + host + ': ' + cmd)
        return 'ERROR'


#===================================================================================
# main

errors = {}

if len(sys.argv) < 2 :
    print('\nUsage:\n\n   hd_data_flow.py configfile\n')
    print('   e.g.')
    print('   hd_data_flow.py $DAQ_HOME/config/HOSS/hoss_default.config\n')
    sys.exit(0)

configfile = sys.argv[1]
ParseConfigurationFile( configfile )
print('   staging_sets: %d' % len(staging_sets))
print('distribute_sets: %d' % len(distribute_sets))
print('      MAX_PROCS: %d' % MAX_PROCS)
print('   FILE_TIMEOUT: ' + str(FILE_TIMEOUT))
print('         ALARMS: ' + str(ALARMS))
print('        KILLALL: ' + str(KILLALL_PATS))
print('staging:' + str(staging_sets))
print('distribute_sets:' + str(distribute_sets))
print('-------------------------------------------------------------------------------------------')

# Kill any lingering processes matching KILLALL patterns: first politely
# with SIGTERM, then (after a grace period) with SIGKILL.
for pat in KILLALL_PATS:
    cmd = ['pkill', '-f', '--signal', 'SIGTERM', pat]
    print('KILLALL: ' + ' '.join(cmd))
    subprocess.call(cmd)
if KILLALL_PATS:
    time.sleep(2)
    for pat in KILLALL_PATS:
        cmd = ['pkill', '-f', '--signal', 'SIGKILL', pat]
        print('KILLALL: ' + ' '.join(cmd))
        subprocess.call(cmd)

procs = {}
rsync_post_links = {}
last_cmd = []

# Variable to help keep track of running average of fraction of time we are busy running an external process
transition_times = {time.time():'idle'}
twindow = 5*60  # look back time to average over
tbusy_last = 0
tidle_last = twindow

if (len(staging_sets) + len(distribute_sets)) == 0:
    print('\n\nNothing in configuration for host: ' + hostname)
    sys.exit(0)

# Launch separate thread to publish statistics as zeroMQ messages
t_stats = threading.Thread( target=PublishStats )
t_stats.start()

# Launch separate thread to listen for commands from zeroMQ messages
t_cmds = threading.Thread( target=CommandServer )
t_cmds.start()

# WARNING: Do NOT put global variable declarations here that are used in PublishStats !

while not DONE:
    #print('line: %d' % (inspect.getframeinfo(inspect.currentframe()).lineno) )
    HEARTBEAT += 1
    LINE_MAIN = getframeinfo(currentframe()).lineno

    #================ STAGING SETS ===================
    for staging_set in staging_sets:

        # Loop over source directories and form list of files
        files = []
        for src in staging_set['sources']:
            files.extend( glob.glob(src) )
        # In case multiple sources match the same file twice, remove duplicates
this reorders files) files = list(set(files)) # Find subset of file list that are not currently open for writing LINE_MAIN = getframeinfo(currentframe()).lineno files = SelectFilesNotOpenForWriting( files ) # Loop over destination directories LINE_MAIN = getframeinfo(currentframe()).lineno for sfile in files: Nlinks_created = 0 for destdirs in staging_set['destinations']: destdir = destdirs[0] unlink_sfile = False; if destdir != '/dev/null': # Create destination directory if it does not already exist destdir = ReplaceKeywords( destdir, sfile ) if not os.path.exists(destdir): try: print('++mkdir: ' + destdir ) os.makedirs( destdir ) except: print('ERROR makedirs: ' + destdir + ' (perhaps directory was made while I was getting ready to make it?)' ) print( e ) # Create hard link in destination directory linkname = destdir + '/' + os.path.basename(sfile) print('linking: ' + linkname + ' -> ' + sfile ) try: os.link( sfile, linkname ) Nlinks_created += 1 #unlink_sfile = True UpdateDB( sfile, linkname ) except Exception as e: print('ERROR linking: ' + linkname + ' -> ' + sfile ) print( e ) if os.path.samefile( sfile, linkname ): print('Existing link is already pointing to source file. Reconsidering this as a success.') Nlinks_created += 1 #unlink_sfile = True else: try: print('Existing link not hard link to source (according to python which seems to be wrong on this). Checking file sizes') s = os.statvfs(sfile) sfile_size = s.f_frsize*s.f_bfree s = os.statvfs(linkname) link_size = s.f_frsize*s.f_bfree if sfile_size == link_size: print('Sizes match. 
Reconsidering this as a success.') Nlinks_created += 1 except: print('ERROR trying to compare file sizes for: ' + sfile + ' and ' + linkname) else: sys.stdout.write('--/dev/null ') Nlinks_created += 1 #unlink_sfile = True # Unlink original file iff all links succeeded if Nlinks_created == len(staging_set['destinations']): print('unlinking: ' + sfile ) try: os.unlink( sfile ) except Exception as e: print('ERROR unlinking: ' + sfile ) print( e ) #============= DISTRIBUTE SETS ================ LINE_MAIN = getframeinfo(currentframe()).lineno for dist_set in distribute_sets: # Loop over source directories and form list of files files =[] for src in dist_set['sources']: files.extend( glob.glob(src) ) # Set counter for how many destinations we may still try sending to this round. # Assume dist_sets have mutually exclusive remote destinations. See comments below. dest_counter = len( dist_set['destinations'] ) # Print some useful info for the log to help with debugging if files: print('Distribute set has %d files:' % len(files) ) # Loop over files staged and ready for processing for sfile in files: if sfile in procs.keys(): print (' - ' + sfile + ' (in process by pid=' + str(procs[sfile]['proc'].pid) + ')') continue # this file is already being processed else: print (' - ' + sfile) if len(procs) >= MAX_PROCS: print( 'Max. processes running. Deferring ' + sfile + ' to next iteration ...') break # The dest_paths list is added to the dictionary of a subprocess we laucnh. # It is used in case a disk space alarm is raised to kill any processes that # are writing to the full disk. 
dest_paths = [] if len(dist_set['processes'])>0: # ------ process -------- cmd = dist_set['processes'][0] if 'process_out' in dist_set.keys(): dest_paths = [ReplaceKeywords(x, sfile) for x in dist_set['process_out']] # Directories/files this process will write to else: # ------ transfer -------- # Here we loop over destinations to try and find one with enough free # disk space to take the file we are about to send. Actually, we only # worry about this for remote transfers. We must loop here though since # the destinations may be a mixture of remote and local. # # Note that this is complicated by us not wanting to send multiple files # to the same destination simultaneously since there may not be enough # disk space for them all. Thus, for this dist_set and in this iteration # of the main loop, we want to make sure we loop through the destinations # at most once. This does not actually prevent files from being sent # to the same destination on the next iteration, but at least it gives # a 1 second lag plus the time it takes to ask all of the remote destinations # about free space which may be enough. 
try: required_space = os.path.getsize(sfile) + 1000000000 # required space is file size plus 1GB except: print(sys.exc_info()[0]) print('Failed to get file size for ' + sfile + ' (this should not be possible!)') continue for i in range(0, len( dist_set['destinations'] )): # Ensure each destination is considered only once this iteration of outer loop (see above) if dest_counter == 0: break dest_counter -= 1 if dist_set['idx'] >= len( dist_set['destinations'] ) : dist_set['idx'] = 0 dests = dist_set['destinations'][ dist_set['idx'] ] dist_set['idx'] += 1 destpath = dests[0] + '/' + os.path.basename(sfile) Ntransfers += 1 if ':' in destpath: # Use RDMA to transfer to remote host # Check first that there is space in the destination directory for this file mydestpath = ReplaceKeywords( destpath, sfile ) myhost = mydestpath.split(':')[0].strip() mydir = os.path.dirname( mydestpath.split(':')[1].strip() ) try: LINE_MAIN = getframeinfo(currentframe()).lineno free_space = SendCommand(myhost, 'get free space ' + mydir) LINE_MAIN = getframeinfo(currentframe()).lineno print('Response: '+ free_space) if int(free_space) < required_space: print('WARNING: not enough free space on remote destination: ' + mydestpath + ' (' + str(free_space) + ' < ' + str(required_space)+')') continue # try next destination except: print('Unable to confirm remote destination ' + myhost + ' has enough disk space or not. Skipping') continue # Remote size has enough space for us to send file cmd = ['hdrdmacp', '-d', '-P', sfile, destpath] # -d= delete file after transfer -P=make parent dirs else: # Use "cp" to copy file to another directory on the local node (can be different filesystem) # n.b. originally used rsync, but that turned out to be extremely slow, even with checksumming # off. 
We leave it here, but commented out for future reference # Use rsync to copy file to another directory on the local node (can be different filesystem) #cmd = ['rsync', '--remove-source-files', '-vv', sfile, destpath] cmd = ['cp', sfile, destpath] # Create destination directory if it does not already exist destdir = ReplaceKeywords( os.path.dirname(destpath), sfile ) dest_paths.append( destdir ) if not os.path.exists( destdir ): print('+++mkdir: ' + destdir) os.makedirs( destdir ) # Record additional destinations to hard link for when the rsync transfer is complete if len(dests)>1 : rsync_post_links[sfile] = {'orig':destpath, 'linknames':[x+ '/' + os.path.basename(sfile) for x in dests[1:]]} # Update DB, but only for transfers (not processes) LINE_MAIN = getframeinfo(currentframe()).lineno UpdateDB( sfile, ReplaceKeywords(destpath, sfile) ) LINE_MAIN = getframeinfo(currentframe()).lineno break # If we get here then we found a destination and do not need to loop over them LINE_MAIN = getframeinfo(currentframe()).lineno cmd = ReplaceKeywords( cmd, sfile ) print('cmd: ' + ' '.join(cmd)) proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) procs[sfile] = {'proc':proc, 'cmd':cmd, 'dest_paths':dest_paths, 'tstart':time.time()} if len(procs)==1 : transition_times[time.time()] = 'busy' last_cmd = cmd Nprocesses_started += 1 #========== Check for finished processes ========= LINE_MAIN = getframeinfo(currentframe()).lineno finished_proc_keys = [] for sfile,proc in procs.items() : if proc['proc'].poll() is not None : out = proc['proc'].communicate() print( out[0] ) print( out[1], file=sys.stderr ) returncode = proc['proc'].returncode print('external command -- Completed (returncode=' + str(returncode) +'): ' + ' '.join(proc['cmd'])) if returncode!=0 : RaiseProcessAlarm( proc['cmd'], returncode ) finished_proc_keys.append(sfile) # Make any additional hard links for local transfers if sfile in rsync_post_links.keys(): destpath = 
rsync_post_links[sfile]['orig'] destpath = ReplaceKeywords( destpath, sfile ) for linkname in rsync_post_links[sfile]['linknames']: linkname = ReplaceKeywords( linkname, sfile ) # Create destination directory if it does not already exist destdir = os.path.dirname(linkname) if not os.path.exists( destdir ): print('++++mkdir: ' + destdir) os.makedirs( destdir ) print('linking: ' + linkname + ' -> ' + destpath ) try: os.link( destpath, linkname ) except Exception as e: print('ERROR linking: ' + linkname + ' -> ' + sfile ) print( e ) rsync_post_links.pop(sfile) # Unlink input file to process (in case it wasn't deleted by process itself) # n.b. We do NOT unlink failed hdrdmacp commands since that could potentially # delete raw data. if os.path.exists( sfile ): if 'hdrdmacp' in proc['cmd']: print('not unlinking file for failed hdrdmacp command so it can be retried: ' + sfile) else: print('unlinking: ' + sfile ) try: os.unlink( sfile ) except Exception as e: print('ERROR unlinking: ' + sfile ) print( e ) # Remove finished procs from dictionary LINE_MAIN = getframeinfo(currentframe()).lineno for sfile in finished_proc_keys: procs.pop( sfile ) if len(procs)==0 : transition_times[time.time()] = 'idle' last_cmd = [] #========== Check if we're about to run out of memory ========= LINE_MAIN = getframeinfo(currentframe()).lineno mem = psutil.virtual_memory() mem_avail_GB = mem.available/1E9 if mem_avail_GB < 1.5: print('WARNING: AVAILABLE SYSTEM MEMORY IS TOO LOW (%3.1f<1.5GB). KILLING ALL SUBPROCESSES!' 
% mem_avail_GB) all_children = [] for sfile,proc in procs.items(): children = KillProc(proc['proc'].pid, sfile) all_children.extend( children ) gone, alive = psutil.wait_procs(all_children, timeout=5) print('killed %d processes (%d still lingering)' % (len(gone), len(alive))) procs = {} transition_times[time.time()] = 'idle' last_cmd = [] errors['OUT_OF_MEMORY'] = errors.get('OUT_OF_MEMORY',0) + 1 #========== Check for procs that need to time out ========= LINE_MAIN = getframeinfo(currentframe()).lineno all_children = [] unlinked_files = [] for sfile,proc in procs.items(): tdiff = time.time() - proc['tstart'] LINE_MAIN = getframeinfo(currentframe()).lineno if tdiff > TIMEOUT_PROCESS: LINE_MAIN = getframeinfo(currentframe()).lineno print('WARNING: Process timed out: ') try: LINE_MAIN = getframeinfo(currentframe()).lineno print(' command: ' + ' '.join(proc['cmd'])) except Exception as e: LINE_MAIN = getframeinfo(currentframe()).lineno print( e) LINE_MAIN = getframeinfo(currentframe()).lineno children = KillProc(proc['proc'].pid, sfile) LINE_MAIN = getframeinfo(currentframe()).lineno all_children.extend( children ) LINE_MAIN = getframeinfo(currentframe()).lineno unlinked_files.append( sfile ) # So we can remove these from procs later LINE_MAIN = getframeinfo(currentframe()).lineno if len(all_children) > 0: LINE_MAIN = getframeinfo(currentframe()).lineno gone, alive = psutil.wait_procs(all_children, timeout=5) print('killed %d processes (%d still lingering)' % (len(gone), len(alive))) errors['PROCESS_TIMEOUT'] = errors.get('PROCESS_TIMEOUT',0) + 1 for sfile in unlinked_files : del procs[sfile] LINE_MAIN = getframeinfo(currentframe()).lineno if len(procs) == 0 : LINE_MAIN = getframeinfo(currentframe()).lineno transition_times[time.time()] = 'idle' last_cmd = [] #========== Check for lingering files (mainly left from killed processes) ========= LINE_MAIN = getframeinfo(currentframe()).lineno for topdir,timeout in FILE_TIMEOUT.items(): for root, dirs, files in 
os.walk(topdir): for fname in files: mypath = os.path.join(root, fname) delta_t = time.time() - os.stat(mypath).st_mtime if delta_t > float(timeout): print('unlinking stale file last modified %3.1f seconds ago: %s' % (delta_t, mypath) ) os.unlink( mypath ) #========== Check if free disk space falls below alarm level ========= LINE_MAIN = getframeinfo(currentframe()).lineno for topdir,min_GB in ALARMS.items(): svfs = os.statvfs(topdir) free_GB = svfs.f_frsize*svfs.f_bfree/1.0E9 if free_GB < float(min_GB): RaiseDiskSpaceAlarm(topdir, free_GB, min_GB) #========= Calculate busy and idle times for last 5 minutes ========== LINE_MAIN = getframeinfo(currentframe()).lineno now = time.time() last_t = now tbusy = 0 tidle = 0 for t in reversed(sorted(transition_times)): #for t,type in transition_times.items(): type = transition_times[t] delta_t = last_t - t if (now - t) > twindow : delta_t = last_t - (now - twindow) if type == 'busy' : tbusy += delta_t if type == 'idle' : tidle += delta_t last_t = t if (now - t) > twindow : break tbusy_last = tbusy tidle_last = tidle # remove times older than 5 minutes LINE_MAIN = getframeinfo(currentframe()).lineno for t in sorted(transition_times): if t < last_t: transition_times.pop(t) #=============== Sleep =========================== LINE_MAIN = getframeinfo(currentframe()).lineno sys.stdout.flush() sys.stderr.flush() time.sleep(1)