#!/usr/bin/env python3 # # Launch the HOSS (Hall-D Online Skim System) # # This will read the hoss configurtion file to dertimine which hosts # need to run the hoss worker script. It will then launch it on all # of them via ssh. # # If the "-e" command line option is given then instead of starting # all of the processes, they will all be killed. # import faulthandler faulthandler.enable() import subprocess import argparse import time import sys import os from epics import caput #configfile = sys.argv[1] configfile = '/gluondaqfs/hdops/CDAQ/daq_dev_v0.31/daq/config/HOSS/hoss_default.config' all_hosts = set() mode = "starting" # valid values are "starting", "restarting", "ending". Set by -e and -r flags VERBOSE = 1 TARGET_HOSTS = [] #=================================================================================== # ParseConfigurationFile # # This will parse the config file and extract a list of all hosts mentioned in # it. If there are any entries in the global TARGET_HOSTS, then only hosts that # appear in both the configuration AND TARGET_HOSTS will be placed in the all_hosts # global, thus limiting which hosts the operation applies to. 
#===================================================================================
def ParseConfigurationFile(configfile):
    """Collect the set of hosts named in a HOSS configuration file.

    Lines are processed in order. For each line, '#' comments are removed and
    every previously defined alias is expanded (%NAME -> value, in definition
    order). Then:
      ALIAS: NAME value...    defines alias NAME (value is the rest of the line)
      stage: h1,h2,...        hosts are added to the global all_hosts
      distribute: h1,h2,...   hosts are added to the global all_hosts

    Host entries are whitespace-stripped, and empty entries are ignored. If the
    global TARGET_HOSTS is non-empty, only hosts that also appear in it are added.

    Args:
        configfile: path to the configuration file to read.
    """
    global all_hosts, TARGET_HOSTS
    aliases = {}

    print('Reading configuration from: ' + configfile)
    with open(configfile) as f:
        for line in f:
            if '#' in line:
                line = line[:line.find('#')]  # Remove comments

            for name, value in aliases.items():
                line = line.replace('%' + name, value)  # Replace aliases

            if line.startswith('ALIAS:'):
                vals = line.split(maxsplit=2)
                if len(vals) < 3:
                    # Previously this raised IndexError and aborted the script.
                    print('WARNING: ignoring malformed ALIAS line: ' + line.strip())
                    continue
                aliases[vals[1]] = vals[2].strip()
                continue

            if line.startswith('stage:') or line.startswith('distribute:'):
                # Strip each entry BEFORE filtering against TARGET_HOSTS so that
                # "h1, h2" matches "h2", and drop empty entries (e.g. from a
                # trailing comma) so we never try to ssh to ''.
                hosts = [x.strip() for x in line.split(':')[1].split(',')]
                hosts = [x for x in hosts if x]
                if TARGET_HOSTS:
                    hosts = [x for x in hosts if x in TARGET_HOSTS]
                all_hosts.update(hosts)

#===================================================================================
# ParseCommandLineArgs
#===================================================================================
def ParseCommandLineArgs(argv=None):
    """Parse command-line options into the module globals.

    Sets the globals mode ('ending' for -e, 'restarting' for -r; -r wins if
    both are given), VERBOSE (incremented once per -v), configfile (positional
    argument, defaulting to the current value) and TARGET_HOSTS (from -H/--hosts).

    Args:
        argv: argument list to parse; None (default) means sys.argv[1:].
    """
    global configfile, mode, VERBOSE, TARGET_HOSTS

    parser = argparse.ArgumentParser()
    parser.add_argument('-e', help='Stop the hoss worker program(s)', action='store_true')
    parser.add_argument('-r', help='Restart the hoss worker program(s)', action='store_true')
    # 'count' lets -v be repeated (-v -v / -vv); a single -v behaves as before.
    parser.add_argument('-v', help='increase verbosity level (may be repeated)', action='count', default=0)
    parser.add_argument('-H', '--hosts', help='Specify which hosts the command should be limited to')
    parser.add_argument('configfile', help='HOSS configuration file to use', nargs='?', default=configfile)

    args = parser.parse_args(argv)
    if args.e:
        mode = 'ending'
    if args.r:
        mode = 'restarting'
    VERBOSE += args.v
    configfile = args.configfile
    if args.hosts:
        # Strip whitespace ("-H a, b") and ignore empty entries so the names
        # compare equal to the host names parsed from the config file.
        TARGET_HOSTS = [h.strip() for h in args.hosts.split(',') if h.strip()]

#===================================================================================
# SendCommands
#
# Either launch or kill the hoss processes on all hosts depending on the
# value of the start_procs argument.
#===================================================================================
def SendCommands(start_procs=True, grace_period=10.0):
    """Run the HOSS start or stop command on every host in all_hosts via ssh.

    One ssh process is launched per host, in sorted host order. The remote
    command's output is redirected to /dev/null and it is backgrounded with
    '&', so each ssh is expected to return once the remote command detaches.
    This function waits until every ssh process has exited, printing each
    return code (and, when VERBOSE > 1, the captured stdout/stderr).

    Args:
        start_procs: True to start the workers ('start_hoss_worker' wrapped
            in 'hdlog'), False to stop them ('stop_hoss_worker').
        grace_period: seconds to wait quietly before printing, once per
            second, which hosts are still outstanding. There is no overall
            timeout: the function returns only when every ssh has exited.
    """
    # Form remote commands to run
    if start_procs:
        cmd = ['hdlog', '-c', '-s', '100000', '-r', '10', 'start_hoss_worker', configfile]
        func = 'starting'
    else:
        cmd = ['stop_hoss_worker']
        func = 'stopping'

    # Prepend ssh and append redirection to /dev/null; ssh joins these into
    # one string that the remote shell interprets.
    print('\nRunning remote commands:')
    procs = {}
    for h in sorted(all_hosts):
        ssh_cmd = ['ssh', h] + cmd + ['>&', '/dev/null', '&']
        print(' '.join(ssh_cmd))
        proc = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        procs[proc] = h

    print('\n%s - Waiting for remote commands to detach:' % func)

    # Poll once per second until everything has finished. Unlike the previous
    # version, this returns as soon as the last process exits instead of
    # always sleeping out the full grace period.
    start = time.time()
    while procs:
        _ReapFinished(procs, func)
        if not procs:
            break
        if time.time() - start >= grace_period:
            print(func + ' - Still waiting on: ' + ','.join(procs.values()))
        time.sleep(1)


# Helper for SendCommands: report and forget ssh processes that have exited.
def _ReapFinished(procs, func):
    """Report and remove every process in procs that has exited.

    Args:
        procs: dict mapping subprocess.Popen -> host name; modified in place.
        func:  label ('starting' / 'stopping') used to prefix messages.
    """
    for proc, host in list(procs.items()):  # copy: entries are deleted below
        if proc.poll() is None:
            continue
        print(func + ' - finished: ' + host + ' (returncode=' + str(proc.returncode) + ')')
        out, err = proc.communicate()
        if VERBOSE > 1:
            # Decode so the text itself is shown rather than a b'...' repr.
            print('stdout:\n' + out.decode(errors='replace'))
            print('stderr:\n' + err.decode(errors='replace'))
        del procs[proc]

#===================================================================================
SetEPICSvar # # This just wraps the caput call in a try except block so if there is a problem # with setting the EPICS variable, it does not cause the whole script to fail. # # This also will only try setting the EPICS variable is TARGET_HOSTS is empty. # otherwis, it assumes the command is only being applied to a portion of the # whole system so setting the system level EPICS value doesn't make sense. #=================================================================================== def SetEPICSvar( name, val): global TARGET_HOSTS if len(TARGET_HOSTS)>0: print('Skipping setting of EPICS variable ' + name + ' due to only certain hosts being targeted') return try: caput(name, val) print('Set EPICS variable "' + name + '" to: ' + str(val)) except Exception as e: print('Unable to set EPICS variable "' + name + '" to: ' + str(val)) #========================================================================================== # ---- main ---- ParseCommandLineArgs() ParseConfigurationFile( configfile ) print('Found the following hosts: \n' + '\n'.join( sorted(all_hosts) )) print('') SetEPICSvar('HD:coda:daq:hoss_error', 0) if mode == 'starting': print('--- Starting all HOSS processes ---') SetEPICSvar('HD:coda:daq:hoss_status', b'starting') SendCommands(True) SetEPICSvar('HD:coda:daq:hoss_status', b'running') elif mode == 'restarting': print('--- Restarting all HOSS processes ---') SetEPICSvar('HD:coda:daq:hoss_status', b'restarting') SendCommands(False) SendCommands(True) SetEPICSvar('HD:coda:daq:hoss_status', b'running') elif mode == 'ending': print('--- Stopping all HOSS processes ---') SetEPICSvar('HD:coda:daq:hoss_status', b'stopping') SendCommands(False) SetEPICSvar('HD:coda:daq:hoss_status', b'stopped') else: print('--- Unknown mode: ' + str(mode)) print('\nDone.') # There is a bug in the epics module where it gives a segmentation fault in # in ca.py -> clear_subscription -> libca.ca_clear_subscription(event_id). 
# This is called in the atexit handler which tries to unsubscribe PVs
# written to via the caput call. (The package keeps connections open for
# efficiency.) To prevent the printing of the "Segmentation Fault" message,
# we bypass the exit handlers with os._exit().
#
# os._exit() also skips flushing Python's stdio buffers, so flush them
# explicitly first; otherwise everything printed above can be lost when
# stdout/stderr are redirected to a file or pipe (i.e. block-buffered).
sys.stdout.flush()
sys.stderr.flush()
os._exit(0)