// evio2cmsg // Reads evio events from a file and inserts into ET system // Rereads file at eof // Still to do: // const char* in setByteArrayNoCopy() // ejw, 8-jul-2013 #include #include #include #include #include #include #include // for evio #include "evioUtil.hxx" #include #include // for cmsg #include "cMsg.hxx" using namespace std; using namespace evio; using namespace cmsg; // misc variables string evioFilename = "eviofile.dat"; static string udl = "cMsg://localhost/cMsg/data"; static string name = "evio2cmsg"; static string subject = "cMsg2et"; static string type = "insert"; static int delay_time = 10; // in millisecs between reading evio events static int rate_time = 5; // in secs static int max_file = 0; static uint64_t max_event = 0; static int max_event_file = 0; static int max_len = 100000; // in bytes static bool no_buffer = false; static bool debug = false; // for performance stats, etc. static uint64_t nev_read = 0; static uint64_t nev_to_cmsg = 0; static time_t last_time = time(NULL); static uint64_t last_rec = 0; static uint64_t last_proc = 0; static double read_rate; static double proc_rate; // other prototypes void decode_command_line(int argc, char **argv); //------------------------------------------------------------------------------- //------------------------------------------------------------------------------- static void* monitoring_thread(void *arg) { time_t now; while(true) { // sleep sleep((rate_time>0)?rate_time:1); // calc stats now=time(NULL); int delta=now-last_time; read_rate=(double)(nev_read-last_rec)/delta; proc_rate=(double)(nev_to_cmsg-last_proc)/delta; last_time=now; last_rec=nev_read; last_proc=nev_to_cmsg; cout << " nev_read: " << nev_read << endl << " read_rate: " << (int)read_rate << endl << " nev_to_cmsg: " << nev_to_cmsg << endl << " proc_rate: " << (int)proc_rate << endl << endl; } return NULL; } //------------------------------------------------------------------------------- int main(int argc,char **argv) { int filecount=0, evfilecount=0; bool eof = false; char *buf = new char[max_len]; // decode command line decode_command_line(argc,argv); // launch monitoring thread //thread t(monitoring_thread); pthread_t t; pthread_create(&t, NULL, monitoring_thread, NULL); try { // connect to cMsg cMsg c(udl,name,"evio2cmsg"); c.connect(); // create and open evio file channel evioFileChannel *chan = new evioFileChannel(evioFilename,"r"); // open/read/close evio file as many times as needed filecount=0; evfilecount=0; while( ((max_file<=0)||(filecountopen(); // fill et buffers with evio events, reopen file if needed eof=false; while(!eof && ((max_event_file<=0)||(evfilecount0)usleep(delay_time*1000); // get evio event from file eof=!chan->readNoCopy(); if(!eof) { nev_read++; evfilecount++; const uint32_t *pevio = chan->getNoCopyBuffer(); // create and fill message cMsgMessage msg; msg.setSubject(subject); msg.setType(type); if(no_buffer) { msg.add("dataType","evioEvent"); msg.setByteArrayNoCopy((char*)pevio,(pevio[0]+1)*sizeof(uint32_t)); } else { msg.add("dataType","evioBuffer"); evioBufferChannel cmsgChan((uint32_t*)buf,max_len/sizeof(uint32_t),"w"); cmsgChan.open(); cmsgChan.write(pevio); msg.setByteArrayNoCopy((char*)buf,cmsgChan.getEVIOBufferLength()*sizeof(uint32_t)); cmsgChan.close(); } if(debug) { uint32_t *d = (uint32_t*)msg.getByteArray(); cout << endl; for(int i=0; i<12; i++) cout << hex << "0x" << d[i] << " "; cout << dec << endl; } c.send(msg); nev_to_cmsg++; } } // close channel chan->close(); delete chan; } } catch (evioException e) { cerr << e.toString() << endl; } catch (cMsgException e) { cerr << e.toString() << endl; } catch (...) { cerr << "?unknown exception" << endl; } // done cout << endl << " *** evio2cmsg shutdown after reading " << nev_read << "(" << filecount << ") events(files), sending " << nev_to_cmsg << " events ***" << endl << endl; } //-------------------------------------------------------------------------- void decode_command_line(int argc, char**argv) { const string help = "\nUsage:\n\n evio2cmsg [-name name] [-u udl] [-s subject] [-t type] [-no_buffer]\n" " [-delay delay_time] [-rate rate_time] \n" " [-max_event max_event] [-max_file max_file] [-max_ev_file max_event_file] [-max_len max_len] \n" " [-evio evioFilename] [-debug]\n"; int i=1; while(i