#!/usr/bin/env python
#
# Script to start and stop the online monitoring processes for GlueX detector
#
# Usage:
#    start_monitoring [-e] [-sSOURCE] [node1 node2 ...]
#
# If no arguments are given, then the script will examine the current COOL
# configuration to figure out the parameters needed to connect to the
# ET system for the current run. It will launch processes on all nodes
# found in the $DAQ_HOME/config/monitoring/nodes.conf file.
#
# If the "-e" option is given, then the processes are all killed
#
# If the "-sSOURCE" option is given, then the value "SOURCE" is passed
# as the event source instead of the ET system obtained from COOL. This
# can be used to specify another ET system or a file.
#
# If at least one node is specified (identified by arguments not starting
# with "-") then only nodes specified on the command line are started or
# stopped. These do not have to correspond to nodes listed in the
# nodes.conf file.
#
# Many more options exist; run with "--help" for the full list.
#

import sys
import os
import subprocess
import glob
import time

# In case PYTHONPATH not set, look for coolutils under $DAQ_HOME/tools/pymods
DAQ_HOME = os.getenv('DAQ_HOME')
if DAQ_HOME is not None:
    sys.path.append('%s/tools/pymods' % DAQ_HOME)
import coolutils

# ----- Global parameters -----
TEST_MODE = False      # -t : print remote commands instead of running them
VERBOSE = 0            # -v : verbosity level (may be given more than once)
RUN = ''               # -Rrun : run number string as given on the command line
what = 'Starting'      # 'Starting', or 'Stopping' when -e is given
primary_source = None  # event source feeding monitoring (ET, file, or 'CDAQ:')
monnodes = {}          # key=level val=[list of nodes]
monconfigs = {}        # key=level val=JANA config file
monsources = {}        # key=level val=event source (ET or file)
etnodes = []           # list of tuples with (node, [levels], ETsource)
archivenodes = []      # nodes to run RSArchiver on ("archive" lines)
timeseriesnodes = []   # nodes to run RSTimeSeries on ("timeseries" lines)
ainodes = []           # nodes to run RSAI on ("ai" lines)
aioutputdirs = {}      # key=node val=base directory RSAI writes images to (RSAI -d), after @-keyword substitution
hydranodes = []        # nodes to run start_hydra.py on ("hydra" lines)
hydramodelpaths = {}   # key=node val=path passed to start_hydra.py as -mp
hydraoutputdirs = {}   # key=node val=path passed to start_hydra.py as -ol
hydrainputdirs = {}    # key=node val=path passed to start_hydra.py as -i; RSAI also
                       #   gets <path>/$RUN_PERIOD/RunXXXXXX from it via -l
hydrajointdirs = {}    # key=node val=path passed to start_hydra.py as -jd
l3nodes = []           # nodes from "L3" lines of nodes.conf
ETpeb_conn = ''        # JANA ET source string for the PEB-created ET system (from COOL)
ETseb_conn = ''        # JANA ET source string for the SEB-created ET system (from COOL)
ETer_conn = ''         # JANA ET source string for the ER-created ET system (from COOL)

# Secondary ET system parameters (passed to et_start as -f, -p, -n, -s and
# to et2et as --chunk). ET_FILENAME may be overridden by an "ET node:file"
# line in nodes.conf.
ET_FILENAME = '/tmp/et_sys_monitoring'
ET_PORT = 11122
ET_NUMEVENTS = 50
ET_EVENTSIZE = 30000000
ET_CHUNKSIZE = 10

ET_NO_SECONDARY = False   # -noET
FILE_SOURCE_LOOP = False  # -loop
RUN_RSARCHIVER = False    # -A
RUN_RSTIMESERIES = False  # -T
RUN_RSAI = False          # -AI
RUN_HYDRA = False         # -Hydra
HYDRA_TEST_MODE = False   # -testmode
#------------------------------


#-----------------------
# Usage
#
# Print the usage statement and exit with status 0. When show_detailed is
# True, a longer explanation follows the option list.
#-----------------------
def Usage(show_detailed):
    print('')
    print('  Script to start and stop the online monitoring processes for Hall-D')
    print('')
    print('  Usage:')
    print('      start_monitoring [options] [node1 node2 ...]')
    print('')
    print('')
    print('   -h          short help (prints this usage statement)')
    print('   --help      long help (prints this plus some more details)')
    print('   -t          Test mode. Print commands that would be executed but')
    print('               don\'t actually execute them')
    print('   -v          Increase verbosity level')
    print('   -e          stop processes on remote nodes rather than start them')
    print('               (default is to start them)')
    print('   -noET       Ignore any secondary ET specified in nodes.conf file')
    print('   -sSOURCE    use "SOURCE" as the primary event source instead of the')
    print('               ET system obtained from nodes.conf or COOL.')
    print('   -loop       if SOURCE is a file, then loop continuously over it')
    print('   -Llevels    Set monitoring levels to start/stop. "levels" is a')
    print('               comma separated list. See below for examples')
    print('   -Rrun       Set the run number (passed to hdmon and used when')
    print('               starting RSArchiver, RSAI, and Hydra)')
    print('   -A          Start RSArchiver using the given run number')
    print('               (n.b. archive node must be specified in config. file)')
    print('   -T          Start RSTimeSeries')
    print('               (n.b. timeseries node must be specified in config. file)')
    print('   -AI         Start RSAI')
    print('               (n.b. ai node must be specified in config. file)')
    print('   -Hydra      Start Hydra using the given run number')
    print('               (n.b. hydra node must be specified in config. file)')
    print('   -testmode   turn on Hydra test mode (currently just passes -t to start_hydra)')
    print('')
    if not show_detailed:
        sys.exit(0)

    print('')
    print('  If no arguments are given, then the script will examine the current COOL')
    print('  configuration to figure out the parameters needed to connect to the')
    print('  ET system for the current run. It will launch processes on all nodes')
    print('  found in the $DAQ_HOME/config/monitoring/nodes.conf file. This file')
    print('  contains configuration info. including all monitoring nodes, levels,')
    print('  RSArchiver nodes, RSTimeSeries nodes, RSAI nodes, Hydra nodes, and')
    print('  secondary ET system configuration.')
    print('')
    print('  If the "-e" option is given, then the processes are all killed (i.e.')
    print('  "ended"). Note that the stop_monitoring script just runs this script')
    print('  with the "-e" option.')
    print('')
    print('  If the "-sSOURCE" option is given, then the value "SOURCE" is passed')
    print('  as the event source instead of the ET system obtained from COOL. This')
    print('  can be used to specify another ET system or a file. An ET source is')
    print('  specified using the standard JANA format. Namely:')
    print('')
    print('      ET:et_filename:station:host:port')
    print('')
    print('  One may also give an EVIO or HDDM filename. By default, if a secondary')
    print('  ET system is specified in the nodes.conf file and you do not pass "-noET"')
    print('  to this script, then the secondary ET system will be set up and events')
    print('  fed to it from a file using file2et. Note that for this case, you cannot')
    print('  use an HDDM file. To read from an HDDM file, you should specify "-noET".')
    print('  Keep in mind that if you are not using a secondary ET system and are')
    print('  reading from a file then all nodes will process the exact same events')
    print('  so some repetition will occur. Also, it is assumed that reading from')
    print('  a file is primarily done for testing so the "-loop" option may be')
    print('  specified which will in turn add that to the command to start')
    print('  file2et so that it continuously re-opens the data file to read events.')
    print('  Note that "-loop" only works when using a secondary ET.')
    print('')
    print('  If the -Llevels argument is given, then "levels" is taken to be a comma')
    print('  separated list of the monitoring levels to start. For example, "-LOCCUPANCY"')
    print('  will only start OCCUPANCY level processes. These are designated in')
    print('  the nodes.conf file. If "-LOCCUPANCY,HIGHLEVEL" is given then both')
    print('  the OCCUPANCY and HIGHLEVEL levels are started. If no "-L" option')
    print('  is given then all levels are started.')
    print('')
    print('  If at least one node is specified (identified by arguments not starting')
    print('  with "-") then only nodes specified on the command line')
    print('  are started or stopped. These do not have to correspond to nodes listed')
    print('  in the nodes.conf file. Keep in mind that all monitoring levels will')
    print('  then be started on each of these nodes which may result in multiple')
    print('  processes on each node. If you wish to specify certain nodes run only')
    print('  certain levels, you will need to either run this script multiple times')
    print('  with different parameters, or just edit the nodes.conf file.')
    print('')
    print('  The RSArchiver, RSTimeSeries, and RSAI programs may be run by specifying')
    print('  the "-A", "-T", and "-AI" flags respectively. For the RSArchiver and')
    print('  RSAI programs, the "-Rrun" option should be used to specify the run')
    print('  number. The nodes these programs run on must be specified in the')
    print('  $DAQ_HOME/config/monitoring/nodes.conf file as "archive", "timeseries",')
    print('  and "ai". For RSArchiver, the output directory and file name will be')
    print('  based on the given run number as:')
    print('')
    print('    /gluex/data/rawdata/curr/rawdata/active/$RUN_PERIOD/rawdata/RunXXXXXX/monitoring/hdmon_onlineXXXXXX.root')
    print('')
    print('  where the XXXXXX is the 6 digit zero-padded run number. (zero padding')
    print('  is added by this script so don\'t include that in -R argument.) The')
    print('  monitoring directory will be created if it does not already exist.')
    print('')
    print('  The RSAI program will write the image files to the directory:')
    print('')
    print('    AIDIR/$RUN_PERIOD/RunXXXXXX')
    print('')
    print('  where AIDIR is the third field of the "ai" line in nodes.conf.')
    print('')
    print('  Use the "-t" option to print information on what would be run and where')
    print('  without actually running anything.')
    print('')
    sys.exit(0)


#----------------------------
# mkdirpath
#
# Equivalent to "mkdir -p path" in the shell: every missing directory in
# the path (absolute or relative) is created. This is best-effort; any
# error is printed rather than raised.
#----------------------------
def mkdirpath(path):
    try:
        if not os.path.isdir(path):
            os.makedirs(path)
    except Exception as e:
        # deliberately non-fatal: the caller continues launching processes
        print(str(e))


#-----------------------
# CheckIfAlreadyRunning
#
# This will check if the specified command is already running on the
# specified node. It will return the PIDs of all processes matching
# the command. If no command is present, an empty list is returned.
#
# remote_cmd: list of command tokens; they are joined with single spaces
#             and matched against the start of the full remote command line.
#-----------------------
def CheckIfAlreadyRunning(remote_cmd, node):
    # The quotes are literal: ssh hands the joined string to the remote
    # shell, which must see the anchored pattern as one argument.
    pattern = '"^' + ' '.join(remote_cmd) + '"'
    cmd = ['ssh', node, 'pgrep', '-f', pattern]
    try:
        result = subprocess.check_output(cmd)
    except subprocess.CalledProcessError:
        # pgrep exits non-zero when no process matches
        return []
    return result.split()
#-----------------------
# RemoveNodesWhereAlreadyRunning
#
# This will search a list of nodes for a specific command to make sure
# it is not already running. If it is not, then the entry is copied to
# an output list which is returned and can be used as a list of nodes/cmds
# to run. This is done by running the pgrep command via ssh on each node.
# To make it efficient, the ssh commands are all run in parallel and the
# results gathered afterwards.
#
# cmds: list of (node, remote_cmd) tuples, remote_cmd being a list of tokens.
# Returns the subset of cmds for which pgrep reported no matching PIDs.
# Entries whose ssh check could not even be launched are also kept.
#-----------------------
def RemoveNodesWhereAlreadyRunning(cmds):
    print('- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - ')
    print('Checking for existing processes ...')

    filtered_cmds = []
    procs = []
    for (node, remote_cmd) in cmds:
        # Anchor the pattern at the start of the remote command line. The
        # quotes are literal so the remote shell passes it as one argument.
        pattern = '"^' + ' '.join(remote_cmd) + '"'
        check_cmd = ['ssh', node, 'pgrep', '-f', pattern]
        sys.stdout.write(' ')
        print(' '.join(check_cmd))
        try:
            # Note: we send stderr to a pipe here just to suppress all of
            # the "X11 connection rejected..." and such type messages. They
            # don't affect the actual program being run and just clutter the
            # output of this script with confusing messages. If VERBOSE>0
            # the stderr is printed below.
            proc = subprocess.Popen(check_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as e:
            # Popen raises OSError (not CalledProcessError) when ssh itself
            # cannot be launched. Keep the entry in the output list
            # (not sure what best thing to do here is ??)
            print('  unable to check %s: %s' % (node, str(e)))
            filtered_cmds.append((node, remote_cmd))
            continue
        procs.append((proc, (node, remote_cmd)))

    # Wait for each remote process to finish and check its output
    print('')
    for (proc, (node, remote_cmd)) in procs:
        print(' [] ' + node + ' ' + remote_cmd[0])
        (out, err) = proc.communicate()
        pids = out.split()
        if VERBOSE > 0 and len(err) > 0:
            print(err)
        if len(pids) > 0:
            print('process already running on ' + node + ' for command: "' + ' '.join(remote_cmd) + '"')
            sys.stdout.write(' pids: ')
            print(pids)
        else:
            filtered_cmds.append((node, remote_cmd))

    print(' %d of %d processes already running' % (len(cmds) - len(filtered_cmds), len(cmds)))
    return filtered_cmds


#-----------------------
# MapHostName
#
# The COOL configuration will probably list the host as something like
# "gluonraid1". However, we want the monitoring processes to connect
# via the infiniband interface which is dnsname "gluonraid1-ib". This
# function allows us to substitute host names for ones found in the COOL
# configuration. Hosts not in the table are returned unchanged.
#-----------------------
_HOST_NAME_MAP = {
    'gluonraid1': 'gluonraid1-ib',
    'gluonraid2': 'gluonraid2-ib',
    'gluonraid3': 'gluonraid3-ib',
    'gluon46': '172.19.5.46',
    'gluon47': '172.19.5.47',
    'gluon48': '172.19.5.48',
    'gluon49': '172.19.5.49',
}


def MapHostName(node):
    return _HOST_NAME_MAP.get(node, node)
#---------------------------------------------------------------------
# ReplaceKeywords
#
# This is called to replace any keywords in the given string.
# Keywords all start with "@". No check is made for invalid keywords
# so typos will be silently ignored.
#
# Valid keywords:
#
#   @RUNPERIOD  - Run Period (from the RUN_PERIOD environment variable,
#                 "RunPeriod000000-000" if it is not set)
#   @RUNNUMBER  - Run number from the global RUN (set with -Rrun),
#                 zero padded to 6 digits ("000000" if RUN is empty)
#
# instr may be a single string or a list of strings; the result has the
# same form (string in -> string out, list in -> new list out).
#---------------------------------------------------------------------
def ReplaceKeywords(instr):
    RUNNUMBER = '%06d' % (int(RUN) if RUN else 0)
    RUNPERIOD = os.getenv('RUN_PERIOD', 'RunPeriod000000-000')

    is_single = isinstance(instr, str)
    mycmd = [instr] if is_single else instr
    mycmd = [x.replace('@RUNPERIOD', RUNPERIOD).replace('@RUNNUMBER', RUNNUMBER) for x in mycmd]
    return mycmd[0] if is_single else mycmd


#-----------------------
# _ETConnectionString
#
# Build the JANA event source string "ET:<etName>:MON:<host>:<port>" for
# the ET system of the given COOL component (host passed through
# MapHostName). Returns '' if coolutils.GetETInfo() returns None or its
# result lacks any of the 'etName', 'host', 'port' entries. label is only
# used in the warning message (e.g. 'PEB').
#-----------------------
def _ETConnectionString(component, label):
    etinfo = coolutils.GetETInfo(component)
    if etinfo is None:
        return ''
    missing = [k for k in ('etName', 'host', 'port') if k not in etinfo]
    if missing:
        print('WARNING: ET info for %s %s is missing %s. Ignoring it.' % (label, component, ', '.join(missing)))
        return ''
    return 'ET:%s:MON:%s:%s' % (etinfo['etName'], MapHostName(etinfo['host']), etinfo['port'])


#-----------------------
# ReadCOOLconfiguration
#
# Validate the COOL environment (exits with -1 on any problem) and fill in
# the globals ETpeb_conn, ETseb_conn and ETer_conn from the first PEB, SEB
# and ER found in the COOL configuration.
#-----------------------
def ReadCOOLconfiguration():
    global ETpeb_conn, ETseb_conn, ETer_conn

    # Require env. vars. be set
    if coolutils.COOL_HOME is None:
        print("You must set your COOL_HOME environment variable!")
        sys.exit(-1)
    if coolutils.EXPID is None:
        print("You must set your EXPID environment variable!")
        sys.exit(-1)
    if coolutils.CONFIG is None:
        print("You must set your CODA_CONFIG environment variable!")
        sys.exit(-1)

    print('Reading COOL configuration from:')
    print(' %s' % coolutils.COOL_HOME)
    print('')

    # Check that configuration was able to be read in
    if not coolutils.configDataValid:
        print('Unable to read COOL configuration.')
        print('make sure COOL_HOME, EXPID')
        print('environment variables are set properly.')
        print('You may also need to set CODA_CONFIG if')
        print('the configuration can\'t be extracted from')
        print('COOL_HOME.')
        sys.exit(-1)

    #-----------------------------------------------
    # Make sure we don't have more than one PEB, SEB or ER
    if len(coolutils.pebs) + len(coolutils.sebs) > 1:
        print('More than 1 PEB/SEB found!')
        sys.stdout.write('PEBs ')
        print(coolutils.pebs)
        sys.stdout.write('SEBs ')
        print(coolutils.sebs)
        if len(coolutils.pebs) > 0:
            print('Will use: %s' % coolutils.pebs[0])
        else:
            print('Will use: %s' % coolutils.sebs[0])

    #-----------------------------------------------
    # ET systems created by PEB, SEB and ER (first of each)
    if len(coolutils.pebs) > 0:
        ETpeb_conn = _ETConnectionString(coolutils.pebs[0], 'PEB')
    if len(coolutils.sebs) > 0:
        ETseb_conn = _ETConnectionString(coolutils.sebs[0], 'SEB')
    if len(coolutils.ers) > 0:
        ETer_conn = _ETConnectionString(coolutils.ers[0], 'ER')

    print(' EXPID = %s' % coolutils.EXPID)
    print(' SESSION = %s' % coolutils.SESSION)
    print(' CONFIG = %s' % coolutils.CONFIG)
    print('')
    print(' ET systems:')
    if len(ETpeb_conn) > 0:
        print('   PEB [%s]' % ETpeb_conn)
    if len(ETseb_conn) > 0:
        print('   SEB [%s]' % ETseb_conn)
    if len(ETer_conn) > 0:
        print('   ER  [%s]' % ETer_conn)
    print('')


#-----------------------
# ReadNodesConfiguration
#
# Parse $DAQ_HOME/config/monitoring/nodes.conf and fill in the node
# globals. Each non-comment line is "<type> <node> [params...]" where type
# is one of mon, archive, timeseries, ai, hydra, ET, L3. Lines starting
# with "#", blank lines and unknown types are ignored. Exits with -1 if the
# file is missing or a known line type has too few fields.
#-----------------------
def ReadNodesConfiguration():
    global monnodes, archivenodes, timeseriesnodes, ainodes, hydranodes, l3nodes, etnodes, ET_FILENAME
    global aioutputdirs, hydramodelpaths, hydraoutputdirs, hydrainputdirs, hydrajointdirs

    fname = '%s/config/monitoring/nodes.conf' % DAQ_HOME
    if not os.path.exists(fname):
        print('')
        print('The file "%s" does not exist!' % fname)
        print('This is required so we know which nodes to run on!')
        print('')
        print('File format should have one line for each node. Each line')
        print('should contain a process type, and node name followed by')
        print('any other parameters that may required by the process type.')
        print('e.g.')
        print('   mon gluon104 1')
        print('   mon gluon105 1')
        print('   mon gluon106 2')
        print('   ET  gluon48  1,2')
        print('')
        print('   archive gluon111')
        print('')
        print('   L3 gluon100')
        print('   L3 gluon101')
        print('')
        sys.exit(-1)

    print('Reading monitoring node assignments from:')
    print(' %s' % fname)
    print('')

    # Minimum number of whitespace-separated fields for each line type
    min_fields = {'mon': 3, 'archive': 2, 'timeseries': 2, 'ai': 2, 'hydra': 2, 'ET': 3, 'L3': 2}

    with open(fname, 'r') as conf_file:
        for line in conf_file:
            if line.startswith('#'):
                continue  # skip comment lines
            vals = line.split()
            if len(vals) == 0:
                continue  # skip empty lines

            kind = vals[0]
            needed = min_fields.get(kind)
            if needed is None:
                continue  # unknown line types are ignored

            if len(vals) < needed:
                print('ERROR: Bad format in config. file. The following line should')
                print('       have at least %d values but it has %d' % (needed, len(vals)))
                print(line.rstrip('\n'))
                sys.exit(-1)

            node = vals[1]
            if kind == 'mon':
                # comma separated list of levels this node runs
                for level in vals[2].split(','):
                    monnodes.setdefault(level, []).append(node)
            elif kind == 'archive':
                archivenodes.append(node)
            elif kind == 'timeseries':
                timeseriesnodes.append(node)
            elif kind == 'ai':
                ainodes.append(node)
                if len(vals) > 2:
                    aioutputdirs[node] = ReplaceKeywords(vals[2])
            elif kind == 'hydra':
                hydranodes.append(node)
                if len(vals) > 2:
                    hydramodelpaths[node] = vals[2]
                if len(vals) > 3:
                    hydraoutputdirs[node] = vals[3]
                if len(vals) > 4:
                    hydrainputdirs[node] = vals[4]
                if len(vals) > 5:
                    hydrajointdirs[node] = vals[5]
            elif kind == 'ET':
                # "node:filename" overrides the global ET system file name.
                # Split only on the first ':' so the file name may contain ':'.
                if ':' in node:
                    (node, ET_FILENAME) = node.split(':', 1)
                levels = vals[2].split(',')
                sourceET = vals[3] if len(vals) > 3 else None
                etnodes.append((node, levels, sourceET))
            elif kind == 'L3':
                l3nodes.append(node)


#-----------------------
# SendRemoteCommand
#
# Start cmd (a list of command tokens) on the given node via ssh, wrapped
# in the hdlog command on the remote computer to log the output. hdmon
# processes are additionally told to respawn up to 10 times. The full ssh
# command is always printed; it is only executed when TEST_MODE is False.
# The process is launched with Popen and not waited on.
#-----------------------
def SendRemoteCommand(node, cmd):
    # NOTE: SendRemoteKillCommand reproduces these hdlog options exactly so
    # that its pkill pattern matches; keep the two in sync.
    if cmd[0] == 'hdmon':
        hdlog_opts = ['-c', '-r', '10', '-s', '100000']  # respawn hdmon up to 10 times
    else:
        hdlog_opts = ['-c', '-s', '100000']

    # ">& /dev/null" is evaluated by the remote (csh-style) shell
    ssh_cmd = ['ssh', node, 'hdlog'] + hdlog_opts + cmd + ['>&', '/dev/null']

    print('   ' + ' '.join(ssh_cmd))

    if not TEST_MODE:
        subprocess.Popen(ssh_cmd)
#-----------------------
# SendRemoteKillCommand
#
# Send a remote pkill command to kill the specified command on the specified
# node. The value of "sig" should be something like "-HUP" or "-KILL". Note
# that "cmd" should be a list containing the command and arguments. The
# entire list is used to form the command string to match. If you only wish
# to match say, the command and first two arguments, call it like this:
#
#   SendRemoteKillCommand(node, cmd[:2], '-KILL')
#
# pkill_opts: extra pkill options appended to both pkill commands (e.g.
#             ['-n']). Nothing is executed when TEST_MODE is True.
#-----------------------
def SendRemoteKillCommand(node, cmd, sig='-HUP', pkill_opts=None):
    if pkill_opts is None:
        pkill_opts = []

    # pkill command matching the start of the actual process command line
    ssh_cmd = ['ssh', node, 'pkill', sig, '-f', '"^%s"' % ' '.join(cmd)] + list(pkill_opts)

    # Explicitly kill the hdlog command as well so that it does not respawn the
    # actual command. In principle, this should be handled in hdlog via the
    # way the program exits. In practice, it is not and the process gets
    # respawned after we kill it here.
    # Another annoyance is that the pkill command regular expression does not
    # handle wildcards in the expected way when trying to match commands that
    # start with "hdlog" and have something like "hdmon" in them. It seems
    # we are relegated to providing *exactly* the start of the command which
    # means we have to reproduce the hdlog options from SendRemoteCommand
    # here. Sigh.
    if cmd[0] == 'hdmon':
        hdlog_cmd = ['hdlog', '-c', '-r', '10', '-s', '100000'] + cmd
    else:
        hdlog_cmd = ['hdlog', '-c', '-s', '100000'] + cmd
    ssh_hdlog_cmd = ['ssh', node, 'pkill', sig, '-f', '"^%s"' % ' '.join(hdlog_cmd)] + list(pkill_opts)

    print(' '.join(ssh_hdlog_cmd))
    print(' '.join(ssh_cmd))

    # optionally execute ssh pkill commands (hdlog wrapper first)
    if not TEST_MODE:
        subprocess.Popen(ssh_hdlog_cmd)
        subprocess.Popen(ssh_cmd)


#-----------------------
# ParseCommandLineArguments
#
# Parse sys.argv, read nodes.conf and the COOL configuration, and fill in
# the global parameters (monnodes, monconfigs, monsources, primary_source,
# etnodes, run flags, ...). Exits with -1 on unknown options or any
# configuration error; -h/--help print usage and exit with 0.
#-----------------------
def ParseCommandLineArguments():
    global TEST_MODE, RUN, ET_NO_SECONDARY, etnodes, monnodes, primary_source
    global what, VERBOSE, RUN_RSARCHIVER, RUN_RSTIMESERIES, RUN_RSAI
    global FILE_SOURCE_LOOP, RUN_HYDRA, HYDRA_TEST_MODE

    # Parse the command line arguments leaving results in global parameters
    user_specified_source = None
    user_specified_levels = []
    user_specified_nodes = []
    for arg in sys.argv[1:]:
        if arg == '-h':
            Usage(False)  # short usage statement
        elif arg == '--help':
            Usage(True)  # detailed usage statement
        elif arg == '-t':
            TEST_MODE = True
        elif arg == '-v':
            VERBOSE += 1
        elif arg == '-e':
            what = 'Stopping'
        elif arg == '-noET':
            ET_NO_SECONDARY = True
        elif arg == '-loop':
            FILE_SOURCE_LOOP = True
        elif arg == '-A':
            RUN_RSARCHIVER = True
        elif arg == '-T':
            RUN_RSTIMESERIES = True
        elif arg == '-AI':
            RUN_RSAI = True
        elif arg == '-Hydra':
            RUN_HYDRA = True
        elif arg == '-testmode':
            HYDRA_TEST_MODE = True
        elif arg.startswith('-R'):
            RUN = arg[2:]
        elif arg.startswith('-s'):
            user_specified_source = arg[2:]
        elif arg.startswith('-L'):
            user_specified_levels = arg[2:].split(',')
        elif not arg.startswith('-'):
            user_specified_nodes.append(arg)
        else:
            print('Unknown option "' + arg + '"!')
            sys.exit(-1)

    # The run number is converted with int() in several places below;
    # reject a bad value here instead of failing later with a traceback.
    if RUN != '':
        try:
            int(RUN)
        except ValueError:
            print('Bad run number "%s" given with -R!' % RUN)
            sys.exit(-1)

    # Read in nodes configuration from $DAQ_HOME/config/monitoring/nodes.conf
    ReadNodesConfiguration()

    # Read in COOL configuration
    ReadCOOLconfiguration()

    # Check auxiliary programs
    if RUN_RSARCHIVER:
        if RUN == '':
            print('WARNING: -A given but -Rrun not. RSArchiver will not be run')
        if len(archivenodes) == 0:
            print('WARNING: -A given but no nodes specified in config. file. RSArchiver will not be run')
    if RUN_RSTIMESERIES:
        if len(timeseriesnodes) == 0:
            print('WARNING: -T given but no nodes specified in config. file. RSTimeSeries will not be run')
    if RUN_RSAI:
        if RUN == '':
            print('WARNING: -AI given but -Rrun not. RSAI will not be run')
        if len(ainodes) == 0:
            print('WARNING: -AI given but no nodes specified in config. file. RSAI will not be run')
    if RUN_HYDRA:
        if RUN == '':
            print('WARNING: -Hydra given but -Rrun not. Hydra will not be run')
        if len(hydranodes) == 0:
            print('WARNING: -Hydra given but no nodes specified in config. file. Hydra will not be run')
        if RUN != '' and len(hydranodes) > 0 and not RUN_RSAI:
            print('WARNING: -Hydra given, but RSAI is not being run. You may not get the results you\'re looking for!')

    # List of levels
    if len(user_specified_levels) > 0:
        monlevels = user_specified_levels  # use command-line levels if specified
        for level in list(monnodes.keys()):
            if level not in monlevels:
                del monnodes[level]
    else:
        monlevels = list(monnodes.keys())  # use nodes.conf levels otherwise

    # List of nodes
    # if user specified which nodes to use, then assign all levels
    # to all of them. Otherwise, just leave what's been set from nodes.conf
    if len(user_specified_nodes) > 0:
        monnodes = {}
        for level in monlevels:
            monnodes[level] = user_specified_nodes

    # This is used to flag any errors so all errors may be reported before exiting
    all_good = True

    # Make sure a JANA config file exists for every level we're going to start
    for level in monnodes.keys():
        config_filename = '%s/config/monitoring/hdmon%s.conf' % (DAQ_HOME, level)
        if os.path.exists(config_filename):
            monconfigs[level] = config_filename
        else:
            print('Level "%s" config. file doesn\'t exist! (%s)' % (level, config_filename))
            all_good = False

    # Determine the primary source of events. This will either
    # feed the monitoring nodes directly or it will feed them
    # through one or more secondary ET systems
    if len(ETer_conn) > 0:
        primary_source = ETer_conn
    elif len(ETpeb_conn) > 0:
        primary_source = ETpeb_conn
    elif len(ETseb_conn) > 0:
        primary_source = ETseb_conn
    if user_specified_source is not None:
        primary_source = user_specified_source
    if primary_source is None:
        print('WARN: primary event source not specified! Source was not specified')
        print('      on the command line and I could not find one in the COOL configuration!\n')
        print('      Assuming CDAQ is running. Will try to feed monitoring from file.')
        primary_source = 'CDAQ:'

    # Determine the sources that the monitoring nodes will use. Each
    # level will have a source specified. This can be (in order of priority):
    #
    #  1. user-specified on command line (could be file)
    #  2. secondary ET system specified in nodes.conf
    #  3. CODA ET system determined from COOL configuration
    #     a. ER created
    #     b. PEB created
    #     c. SEB created
    if ET_NO_SECONDARY:
        etnodes = []  # allow user to suppress use of secondary ET systems
    for (node, levels, sourceET) in etnodes:
        for level in levels:
            host = MapHostName(node)
            source = 'ET:%s:MON_%s:%s:%d' % (ET_FILENAME, level, host, ET_PORT)
            if sourceET is not None:
                source = sourceET  # use source explicitly set in nodes.conf if it was set
            monsources[level] = source

    # Use primary source as the source for any levels that are not
    # already set (i.e. weren't set by ET line in nodes.conf)
    # If primary_source was generated from COOL configuration, modify the
    # default station name "MON" to include the level.
    for level in monlevels:
        if primary_source:
            source = primary_source
            if user_specified_source is None:
                source = source.replace(':MON:', ':MON_%s:' % level)
            if level not in monsources:
                monsources[level] = source
        else:
            print('primary_source not defined!')
            all_good = False

    # If errors were detected, exit now
    if not all_good:
        print('\nRe-run with "--help" for full usage statement\n')
        sys.exit(-1)


#-----------------------------------------------
# :::::::::: Main script starts here ::::::::::

print('')
print('----------------- Hall-D Online Monitoring Utility -------------')

# Parse command line and read in configurations from various files
ParseCommandLineArguments()

# Tell user what we're going to do
print('- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - ')
print('nodes :')
for level, nodes in monnodes.items():
    print('  %2d monitoring nodes for level "%s":' % (len(nodes), level))
    sys.stdout.write('    ')
    print(nodes)
print('')
print('Secondary ET systems (%d):' % len(etnodes))
for (node, levels, ETsource) in etnodes:
    sys.stdout.write('  %s  levels: ' % node)
    print(levels)
print('')
print('archiver nodes (%d):' % len(archivenodes))
sys.stdout.write('  ')
print(archivenodes)
print('')
print('timeseries nodes (%d):' % len(timeseriesnodes))
sys.stdout.write('  ')
print(timeseriesnodes)
print('')
print('ai nodes (%d):' % len(ainodes))
sys.stdout.write('  ')
print(ainodes)
print('')
print('hydra nodes (%d):' % len(hydranodes))
sys.stdout.write('  ')
print(hydranodes)
print('')

# Output directories are only created when starting processes; when
# stopping, the commands are built only so they can be matched by pkill.
make_dirs = (what == 'Starting')
run_period = os.getenv('RUN_PERIOD', 'RunPeriod-default')

# Form commands to be run (or killed) on remote nodes
cmds = []

#--- Hydra (takes longest to start up so do it first)
if RUN_HYDRA and RUN != '':
    hydra_install = os.getenv('HYDRA_INSTALL', '/group/halld/hydra')
    for node in hydranodes:
        cmd = ['/gluex/etc/crontabs/tcsh_hydra_env', 'python3', '/group/halld/hydra/utils/start_hydra.py']
        cmd += ['-R', run_period]
        cmd += ['-r', str(int(RUN))]
        cmd += ['--config', hydra_install + '/Hydra.cfg']
        if node in hydramodelpaths:
            cmd += ['-mp', hydramodelpaths[node]]
        if node in hydraoutputdirs:
            cmd += ['-ol', hydraoutputdirs[node]]
        if node in hydrainputdirs:
            cmd += ['-i', hydrainputdirs[node]]
        if node in hydrajointdirs:
            cmd += ['-jd', hydrajointdirs[node]]
        if HYDRA_TEST_MODE:
            cmd += ['-t']
        cmds += [(node, cmd)]

        # hydra manages its own subprocesses so we must tell the master script
        # to kill them: run just "<env wrapper> python3 start_hydra.py -e"
        if what == 'Stopping':
            stop_cmd = ['ssh', node] + cmd[:3] + ['-e']
            print('Telling hydra to kill subprocesses: ' + ' '.join(stop_cmd))
            if not TEST_MODE:
                subprocess.call(stop_cmd)

#--- et_start, et2et
for (node, levels, ETsource) in etnodes:
    cmd = ['et_start']
    cmd.extend(['-f', ET_FILENAME, '-n', str(ET_NUMEVENTS), '-s', str(ET_EVENTSIZE), '-p', str(ET_PORT)])
    cmds += [(node, cmd)]

    # Program that feeds the secondary ET from the primary source
    if primary_source.startswith('ET:'):
        cmd = ['et2et']
        cmd.extend([primary_source, 'ET:%s::%s:%s' % (ET_FILENAME, node, ET_PORT), '--chunk', str(ET_CHUNKSIZE)])
    elif primary_source.startswith('CDAQ:'):
        cmd = ['start_cdaq2mon']
    else:
        cmd = ['file2et']
        if FILE_SOURCE_LOOP:
            cmd.extend(['-loop'])
        cmd.extend(['-n', '3', '-f', ET_FILENAME, primary_source])
    cmds += [(node, cmd)]

#--- hdmon
for level, nodes in monnodes.items():
    cmd = ['hdmon']
    cmd.append('--config=%s' % monconfigs[level])
    if RUN != '':
        cmd.append('-PRUNNUMBER=%d' % int(RUN))
    cmd.append(monsources[level])
    for node in nodes:
        cmds += [(node, cmd)]

#--- RSArchiver
if RUN_RSARCHIVER and RUN != '':
    archivedir = '/gluex/data/rawdata/curr/rawdata/active/%s/rawdata/Run%06d/monitoring' % (run_period, int(RUN))
    archivefile = '%s/hdmon_online%06d.root' % (archivedir, int(RUN))
    if make_dirs:
        print('mkdirpath %s' % archivedir)
        if not TEST_MODE:
            mkdirpath(archivedir)
    for node in archivenodes:
        cmd = ['RSArchiver', '-B', '-F', archivefile]
        cmds += [(node, cmd)]

#--- RSTimeSeries
if RUN_RSTIMESERIES:
    for node in timeseriesnodes:
        cmd = ['RSTimeSeries']
        cmds += [(node, cmd)]

#--- RSAI
if RUN_RSAI and RUN != '':
    for node in ainodes:
        if node not in aioutputdirs:
            print('WARNING: no output directory given for ai node %s in nodes.conf. RSAI will not be run there' % node)
            continue
        aidir = aioutputdirs[node] + '/%s/Run%06d' % (run_period, int(RUN))
        if make_dirs:
            print('mkdirpath %s' % aidir)
            if not TEST_MODE:
                mkdirpath(aidir)
        cmd = ['RSAI', '-d', aidir]

        # The hydrainputdirs comes from the hydra configuration line which
        # may specify a different node than where RSAI is run. Thus, just
        # look for any hydrainputdirs for any node and add symlinks to those.
        for n, hydrainputdir in hydrainputdirs.items():
            mydir = hydrainputdir + '/%s/Run%06d' % (run_period, int(RUN))
            if make_dirs and not os.path.exists(mydir):
                print('mkdirpath %s' % mydir)
                if not TEST_MODE:
                    mkdirpath(mydir)
            cmd += ['-l', mydir]
        cmds += [(node, cmd)]
# Check if remote processes are already running, but only if
# we're trying to start them. Remove any from the list of
# commands that already appear to be running.
if what == 'Starting':
    cmds = RemoveNodesWhereAlreadyRunning(cmds)

print('- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - ')
print('Sending remote commands:')

if what == 'Stopping':
    # Escalate HUP -> INT -> KILL, pausing between steps so processes can
    # exit cleanly. Nothing is actually signalled in test mode, so there is
    # nothing to wait for.
    print('Sending HUP signal to all remote processes ...')
    for node, cmd in cmds:
        # -n option tells pkill to only kill newest (leaving hdlog and tcsh processes alone)
        SendRemoteKillCommand(node, cmd, '-HUP', ['-n'])
    if not TEST_MODE:
        time.sleep(6)

    print('Sending INT signal to all remote processes ...')
    for node, cmd in cmds:
        SendRemoteKillCommand(node, cmd, '-INT', ['-n'])
    if not TEST_MODE:
        time.sleep(6)

    print('Sending KILL signal to all remote processes ...')
    for node, cmd in cmds:
        SendRemoteKillCommand(node, cmd, '-KILL')

    print('Sending general kill command to clear out defunct processes')
    for node, cmd in cmds:
        SendRemoteKillCommand(node, [cmd[0]], '-KILL')

    print('Removing ET system files')
    for (node, levels, ETsource) in etnodes:
        SendRemoteCommand(node, ['rm', '-f', ET_FILENAME])
else:
    # Starting
    last_cmd = ''
    for node, cmd in cmds:
        # give ET processes 5 seconds to start up before anything attaches
        # (skipped in test mode since nothing was started)
        if last_cmd.startswith('et_start') and not TEST_MODE:
            time.sleep(5)
        last_cmd = cmd[0]
        SendRemoteCommand(node, cmd)

print('- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - ')
if TEST_MODE:
    print('TEST MODE: No commands were actually run on remote nodes (other than checking for existing processes)')