// et2evio.cc // // copy et buffers to evio file // closes file at end-of-run, opens another // // ejw, 10-jul-2013 // // Still to do: // Select on control words /* include files */ #include #include #include #include #include #include #include #include #include #include using namespace std; //#define NO_CODAOBJECTROOT // placeholder #ifdef NO_CODAOBJECTROOT // ================ No CODA Objects available. Compile dummy program ============ int main(int narg, char *argv[]){ cout << "This program requires the CODA Objects package be available when" << endl; cout << "it compiles. It was compiled without CODA objects support and therefore" << endl; cout << "is disabled. (Sorry!)" << endl; } #else // =========================== Actual program follows =========================== // for et #include // for evio #include "evioUtil.hxx" #include #include // for coda object package #include using namespace evio; using namespace codaObject; /* for et */ static string etName = "etName"; static string et_station_name = "et2evio"; static bool remote = false; static bool block = false; static int qsize = 1; static int chunk = 100; //static int select_control[4] = {-1,-1,-1,-1}; static int select_control[4] = {0,0,0,0}; static et_sys_id et_system_id; static et_openconfig openconfig; static string et_filename; static et_att_id et_attach_id; static et_statconfig et_station_config; static et_stat_id et_station_id = -1; static et_event **pevents; // for performance stats, etc. static uint64_t nbuf_read = 0; static uint64_t nbuf_proc = 0; static uint64_t nev_read = 0; static uint64_t nev_proc = 0; static int nfile_proc = 0; static time_t last_time = time(NULL); static uint64_t last_buf_read = 0; static uint64_t last_buf_proc = 0; static uint64_t last_ev_proc = 0; static uint64_t last_ev_read = 0; static int rate_time = 10; // in secs static double buf_read_rate; static double buf_proc_rate; static double ev_read_rate; static double ev_proc_rate; /* misc variables */ static string udl = "cMsg://localhost/cMsg"; static string name = "et2evio"; static string startup_session; static string evioFileBase = "et2evio"; static string dictfilename; static int max_file = 0; static int max_evfile = 0; static int max_mbfile = 10000; // 10 GBytes static bool no_buffer = false; static bool force_start = false; static bool file_open = false; static bool run_in_progress = false; static bool paused = false; static bool debug = false; static bool done = false; static bool verbose = false; static int int_count = 0; /* prototypes */ void decode_command_line(int argc, char **argv); void quit_callback(int sig); void connect_to_et(); //-------------------------------------------------------------------------- //-------------------------------------------------------------------------- // class et2evio : public RunObject { // // public: // et2evio(const string& UDL, const string& name, const string& descr, const string &theSession) : // RunObject(UDL,name,descr) { class et2evio { public: et2evio(const string& UDL, const string& name, const string& descr, const string &theSession){ cout << "et2evio constructor called" << endl; // set session if specified // if(!theSession.empty())handleSetSession(theSession); } //----------------------------------------------------------------------------- ~et2evio(void) throw() { done=true; } //----------------------------------------------------------------------------- bool userConfigure(const string& s) throw(CodaException) { paused=false; return(true); } //----------------------------------------------------------------------------- bool userDownload(const string& s) throw(CodaException) { paused=false; return(true); } //----------------------------------------------------------------------------- bool userPrestart(const string& s) throw(CodaException) { paused=false; return(true); } //----------------------------------------------------------------------------- bool userGo(const string& s) throw(CodaException) { paused=false; run_in_progress=true; return(true); } //----------------------------------------------------------------------------- bool userPause(const string& s) throw(CodaException) { paused=true; return(true); } //----------------------------------------------------------------------------- bool userResume(const string& s) throw(CodaException) { paused=false; return(true); } //----------------------------------------------------------------------------- bool userEnd(const string& s) throw(CodaException) { paused=false; run_in_progress=false; return(true); } //----------------------------------------------------------------------------- bool userReset(const string& s) throw(CodaException) { paused=false; run_in_progress=false; return(true); } //----------------------------------------------------------------------------- void exit(const string& s) throw(CodaException) { cout << "et2evio received exit command" << endl; run_in_progress=false; done=true; } //----------------------------------------------------------------------------- void userMsgHandler(cMsgMessage *msgp, void *userArg) throw(CodaException) { // unique_ptr msg(msgp); cerr << "?et2evio...received unknown message subject,type: " << msgp->getSubject() << "," << msgp->getType() << endl << endl; delete msgp; } //------------------------------------------------------------------------------- // end class definition //------------------------------------------------------------------------------- }; static void* monitoring_thread(void *arg) { time_t now; while(true) { // sleep if(rate_time>0) { sleep(rate_time); } else { sleep(1); } now=time(NULL); if((rate_time>0)&&(now-last_time)>rate_time) { int delta=now-last_time; buf_read_rate=(double)(nbuf_read-last_buf_read)/delta; buf_proc_rate=(double)(nbuf_proc-last_buf_proc)/delta; ev_read_rate=(double)(nev_read-last_ev_read)/delta; ev_proc_rate=(double)(nev_proc-last_ev_proc)/delta; last_time=now; last_buf_read=nbuf_read; last_buf_proc=nbuf_proc; last_ev_read=nev_read; last_ev_proc=nev_proc; cout << " nbuf_read: " << nbuf_read << endl << " nbuf_proc: " << nbuf_proc << endl << " buf_read_rate: " << (int)buf_read_rate << endl << " buf_proc_rate: " << (int)buf_proc_rate << endl << " nev_read: " << nev_read << endl << " nev_proc: " << nev_proc << endl << " ev_read_rate: " << (int)ev_read_rate << endl << " ev_proc_rate: " << (int)ev_proc_rate << endl << " nfile_proc: " << nfile_proc << endl << " run_in_progress: " << run_in_progress << endl << endl; } } return NULL; } //------------------------------------------------------------------------------- int main (int argc, char **argv) { int etBufferCount,stat; size_t etbuflen; uint32_t *pdata; int fileCount=0, evcount=0; uint64_t bcount=0; //unique_ptr chan; evioFileChannel *chan = NULL; ofstream *ofs = NULL; /* decode command line */ decode_command_line(argc,argv); // set session name if(startup_session.empty())startup_session="halldsession"; /* set signal handlers */ if(signal(SIGTERM,quit_callback)==SIG_ERR)printf("?unable to set TERM signal handler\n");; if(signal(SIGQUIT,quit_callback)==SIG_ERR)printf("?unable to set QUIT signal handler\n");; if(signal(SIGHUP,quit_callback)==SIG_ERR)printf("?unable to set HUP signal handler\n");; if(signal(SIGINT,quit_callback)==SIG_ERR)printf("?unable to set INT signal handler\n");; // allocate pevents array pevents = (et_event **) calloc(chunk, sizeof(et_event*)); if (pevents == NULL) { cerr << "unable to allocate events array, chunk is " << chunk << endl; exit(EXIT_FAILURE); } // create run object et2evio c(udl, name, "et2evio", startup_session); // c.startProcessing(); // cout << "Process startup: " << name << " in session " << c.getSession() <is_open()){ cerr << "Unable to open output file \""<close(); nfile_proc++; file_open=false; done=(max_file>0)&&(fileCount>=max_file); } sleep(1); } else if (paused) { sleep(1); } else { // // open file if needed // if(!file_open) { // stringstream ss; // // ss << evioFileBase << "_" << setfill('0') << setw(6) << c.getRunNumber() << ".evio." << setw(3) << fileCount << ends; // ss << evioFileBase << "_" << setfill('0') << setw(6) << 0 << ".evio." << setw(3) << fileCount << ends; // //chan.reset(new evioFileChannel(ss.str(),"w")); // //chan = new evioFileChannel(ss.str(),"w"); // ofs = new ofstream(ss.str().c_str()); // fileCount++; // evcount=0; // bcount=0; // //chan->open(); // file_open=true; // } // get some et buffers if((stat=et_events_get(et_system_id, et_attach_id, pevents, ET_SLEEP, NULL, chunk, &etBufferCount))!=ET_OK) { cerr << "?error return from et_events_get: " << stat << endl; exit(EXIT_FAILURE); } if(debug&&(etBufferCountwrite((const char*)pdata, etbuflen); evcount++; nev_read++; nev_proc++; // if(no_buffer) { // chan->write(pdata); // evcount++; // nev_read++; // nev_proc++; // bcount+=(pdata[0]+1)*sizeof(uint32_t); // // } else { // evioBufferChannel etChan((uint32_t*)pdata,etbuflen/sizeof(uint32_t),"r"); // etChan.open(); // while(etChan.read()) { // chan->write(etChan); // evcount++; // nev_read++; // nev_proc++; // bcount+=(pdata[0]+1)*sizeof(uint32_t); // } // etChan.close(); // } } // release buffers et_events_put(et_system_id,et_attach_id,pevents,etBufferCount); // check whether file should be closed // if(((max_evfile>0)&&(evcount>=max_evfile)) || // ((max_mbfile>0)&&(int(bcount/1000000)>=max_mbfile)) ) { // chan->close(); // nfile_proc++; // file_open=false; // done=(max_file>0)&&(fileCount>=max_file); // } } } if(ofs){ ofs->close(); delete ofs; } /* done */ // c.stopProcessing(); et_station_detach(et_system_id,et_attach_id); et_station_remove(et_system_id,et_station_id); et_forcedclose(et_system_id); exit(EXIT_SUCCESS); } /*---------------------------------------------------------------- */ void quit_callback(int sig) { int_count++; if(int_count>=5)exit(EXIT_FAILURE); done=1; } /*---------------------------------------------------------------- */ void connect_to_et() { int status; sigset_t sigblock; /* create config */ et_open_config_init(&openconfig); if(remote)et_open_config_sethost(openconfig,ET_HOST_ANYWHERE); /* connect to et system */ status=et_open(&et_system_id,et_filename.c_str(),openconfig); if(status!=ET_OK) { printf( "?Unable to connect to et system\n"); exit(EXIT_FAILURE); } /* kill station if it exists */ if(et_station_exists(et_system_id,&et_station_id,et_station_name.c_str()))et_station_remove(et_system_id,et_station_id); /* create station */ et_station_config_init(&et_station_config); if(block) { et_station_config_setblock(et_station_config,ET_STATION_BLOCKING); } else { et_station_config_setblock(et_station_config,ET_STATION_NONBLOCKING); } et_station_config_setselect(et_station_config,ET_STATION_SELECT_MATCH); et_station_config_setselectwords(et_station_config,select_control); et_station_config_setuser(et_station_config,ET_STATION_USER_MULTI); et_station_config_setrestore(et_station_config,ET_STATION_RESTORE_OUT); et_station_config_setprescale(et_station_config,1); et_station_config_setcue(et_station_config,qsize); status=et_station_create(et_system_id,&et_station_id,et_station_name.c_str(),et_station_config); if(status!=ET_OK) { et_forcedclose(et_system_id); printf("?Unable to create station %s\n",et_station_name.c_str()); exit(EXIT_FAILURE); } // Force station to first position so we can see events // before the ER does et_station_setposition(et_system_id, et_station_id, 1, 0); /* block signals to THIS thread and any thread created by this thread */ /* needed to keep signals from et threads */ sigfillset(&sigblock); pthread_sigmask(SIG_BLOCK,&sigblock,NULL); /* attach to station */ status=et_station_attach(et_system_id,et_station_id,&et_attach_id); if(status!=ET_OK) { et_forcedclose(et_system_id); printf("?Unable to attach to station %s\n",et_station_name.c_str()); exit(EXIT_FAILURE); } /* unblock signals */ pthread_sigmask(SIG_UNBLOCK,&sigblock,NULL); return; } /*----------------------------------------------------------------*/ void decode_command_line(int argc, char**argv) { const char *help = "\nusage:\n\n et2evio [-udl udl] [-n name] [-s session] [-force]\n" " [-et et_file_name] [-remote] [-chunk chunk]\n" " [-station et_station_name] [-queue qsize] [-block]\n" " [-evio evioFileBase] [-dict dictfilename]\n" " [-max_file max_file] [-max_mbfile max_mbfile] [-max_evfile max_evfile]\n" " [-c0 c0] [-c1 c1] [-c2 c2] [-c3 c3]\n" " [-no_buffer] [-rate rate_time] [-debug] [-verbose]\n"; /* loop over arguments */ int i=1; while (i