#!/usr/bin/env python
#
# Submit jobs to run at IU BigRed3 via swif2
#
# See more detailed documentation here:
# https://halldweb.jlab.org/wiki/index.php/HOWTO_Execute_a_Launch_using_IU
#
# This will run commands submitting several recon jobs
# with the run/file numbers hardcoded into this script.
# Here is how this is supposed to work:
#
#   launch_iu.py
#     |
#     |-> swif2 add-job   (this will eventually run sbatch at BigRed3 ...)
#           |
#           |-> run_singularity_iu.sh
#                 |
#                 |-> singularity
#                       |
#                       |-> script_iu.sh   (this is run from inside singularity)
#                             |
#                             |-> hd_root
#
# This will run swif2 with *some* of the SBATCH (slurm) options
# passed via command line. This includes the singularity image
# that should be used. swif2 will then take care of getting
# the file from tape and transferring it to IU. Once
# the file is there, it will submit the job to BigRed3 slurm.
#
# When the job wakes up, it will be in a subdirectory of the
# storage disk that swif2 has already set up.
# This directory will contain a symbolic link pointing
# to the raw data file which is somewhere else on the disk.
#
# The container will run the /launch/script_iu.sh script
# where /launch has been mounted in the container from the
# "launch" directory in the project directory. The jana
# config file is also kept in the launch directory.
#
# The container will also have ${HOME}/gluex/group/halld mounted.
# BigRed3 does not support cvmfs so the appropriate directories
# are rsync'd from JLab manually. This makes the GlueX software
# available. The script_iu.sh script will use this to set up the
# environment and then run hd_root using the
# /launch/jana_recon_nersc.config file.
#
# A couple more notes:
#
# 1. The CCDB and RCDB used come from sqlite files in
#    ${HOME}/gluex/group. The ccdb.sqlite file is copied to
#    the RAM disk /dev/shm for use. The rcdb.sqlite file is
#    copied to /tmp. Both are removed at the end of the job.
#    The timestamp used is hardcoded in:
#    /launch/jana_recon_nersc.config
#
# 2. SLURM requires that the program being run is actually
#    a script that starts with #!/XXX/YYY . It is actually a
#    SLURM script where additional SLURM options could be set.
#    We do not put them there though. All SLURM options are
#    passed via the sbatch command swif2 runs and that we specify
#    here. The run_singularity_iu.sh script is trivial and only
#    runs singularity, passing along any arguments we give it
#    here in the swif2 command.
#
# 3. The output directory is created by this script
#    to allow group writing since the files are copied using
#    the davidl account on globus but swif2 is being run from
#    one of the gxproj accounts.
#

import subprocess
import math
import glob
import sys
import os

# mysql.connector not available via system and must come via PYTHONPATH
if not os.getenv('PYTHONPATH'):
    sys.path.append('/group/halld/Software/builds/Linux_CentOS7-x86_64-gcc4.8.5/ccdb/ccdb_1.06.06/python')
import mysql.connector

#TESTMODE = True    # True=only print commands, but don't actually submit jobs
TESTMODE  = False   # True=only print commands, but don't actually submit jobs
VERBOSE   = 3       # 1 is default

RUNPERIOD  = '2019-11'
LAUNCHTYPE = 'recon'   # 'offmon' or 'recon'
VER        = '01'
BATCH      = 'BATCHNB'
WORKFLOW   = LAUNCHTYPE+'_'+RUNPERIOD+'_ver'+VER+'_catchup_batch'+BATCH+'_IU'
NAME       = 'GLUEX_' + LAUNCHTYPE

#RCDB_QUERY = '@is_2018production and @status_approved'   # Comment out for all runs in range MINRUN-MAXRUN
RCDB_QUERY = '@is_dirc_production and @status_approved'   # Comment out for all runs in range MINRUN-MAXRUN
RUNS       = []    # List of runs to process. If empty, MINRUN-MAXRUN are searched in RCDB
MINRUN     = RUNNB
MAXRUN     = RUNNB
MINFILENO  = FILENB    # Min file number to process for each run (n.b. file numbers start at 0!)
MAXFILENO  = FILENB    # Max file number to process for each run (n.b. file numbers start at 0!)
FILE_FRACTION = 1.0    # Fraction of files to process for each run in specified range (see GetFileNumbersToProcess)
MAX_CONCURRENT_JOBS = '400'   # Maximum number of jobs swif2 will have in flight at once
EXCLUDE_RUNS = []      # Runs that should be excluded from processing
TIMELIMIT  = '6:00:00' # Wall-time limit per job (based on scaling from last year's Cori II time limits)
QOS        = 'general' # BigRed3 partition ('general' or 'debug')

LAUNCHDIR    = '/N/u/hpg/BigRed3/gluex/work/DATEDIR.recon_2019-11_ver01_catchup_batchBATCHNB'+'/launch'  # will get mapped to /launch in singularity container
IMAGE        = '/N/u/hpg/BigRed3/gluex/group/halld/www/halldweb/html/dist/gluex_docker_devel.simg'
RECONVERSION = 'halld_recon/halld_recon-4.19.0'  # must exist in /group/halld/Software/builds/Linux_CentOS7-x86_64-gcc4.8.5-cntr
SCRIPTFILE   = '/launch/script_iu.sh'
CONFIG       = '/launch/jana_'+LAUNCHTYPE+'_nersc_BATCHNB.config'  # yes, we use the same config file as NERSC

RCDB_HOST = 'hallddb.jlab.org'
RCDB_USER = 'rcdb'
RCDB      = None

BAD_RCDB_QUERY_RUNS = []  # will be filled with runs that are missing the evio_files_count field in the RCDB query
BAD_FILE_COUNT_RUNS = []  # will be filled with runs where the number of evio files could not be obtained by any method
BAD_MSS_FILE_RUNS   = {}  # will be filled with runs/files where the stub file in /mss is missing

# Set output directory depending on launch type
if LAUNCHTYPE=='offmon':
    OUTPUTTOP = 'mss:/mss/halld/halld-scratch/offline_monitoring/RunPeriod-'+RUNPERIOD+'/ver'+VER  # prefix with mss: for tape or file: for filesystem
elif LAUNCHTYPE=='recon':
    OUTPUTTOP = 'mss:/mss/halld/RunPeriod-'+RUNPERIOD+'/recon/ver'+VER
else:
    print 'Unknown launch type "'+LAUNCHTYPE+'"! Don\'t know where to put output files!'
    sys.exit(-1)

#----------------------------------------------------
def MakeJob(RUN,FILE):

    global NUM, DIRS_CREATED

    JOB_STR  = '%s_%06d_%03d' % (NAME, RUN, FILE)
    EVIOFILE = 'hd_rawdata_%06d_%03d.evio' % (RUN, FILE)
    MSSFILE  = '/mss/halld/RunPeriod-%s/rawdata/Run%06d/%s' % (RUNPERIOD, RUN, EVIOFILE)

    # Verify stub file exists before submitting job
    if not os.path.exists( MSSFILE ):
        if RUN not in BAD_MSS_FILE_RUNS.keys(): BAD_MSS_FILE_RUNS[RUN] = []
        BAD_MSS_FILE_RUNS[RUN].append(FILE)
        return

    NUM['files_submitted'] += 1

    # The OUTPUTDIR variable is a fully qualified path used to pre-create
    # the output directories for the job files. If the files are going to
    # /mss, then we must replace the mss:/mss part at the beginning with
    # /lustre/expphy/cache. Otherwise, just use the path as given in OUTPUTTOP
    OUTPUTDIR = OUTPUTTOP.split(':',1)[1]  # just directory part
    if OUTPUTTOP.startswith('mss:/mss'): OUTPUTDIR = OUTPUTDIR.replace('/mss','/lustre/expphy/cache')

    # Get list of output file names and mappings to final directory and file name.
    # The outfiles variable is a map of local file(key) to output file with path(value)
    # The path is relative to the OUTPUTDIR directory.
    if LAUNCHTYPE == 'recon':
        outfiles = ReconOutFiles(RUN, FILE)
    elif LAUNCHTYPE == 'offmon':
        outfiles = OffmonOutFiles(RUN, FILE)
    else:
        print 'Unknown launch type (' + LAUNCHTYPE + ')! Unable to form output file list'
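
    # For illustration (run/file numbers hypothetical): a recon job for run 71350,
    # file 3 gets entries like
    #     outfiles['hd_root.root'] = 'hists/071350/hd_root_071350_003.root'
    # i.e. local output file -> destination path relative to OUTPUTDIR. Keys
    # prefixed with 'match:' are wildcard patterns whose value is just the
    # destination directory; keys prefixed with 'maybe:' are passed to swif2 as
    # optional outputs (presumably they may not be produced for every run/file).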
    # Get list of output directories so we can pre-create them with proper
    # permissions. Normally, we wouldn't have to make the directories, but if using
    # a Globus account with a different user than the one running swif2,
    # there will be permissions errors otherwise.
    outdirs = []
    for (infile, outpath) in outfiles.iteritems():
        if infile.startswith('match:'):
            # If using wildcards, the outpath already is the output directory
            outdirs.append(outpath)
        else:
            outdirs.append(os.path.dirname(outpath))

    # Pare down list of outdirs to only those that don't already exist
    new_outdirs = [x for x in outdirs if x not in DIRS_CREATED]

    # Set umask to make directories group writable (but not world writable)
    os.umask(0002)

    # Create output directories at JLab
    for d in new_outdirs:
        mydir = OUTPUTDIR + '/' + d
        if not os.path.exists(mydir) :
            if VERBOSE > 1: print('mkdir -p ' + mydir)
            if not TESTMODE: os.makedirs(mydir)
            DIRS_CREATED.append(mydir)

    # SLURM options
    SBATCH  = ['-sbatch']
    SBATCH += ['-N', '1']                # Number of nodes requested (per job)
    SBATCH += ['--tasks-per-node=48']    # Number of cores allocated per node
    SBATCH += ['-t', '%s' % TIMELIMIT]   # Amount of wall time requested
    SBATCH += ['-p', QOS]                # BigRed3 partition

    # Command for job to run
    CMD  = ['%s/run_singularity_iu.sh' % LAUNCHDIR]
    CMD += [LAUNCHDIR]      # arg 1: full path to "launch" dir (at BigRed3)
    CMD += [IMAGE]          # arg 2: full path to singularity image (at BigRed3)
    CMD += [SCRIPTFILE]     # arg 3: script to run in singularity (e.g. script_iu.sh)
    CMD += [CONFIG]         # arg 4: JANA config file (relative to LAUNCHDIR)
    CMD += [RECONVERSION]   # arg 5: halld_recon version
    CMD += [str(RUN)]       # arg 6: run  <--+ run and file number used to name job_info
    CMD += [str(FILE)]      # arg 7: file <--+ directory only.

    # Make swif2 command
    SWIF2_CMD  = ['swif2']
    SWIF2_CMD += ['add-job']
    SWIF2_CMD += ['-workflow', WORKFLOW]
    SWIF2_CMD += ['-name', JOB_STR]
    SWIF2_CMD += ['-input', EVIOFILE, 'mss:'+MSSFILE]
    for src,dest in outfiles.iteritems(): SWIF2_CMD += ['-output', src, OUTPUTTOP + '/' + dest]
    SWIF2_CMD += SBATCH + ['::'] + CMD

    # Print commands
    if VERBOSE > 1:
        if VERBOSE > 2 : print ' '.join(SWIF2_CMD)
        elif VERBOSE > 1 : print ' --- Job will be created for run:' + str(RUN) + ' file:' + str(FILE)

    NUM['jobs_to_process'] += 1
    if not TESTMODE:
        subprocess.check_call(SWIF2_CMD)
        NUM['jobs_submitted'] += 1
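
# For reference, the SWIF2_CMD list assembled above corresponds to a command of
# roughly this shape (all values below are illustrative only; the actual run/file
# numbers, batch number and output list come from the configuration above):
#
#   swif2 add-job -workflow recon_2019-11_ver01_catchup_batchNN_IU \
#       -name GLUEX_recon_071350_003 \
#       -input hd_rawdata_071350_003.evio mss:/mss/halld/RunPeriod-2019-11/rawdata/Run071350/hd_rawdata_071350_003.evio \
#       -output hd_root.root mss:/mss/halld/RunPeriod-2019-11/recon/ver01/hists/071350/hd_root_071350_003.root \
#       ... (one -output per entry returned by ReconOutFiles/OffmonOutFiles) \
#       -sbatch -N 1 --tasks-per-node=48 -t 6:00:00 -p general \
#       :: <LAUNCHDIR>/run_singularity_iu.sh <LAUNCHDIR> <IMAGE> /launch/script_iu.sh \
#          /launch/jana_recon_nersc_NN.config halld_recon/halld_recon-4.19.0 71350 3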

#----------------------------------------------------
def OffmonOutFiles(RUN, FILE):

    # WARNING: Additions here need to also be added to script_nersc_aggregator.sh !

    # Return list of output directory/filename mappings for an
    # offline monitoring job.
    #
    # Map of local file(key) to output file(value)
    RFSTR = '%06d_%03d' % (RUN, FILE)
    outfiles = {}
    outfiles['job_info_%s.tgz' % RFSTR                   ] = 'job_info/%06d/job_info_%s.tgz' % (RUN, RFSTR)
    outfiles['match:converted_random*.hddm'              ] = 'converted_random/%06d' % (RUN)
    outfiles['dana_rest.hddm'                            ] = 'REST/%06d/dana_rest_%s.hddm' % (RUN, RFSTR)
    outfiles['hd_root.root'                              ] = 'hists/%06d/hd_root_%s.root' % (RUN, RFSTR)
    #outfiles['tree_bcal_hadronic_eff.root'              ] = 'tree_bcal_hadronic_eff/%06d/tree_bcal_hadronic_eff_%s.root' % (RUN, RFSTR)
    #outfiles['tree_fcal_hadronic_eff.root'              ] = 'tree_fcal_hadronic_eff/%06d/tree_fcal_hadronic_eff_%s.root' % (RUN, RFSTR)
    #outfiles['tree_PSFlux.root'                         ] = 'tree_PSFlux/%06d/tree_PSFlux_%s.root' % (RUN, RFSTR)
    #outfiles['tree_trackeff.root'                       ] = 'tree_trackeff/%06d/tree_trackeff_%s.root' % (RUN, RFSTR)
    #outfiles['tree_p2k_dirc.root'                       ] = 'tree_p2k_dirc/%06d/tree_p2k_dirc_%s.root' % (RUN, RFSTR)
    #outfiles['tree_p2pi_dirc.root'                      ] = 'tree_p2pi_dirc/%06d/tree_p2pi_dirc_%s.root' % (RUN, RFSTR)
    #outfiles['hd_root_tofcalib.root'                    ] = 'hd_root_tofcalib/%06d/hd_root_tofcalib_%s.root' % (RUN, RFSTR)
    outfiles['hd_rawdata_%s.pi0fcaltofskim.evio' % RFSTR ] = 'pi0fcaltofskim/%06d/pi0fcaltofskim_%s.evio' % (RUN, RFSTR)
    #outfiles['syncskim.root'                            ] = 'syncskim/%06d/syncskim_%s.root' % (RUN, RFSTR)
    #outfiles['tree_TPOL.root'                           ] = 'tree_TPOL/%06d/tree_TPOL_%s.root' % (RUN, RFSTR)

    return outfiles

#----------------------------------------------------
def ReconOutFiles(RUN, FILE):

    # WARNING: Additions here need to also be added to script_nersc_aggregator.sh !

    # Return list of output directory/filename mappings for a
    # reconstruction job.
    #
    # Map of local file(key) to output file(value)
    RFSTR = '%06d_%03d' % (RUN, FILE)
    outfiles = {}
    outfiles['job_info_%s.tgz' % RFSTR                   ] = 'job_info/%06d/job_info_%s.tgz' % (RUN, RFSTR)
    outfiles['tree_fcal_hadronic_eff.root'               ] = 'tree_fcal_hadronic_eff/%06d/tree_fcal_hadronic_eff_%s.root' % (RUN, RFSTR)
    outfiles['tree_bcal_hadronic_eff.root'               ] = 'tree_bcal_hadronic_eff/%06d/tree_bcal_hadronic_eff_%s.root' % (RUN, RFSTR)
    outfiles['tree_TS_scaler.root'                       ] = 'tree_TS_scaler/%06d/tree_TS_scaler_%s.root' % (RUN, RFSTR)
    outfiles['p3pi_excl_skim.root'                       ] = 'p3pi_excl_skim/%06d/p3pi_excl_skim_%s.root' % (RUN, RFSTR)
    outfiles['tree_trackeff.root'                        ] = 'tree_trackeff/%06d/tree_trackeff_%s.root' % (RUN, RFSTR)
    outfiles['tree_tof_eff.root'                         ] = 'tree_tof_eff/%06d/tree_tof_eff_%s.root' % (RUN, RFSTR)
    outfiles['tree_sc_eff.root'                          ] = 'tree_sc_eff/%06d/tree_sc_eff_%s.root' % (RUN, RFSTR)
    outfiles['tree_TPOL.root'                            ] = 'tree_TPOL/%06d/tree_TPOL_%s.root' % (RUN, RFSTR)
    outfiles['tree_PSFlux.root'                          ] = 'tree_PSFlux/%06d/tree_PSFlux_%s.root' % (RUN, RFSTR)
    outfiles['maybe:hd_rawdata_%s.sync.evio' % RFSTR     ] = 'sync/%06d/sync_%s.evio' % (RUN, RFSTR)
    outfiles['hd_rawdata_%s.random.evio' % RFSTR         ] = 'random/%06d/random_%s.evio' % (RUN, RFSTR)
    outfiles['hd_rawdata_%s.omega.evio' % RFSTR          ] = 'omega/%06d/omega_%s.evio' % (RUN, RFSTR)
    outfiles['hd_rawdata_%s.ps.evio' % RFSTR             ] = 'ps/%06d/ps_%s.evio' % (RUN, RFSTR)
    outfiles['maybe:hd_rawdata_%s.FCAL-LED.evio' % RFSTR ] = 'FCAL-LED/%06d/FCAL-LED_%s.evio' % (RUN, RFSTR)
    outfiles['maybe:hd_rawdata_%s.DIRC-LED.evio' % RFSTR ] = 'DIRC-LED/%06d/DIRC-LED_%s.evio' % (RUN, RFSTR)
    outfiles['maybe:hd_rawdata_%s.CCAL-LED.evio' % RFSTR ] = 'CCAL-LED/%06d/CCAL-LED_%s.evio' % (RUN, RFSTR)
    outfiles['maybe:hd_rawdata_%s.BCAL-LED.evio' % RFSTR ] = 'BCAL-LED/%06d/BCAL-LED_%s.evio' % (RUN, RFSTR)
    #outfiles['dana_rest_coherent_peak.hddm'             ] = 'dana_rest_coherent_peak/%06d/dana_rest_coherent_peak_%s.hddm' % (RUN, RFSTR)
    outfiles['dana_rest.hddm'                            ] = 'REST/%06d/dana_rest_%s.hddm' % (RUN, RFSTR)
    outfiles['hd_root.root'                              ] = 'hists/%06d/hd_root_%s.root' % (RUN, RFSTR)
    outfiles['match:converted_random*.hddm'              ] = 'converted_random/%06d' % (RUN)

    return outfiles

#----------------------------------------------------
def GetRunInfo():

    # Get the list of runs to process and the number of EVIO files for each.
    # The list is returned in the form of a dictionary with the run numbers
    # as keys and the number of EVIO files for that run as values.
    # Which runs show up in the list depends on how the RUNS and RCDB_QUERY
    # globals are set:
    #
    #   RUNS is not empty: All runs in the list are included
    #   RUNS is empty and RCDB_QUERY is None: All runs in the range MINRUN-MAXRUN inclusive are included
    #   RUNS is empty and RCDB_QUERY is not None: RCDB is queried for the list of runs.
    #
    # n.b. that for the first 2 options above, the GetNumEVIOFiles routine
    # below is called which queries the RCDB via mysql directly so the RCDB
    # python module does not actually need to be in PYTHONPATH. For the 3rd
    # option, the RCDB python API is used so it is needed.
    global RUNS, MINRUN, MAXRUN, RCDB_QUERY, RUN_LIST_SOURCE, BAD_RCDB_QUERY_RUNS, BAD_FILE_COUNT_RUNS

    good_runs = {}

    # If RCDB_QUERY is not defined, define with value None
    try    : RCDB_QUERY
    except : RCDB_QUERY = None

    # Query through RCDB API
    if len(RUNS)==0 and RCDB_QUERY!=None:
        RUN_LIST_SOURCE = 'RCDB ' + str(MINRUN) + '-' + str(MAXRUN) + ' (query="' + RCDB_QUERY + '")'
        print 'Querying RCDB for run list ....'

        # Import RCDB python module. Add a path on the CUE just in case
        # PYTHONPATH is not already set
        sys.path.append('/group/halld/Software/builds/Linux_CentOS7-x86_64-gcc4.8.5/rcdb/rcdb_0.06.00/python')
        import rcdb

        db = rcdb.RCDBProvider('mysql://' + RCDB_USER + '@' + RCDB_HOST + '/rcdb')
        print 'RCDB_QUERY = ' + RCDB_QUERY
        for r in db.select_runs(RCDB_QUERY, MINRUN, MAXRUN):
            evio_files_count = r.get_condition_value('evio_files_count')
            if evio_files_count == None:
                print('ERROR in RCDB: Run ' + str(r.number) + ' has no value for evio_files_count!...')
                BAD_RCDB_QUERY_RUNS.append( int(r.number) )
                print('Attempting to extract number of files by examining /mss ...')
                rawdatafiles = glob.glob('/mss/halld/RunPeriod-'+RUNPERIOD+'/rawdata/Run%06d/hd_rawdata_%06d_*.evio' % (r.number,r.number))
                if len(rawdatafiles) > 0: evio_files_count = len(rawdatafiles)
            if evio_files_count == None:
                print('ERROR getting number of files for: Run ' + str(r.number) )
                BAD_FILE_COUNT_RUNS.append( int(r.number) )
                continue
            good_runs[r.number] = int(evio_files_count)

    elif len(RUNS)==0 :
        RUN_LIST_SOURCE = 'All runs in range ' + str(MINRUN) + '-' + str(MAXRUN)
        print 'Getting info for all runs in range ' + str(MINRUN) + '-' + str(MAXRUN) + ' ....'
        for RUN in range(MINRUN, MAXRUN+1): good_runs[RUN] = GetNumEVIOFiles(RUN)

    else:
        RUN_LIST_SOURCE = 'Custom list: ' + ' '.join([str(x) for x in RUNS])
        print 'Getting info for runs : ' + ' '.join([str(x) for x in RUNS])
        for RUN in RUNS: good_runs[RUN] = GetNumEVIOFiles(RUN)

    # Filter out runs in the EXCLUDE_RUNS list
    global EXCLUDE_RUNS
    good_runs_filtered = {}
    for run in good_runs.keys():
        if run not in EXCLUDE_RUNS: good_runs_filtered[run] = good_runs[run]

    return good_runs_filtered
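
# n.b. GetRunInfo() returns file *counts*: a run whose value is 187 has valid
# file numbers 0-186. Example shape of the returned dictionary (numbers here
# are hypothetical):  {71350: 187, 71351: 212}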

#----------------------------------------------------
def GetNumEVIOFiles(RUN):

    global BAD_RCDB_QUERY_RUNS, BAD_FILE_COUNT_RUNS

    # Access RCDB to get the number of EVIO files for this run.
    # n.b. the file numbers start from 0 so the last valid file
    # number will be one less than the value returned
    global RCDB, cnx, cur
    if not RCDB :
        try:
            RCDB = 'mysql://' + RCDB_USER + '@' + RCDB_HOST + '/rcdb'
            cnx = mysql.connector.connect(user=RCDB_USER, host=RCDB_HOST, database='rcdb')
            cur = cnx.cursor()  # using dictionary=True crashes when running on ifarm (??)
        except Exception as e:
            print 'Error connecting to RCDB: ' + RCDB
            print str(e)
            sys.exit(-1)

    Nfiles = 0
    sql  = 'SELECT int_value from conditions,condition_types WHERE condition_type_id=condition_types.id'
    sql += ' AND condition_types.name="evio_files_count" AND run_number=' + str(RUN)
    cur.execute(sql)
    c_rows = cur.fetchall()
    if len(c_rows)>0 :
        Nfiles = int(c_rows[0][0])
    else:
        BAD_RCDB_QUERY_RUNS.append(RUN)
        print('Attempting to extract number of files by examining /mss ...')
        rawdatafiles = glob.glob('/mss/halld/RunPeriod-'+RUNPERIOD+'/rawdata/Run%06d/hd_rawdata_%06d_*.evio' % (RUN,RUN))
        if len(rawdatafiles) > 0:
            Nfiles = len(rawdatafiles)
        else:
            BAD_FILE_COUNT_RUNS.append(RUN)

    return Nfiles

#----------------------------------------------------
def GetFileNumbersToProcess(Nfiles):

    # This will return a list of file numbers to process for a run
    # given the number of files given by Nfiles. The list is determined
    # by the values MINFILENO, MAXFILENO and FILE_FRACTION.
    # First, the actual range of file numbers for the run is determined
    # by MINFILENO-MAXFILENO, but clipped if necessary to Nfiles.
    # Next, a list of files in that range representing FILE_FRACTION
    # of the range is formed and returned.
    #
    # Example 1: Process the first 10 files of each run:
    #     MINFILENO     = 0
    #     MAXFILENO     = 9
    #     FILE_FRACTION = 1.0
    #
    # Example 2: Process 5% of the files of each run, distributed
    #            throughout the run:
    #     MINFILENO     = 0
    #     MAXFILENO     = 1000    (n.b. set this to something really big)
    #     FILE_FRACTION = 0.05
    #
    global MINFILENO, MAXFILENO, FILE_FRACTION

    # Make sure MINFILENO is not greater than max file number for the run
    if MINFILENO >= Nfiles : return []

    # Limit max file number to how many there are for this run according to RCDB
    maxfile = MAXFILENO+1  # set to 1 past the actual last file number we want to process
    if Nfiles < maxfile : maxfile = Nfiles

    # If FILE_FRACTION is 1.0 then we want all files in the range.
    if FILE_FRACTION == 1.0: return range( MINFILENO, maxfile)

    # At this point, maxfile should be one greater than the last file
    # number we want to process. If the last file we want to process
    # is the last file in the run, then it could be a short file. Thus,
    # use the next to the last file in the run to determine the range.
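    #
    # Worked example (illustrative numbers): with MINFILENO=0, MAXFILENO=1000,
    # FILE_FRACTION=0.05 and a run that has Nfiles=200, maxfile is first clipped
    # to 200 and then reduced to 199, giving Nrange=198, N=ceil(9.9)=10,
    # nskip=22 and file numbers [0, 22, 44, ..., 198].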
    if Nfiles < MAXFILENO : maxfile -= 1

    # Number of files in run to process
    Nrange = float(maxfile-1) - float(MINFILENO)
    N = math.ceil(FILE_FRACTION * Nrange)
    if N<2 : return [MINFILENO]
    nskip = Nrange/(N-1)
    filenos = []
    for i in range(0, int(N)): filenos.append(MINFILENO + int(i*nskip))  # offset by MINFILENO so the requested range is honored
    # print 'Nrange=%f N=%f nskip=%f ' % (Nrange, N, nskip)

    return filenos

#----------------------------------------------------
def PrintConfigSummary():
    print '================================================='
    print 'Launch Summary  ' + ('**** TEST MODE ****' if TESTMODE else '')
    print '-----------------------------------------------'
    print '            RunPeriod: ' + RUNPERIOD
    print '          Launch type: ' + LAUNCHTYPE
    print '              Version: ' + VER
    print '                batch: ' + BATCH
    print '             WORKFLOW: ' + WORKFLOW
    print '                  QOS: ' + QOS + ' (BigRed3 partition)'
    print '   Origin of run list: ' + RUN_LIST_SOURCE
    print '       Number of runs: ' + str(len(good_runs))
    print '      Number of files: ' + str(NUM['files_to_process']) + ' (maximum ' + str(MAXFILENO-MINFILENO+1) + ' files/run)'
    print '        Min. file no.: ' + str(MINFILENO)
    print '        Max. file no.: ' + str(MAXFILENO)
    print '   Time limit per job: ' + TIMELIMIT
    print '    Singularity image: ' + IMAGE
    print '  halld_recon version: ' + RECONVERSION + ' (from copy of /group/halld)'
    print '     launch directory: ' + LAUNCHDIR + ' (at BigRed3)'
    print '     JANA config file: ' + CONFIG
    print '     Output directory: ' + OUTPUTTOP
    print '================================================='

#----------------------------------------------------


# --------------- MAIN --------------------

# Initialize some counters
NUM = {}
NUM['files_to_process'] = 0
NUM['files_submitted' ] = 0
NUM['jobs_to_process' ] = 0
NUM['jobs_submitted'  ] = 0

# Get list of runs with number of evio files for each.
# (parameters for search set at top of file)
good_runs = GetRunInfo()

# Print some info before doing anything
for n in [x for (y,x) in good_runs.iteritems()]: NUM['files_to_process'] += len(GetFileNumbersToProcess(n))
if VERBOSE > 0: PrintConfigSummary()

# Create workflow
cmd  = ['swif2', 'create', '-workflow', WORKFLOW]
cmd += ['-site', 'iu/bigred3']
cmd += ['-site-storage', 'iu:bigred3:hpg']
#cmd += ['-site-login', 'hpg@bigred3.uits.iu.edu']
cmd += ['-max-concurrent', MAX_CONCURRENT_JOBS]
if VERBOSE>0 : print 'Workflow creation command: ' + ' '.join(cmd)
if TESTMODE:
    print '(TEST MODE so command will not be run)'
else:
    (cmd_out, cmd_err) = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
    if VERBOSE>0:
        if len(cmd_err)>0 :
            if VERBOSE>1 : print cmd_err
            print 'Command returned error message. Assuming workflow already exists'
        else:
            print cmd_out

# Run workflow
cmd = ['swif2', 'run', '-workflow', WORKFLOW]
if VERBOSE>0 : print 'Command to start workflow: ' + ' '.join(cmd)
if TESTMODE:
    print '(TEST MODE so command will not be run)'
else:
    (cmd_out, cmd_err) = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
    if VERBOSE>0:
        print cmd_out
        print cmd_err

# Loop over runs
DIRS_CREATED = []  # keeps track of directories we create so we don't create them twice
if VERBOSE>0 : print 'Submitting jobs ....'
print '-----------------------------------------------'
for (RUN, Nfiles) in good_runs.iteritems():

    # Get list of files to process
    files_to_process = GetFileNumbersToProcess( Nfiles )

    # Loop over files, creating job for each
    for FILE in files_to_process:
        MakeJob(RUN, FILE)
        if VERBOSE>0:
            sys.stdout.write('  ' + str(NUM['files_submitted']) + '/' + str(NUM['files_to_process']) + ' jobs \r')
            sys.stdout.flush()

print('\n')
print('NOTE: The values in BAD_RCDB_QUERY_RUNS are informative about what is missing from')
print('      the RCDB. An attempt to recover the information from the /mss filesystem')
print('      was also made. Values in BAD_FILE_COUNT_RUNS are ones for which that failed.')
print('      Thus, only runs listed in BAD_FILE_COUNT_RUNS will not have any jobs submitted.')
print 'BAD_RCDB_QUERY_RUNS=' + str(BAD_RCDB_QUERY_RUNS)
print 'BAD_FILE_COUNT_RUNS=' + str(BAD_FILE_COUNT_RUNS)
print 'BAD_MSS_FILE_RUNS=' + str(BAD_MSS_FILE_RUNS)

# If more than 5 jobs were submitted then the summary printed above probably
# rolled off of the screen. Print it again.
if (VERBOSE > 0) : PrintConfigSummary()

NUM['missing_mss_files'] = 0
for run,files in BAD_MSS_FILE_RUNS.items(): NUM['missing_mss_files'] += len(files)

print('')
print('WORKFLOW: ' + WORKFLOW)
print('------------------------------------')
print('Number of runs: ' + str(len(good_runs)) + ' (only good runs)')
print(str(NUM['files_submitted']) + '/' + str(NUM['files_to_process']) + ' total files submitted (' + str(NUM['missing_mss_files']) + ' files missing from mss)')
print(str(NUM['jobs_submitted']) + '/' + str(NUM['jobs_to_process']) + ' total jobs submitted')
print(str(len(DIRS_CREATED)) + ' directories created for output')
print('')