#!/usr/bin/env python
#
# This is run once per hour from the hdsys crontab on both
# gluonraid3 and gluonraid4.
#
# Copy files from local disk into /media/ramdisk/active
# on gluondaqbuff so skims may be processed and files may be
# transferred to tape.
#
# This is a stop-gap for transitioning from the old directory
# structure used for the DAQ to the new HOSS system. This will
# scan each of the /data*/rawdata/active/${RUNPERIOD}/rawdata
# directories on the local machine for evio files. For each one
# it finds, it checks whether a file with the same name already
# exists in a /data*/rawdata/volatile/RunPeriod*/rawdata/* directory.
# If it does and has the same file size, then the copy in "active"
# is removed. Otherwise, if the original is more than 30 minutes
# old, hdrdmacp is used to copy it into the HOSS system for
# processing.
#
# Any RunLog and monitoring (RootSpy) directories found whose
# contents are all more than 1 hour old are moved to the
# corresponding path under "volatile" on the same /data* partition.
# From there the directory will be found automatically and a tarball
# made in the stage_for_tape directory by the hd_stage_to_tape.py
# script, which is run via cron job.
#
# By default this runs in test mode and only prints what it would
# do. Pass -r on the command line to actually delete, move and
# transfer files.
#

import os
import sys
import glob
import time
import random
import subprocess

RUNPERIOD = 'RunPeriod-2019-11'
MAX_PROCS = 1    # maximum number of hdrdmacp transfers run at the same time
TESTMODE = True  # dry run; set to False by the -r command line option

files = []            # active EVIO files found on the local /data* partitions
runlog_dirs = []      # idle RunLog directories to move into volatile
monitoring_dirs = []  # idle monitoring directories to move into volatile
procs = {}            # running hdrdmacp transfers, keyed by active file path
completed_files = []  # files whose transfer finished (or, in TESTMODE, was only printed)

#----------------------------
# GetDirAge
#
# Find age of most recently modified file in seconds in given directory.
#----------------------------
def GetDirAge(mydir):
    """Return the age in seconds of the most recently modified file under mydir.

    Every file below mydir (recursively, via os.walk) is considered. Files
    that vanish or cannot be stat'ed while the tree is being scanned are
    ignored instead of aborting the whole run. If no files are found the
    newest modification time is taken as 0 (the epoch), so an empty
    directory reports a very large age.
    """
    newest = 0
    for dirname, _subdirs, fnames in os.walk(mydir):
        for fname in fnames:
            try:
                mtime = os.path.getmtime(os.path.join(dirname, fname))
            except OSError:
                continue  # removed (or unreadable) between listing and stat
            newest = max(newest, mtime)
    return time.time() - newest  # age in seconds


# RunLog/monitoring directories must have had no file modified for this
# many seconds before they are moved to volatile.
DIR_MIN_AGE = 3600


#----------------------------
# _find_idle_dirs
#
# Return the directories matching the glob pattern whose contents have
# not been modified for more than DIR_MIN_AGE seconds.
#----------------------------
def _find_idle_dirs(pattern):
    return [d for d in glob.glob(pattern)
            if os.path.isdir(d) and GetDirAge(d) > DIR_MIN_AGE]


#===========================================================================

# Loop over command line arguments
for arg in sys.argv[1:]:
    if arg == '-r':
        TESTMODE = False

# Loop over partitions getting list of evio files and idle directories
for ipart in range(1, 5):
    topdir = '/data%d/rawdata/active/%s/rawdata' % (ipart, RUNPERIOD)

    # EVIO files
    part_files = glob.glob(topdir + '/Run*/hd_rawdata_??????_???.evio')
    print('==== Processing %d files from: %s' % (len(part_files), topdir))
    files += part_files

    # RunLog and monitoring (RootSpy) directories that have gone idle
    runlog_dirs += _find_idle_dirs(topdir + '/Run*/RunLog*')
    monitoring_dirs += _find_idle_dirs(topdir + '/Run*/monitoring')

# Loop over EVIO files and check for ones already processed
files_processed = {}  # key=full path to active val=full path to volatile
# Some files were apparently copied to two different volatile partitions;
# the extra copies are collected here so they can be deleted.
duplicates = []
# Check each active EVIO file against the volatile area
for fpath in files:
    fbase = os.path.basename(fpath)
    vfiles = glob.glob('/data*/rawdata/volatile/RunPeriod*/rawdata/*/' + fbase)
    if not vfiles:
        continue  # no volatile copy; candidate for sending to HOSS below

    size_active = os.path.getsize(fpath)
    size_volatile = os.path.getsize(vfiles[0])
    if size_active == size_volatile:
        files_processed[fpath] = vfiles[0]
    else:
        # NOTE(review): such files are not deleted and stay in `files`,
        # so they are (re)sent to HOSS below.
        print('Skipping ' + fbase + ' (sizes don\'t match: ' + str(size_active) + ' != ' + str(size_volatile) + ')')

    if len(vfiles) > 1:
        print('WARNING: More than one file matching "' + fbase + '" in volatile!')
        print(vfiles)
        # Extra copies are only disposable if every volatile copy has the
        # same size as the first one.
        if all(os.path.getsize(f) == size_volatile for f in vfiles[1:]):
            duplicates.extend(vfiles[1:])
        else:
            print('ERROR: Multiple volatile versions found but sizes don\'t match!')

# Delete active copies that already have a same-size copy in volatile,
# then drop them from the list of files still to be sent.
print('==== Deleting %d EVIO files already processed ====' % len(files_processed))
for fpath in files_processed:
    print('Deleting ' + fpath)
    if not TESTMODE:
        os.unlink(fpath)
files = [x for x in files if x not in files_processed]

# The same extra volatile copy is recorded once per active file that matched
# it; unlink each path only once so a repeat does not raise.
duplicates = sorted(set(duplicates))
print('==== Deleting %d duplicates ====' % len(duplicates))
for f in duplicates:
    print('Deleting: ' + f)
    if not TESTMODE:
        os.unlink(f)


#----------------------------
# _move_to_volatile
#
# Move each directory in dirs from its .../active/... location to the
# parallel .../volatile/... location, creating the parent directory if
# needed. Directories whose destination already exists are left in place.
#----------------------------
def _move_to_volatile(dirs):
    for d in dirs:
        # Only swap the "active" path component, and only once, so that an
        # "active" substring elsewhere in the path is left untouched.
        dest = d.replace('/active/', '/volatile/', 1)
        if os.path.exists(dest):
            print('WARNING: Directory already exists: ' + dest)
            continue
        dest_parent = os.path.dirname(dest)
        if not os.path.exists(dest_parent):
            print('making directory: ' + dest_parent)
            if not TESTMODE:
                os.makedirs(dest_parent)
        print('moving %s -> %s' % (d, dest))
        if not TESTMODE:
            os.rename(d, dest)


print('==== Moving %d RunLog files ====' % len(runlog_dirs))
_move_to_volatile(runlog_dirs)

print('==== Moving %d monitoring (RootSpy) directories ====' % len(monitoring_dirs))
_move_to_volatile(monitoring_dirs)

# Only send files that have not been modified for MIN_EVIO_AGE seconds, so a
# file that may still be being written is not copied. Younger files stay in
# active and are picked up by a later run of this script.
MIN_EVIO_AGE = 30 * 60  # seconds
now = time.time()
to_send = []
for sfile in files:
    try:
        age = now - os.path.getmtime(sfile)
    except OSError:
        print('WARNING: Unable to stat ' + sfile + ' (removed?) - not sending')
        continue
    if age < MIN_EVIO_AGE:
        print('Not sending %s yet (only %d seconds old)' % (os.path.basename(sfile), age))
        continue
    to_send.append(sfile)

import tempfile  # local import: child output is buffered in a temporary file

print('==== Sending %d EVIO files to HOSS ====' % len(to_send))
random.shuffle(to_send)  # shuffle files so they are not all read from same partition at once
pending = list(to_send)
while pending or procs:
    # Start new transfers while there are free slots
    while pending and len(procs) < MAX_PROCS:
        sfile = pending.pop()
        cmd = ['hdrdmacp', '-u', 'hdops', '-m', '2', '-n', '8', sfile, 'gluondaqbuff:/media/ramdisk/active/' + os.path.basename(sfile)]
        if TESTMODE:
            print('(command we would have run: ' + ' '.join(cmd) + ')')
        prefix = '%s cmd: ' % time.ctime()
        print(prefix + ' '.join(cmd))
        if TESTMODE:
            completed_files.append(sfile)
            continue
        # Send the child's output to a temporary file rather than a pipe: a
        # pipe that is only read after the child exits can fill up and block
        # the child forever.
        log = tempfile.TemporaryFile()
        proc = subprocess.Popen(cmd, stdout=log, stderr=subprocess.STDOUT)
        procs[sfile] = {'proc': proc, 'cmd': cmd, 'log': log}

    # Reap finished transfers. Iterate over a snapshot because entries are
    # removed from procs inside the loop.
    for sfile, entry in list(procs.items()):
        if entry['proc'].poll() is None:
            continue
        log = entry['log']
        log.seek(0)
        print(log.read().decode('utf-8', 'replace'))
        log.close()
        print('subprocess completed: ' + ' '.join(entry['cmd']))
        if entry['proc'].returncode != 0:
            print('WARNING: hdrdmacp exited with code %d for %s' % (entry['proc'].returncode, sfile))
        del procs[sfile]
        completed_files.append(sfile)
        print('Completed %d/%d files' % (len(completed_files), len(to_send)))

    # Avoid busy-waiting while transfers are still running
    if procs:
        time.sleep(0.5)