// cMsg2et // // Inserts data from cMsg message into ET system // // Uses CodaObject framework // Based on ipcbank2et from CLAS online, ejw, 19-oct-00 // // // Still to do: // what aux evio and et parameters are needed in message? // remote et // add host-specific subscription to coda object library // // ejw, 10-apr-2013 #include //#include #include #include #include #include #include #include using namespace std; #include #ifndef HAVE_CODAOBJECTS // ================ No CODA Objects available. Compile dummy program ============ int main(int narg, char *argv[]){ cout << "This program requires the CODA Objects package be available when" << endl; cout << "it compiles. It was compiled without CODA objects support and therefore" << endl; cout << "is disabled. (Sorry!)" << endl; } #else // =========================== Actual program follows =========================== // for coda object package #include using namespace codaObject; // for et #include // for evio #include #include #include using namespace cmsg; using namespace evio; // misc variables static string udl = "cMsg://gluondb1/cMsg"; static string dataUDL = "cMsg://gluondb1/cMsg/data"; static string mySession; static string localHost = string(getenv("HOST")); static string remoteHost; //static unique_ptr dataConn; static cMsg* dataConn; static string name = "cMsg2et"; static bool et_ok = false; static bool connected = false; static bool lost_connection = false; static bool verbose = false; static bool debug = false; static bool force_start = false; static bool done = false; // for calculation of performance stats static int sleep_time = 2; // in secs static int rate_time = 5; // in secs static int nev_rec = 0; static int nev_to_et = 0; static int nev_no_et = 0; static time_t now; static time_t last_time = time(NULL); static int last_rec = 0; static int last_proc = 0; static int last_no_et = 0; static double rec_rate; static double proc_rate; static double no_et_rate; // et stuff static et_sys_id et_system_id; static et_openconfig openconfig; static string et_filename; static et_att_id et_attach_id; static et_event *et_event_ptr; static int et_control[ET_STATION_SELECT_INTS]; static size_t etbuffersize; // for insert request handling static const string insertType = "insert"; static const string evioEvent = "evioevent"; static const string evioBuffer = "eviobuffer"; // other prototypes class cMsg2et; void decode_command_line(int argc, char **argv); void init_et(void); void connect_et(void); //---------------------------------------------------------------------- void monitoring_thread0(cMsg *conn); extern "C" { void quitCallback(int sig); void atExitHandler(void); } //---------------------------------------------------------------------- pthread_t t; static void *monitoring_thread (void* arg) { cMsg *conn = (cMsg*) arg; //cout << " conn ptr= " << conn << endl; monitoring_thread0(conn); return 0; } //------------------------------------------------------------------------------- //------------------------------------------------------------------------------- class cMsg2et : public RunObject { public: cMsg2et(const string& UDL, const string& name, const string& descr, const string &theSession) : RunObject(UDL,name,descr) { cout << "cMsg2et constructor called" << endl; // set session if specified if(!theSession.empty())handleSetSession(theSession); } //----------------------------------------------------------------------------- ~cMsg2et(void) throw () { done=true; } //----------------------------------------------------------------------------- bool userConfigure(const string& s) throw () { return(true); } //----------------------------------------------------------------------------- bool userDownload(const string& s) throw () { return(true); } //----------------------------------------------------------------------------- bool userPrestart(const string& s) throw () { return(true); } //----------------------------------------------------------------------------- bool userGo(const string& s) throw () { dataConn->start(); return(true); } //----------------------------------------------------------------------------- bool userPause(const string& s) throw () { dataConn->stop(); return(true); } //----------------------------------------------------------------------------- bool userResume(const string& s) throw () { dataConn->start(); return(true); } //----------------------------------------------------------------------------- bool userEnd(const string& s) throw () { dataConn->stop(); return(true); } //----------------------------------------------------------------------------- bool userReset(const string& s) throw () { return(true); } //----------------------------------------------------------------------------- void exit(const string& s) throw () { cout << "cMsg2et received exit command" << endl; done=true; } //----------------------------------------------------------------------------- void userMsgHandler(cMsgMessage *msgp, void *userArg) throw () { //unique_ptr msg(msgp); cMsgMessage* msg = new cMsgMessage(msgp); //cMsgMessage* msg = msgp; cerr << "?cMsg2et...received unknown message subject,type: " << msg->getSubject() << "," << msg->getType() << endl << endl; } //------------------------------------------------------------------------------- // end class definition //------------------------------------------------------------------------------- }; // inserts contents of cMsgMessage into ET system class dataCallback : public cMsgCallback { void callback(cMsgMessage *msg, void *userArg) { cout << "subject is: " << msg->getSubject() << endl; cout << "type is: " << msg->getType() << endl; cout << "userInt is: " << msg->getUserInt() << endl; cout << "text is: " << msg->getText() << endl; cout << "byte array length is: " << msg->getByteArrayLength() << endl; cout << endl; //unique_ptr msg(msgp); //cMsgMessage* msg = new cMsgMessage(msgp); //cMsgMessage* msg = msgp; int status; int *pdata; if(debug) cout << "cMsg2etCallback received message" << endl; try { // convert to lower case string type = msg->getType(); std::transform(type.begin(), type.end(), type.begin(), (int(*)(int)) tolower); string dataType = msg->getString("dataType"); cout << "cMsg2etCallback received message dataType = " << dataType << endl; std::transform(dataType.begin(), dataType.end(), dataType.begin(), (int(*)(int)) tolower); // only handle insert messages for the moment... if(type==insertType) { // total events received nev_rec++; if(debug) cout << "cMsgCallback received event for insert, dataType is: " << dataType << endl; // check et if(!et_ok) { if(debug) cout << "connect_et(); " << endl; connect_et(); if(!et_ok) { nev_no_et++; return; } } else if(et_alive(et_system_id)==0) { nev_no_et++; lost_connection=true; done=true; return; } // get new event from ET system status=et_event_new(et_system_id,et_attach_id,&et_event_ptr,ET_SLEEP,NULL,etbuffersize); if(status!=ET_OK) { if(debug) cerr << "?unable to get event, status is: " << status << endl; nev_no_et++; return; } // set control words...must set 1st word > 32 to not fool CODA for (int ii=0; iigetByteArrayLength(); if(len>0)memcpy((void*)pdata,(void*)msg->getByteArray(),len); et_event_setlength(et_event_ptr,len); if(debug)cout << "got event of len " << len << endl; } else if(dataType==evioBuffer) { int len = msg->getByteArrayLength(); if(len>0)memcpy((void*)pdata,(void*)msg->getByteArray(),len); et_event_setlength(et_event_ptr,len); if(debug)cout << "got buffer of len " << len << endl; if(debug) { cout << endl; for(int i=0; i<12; i++) cout << hex << "0x" << pdata[i] << " "; cout << dec << endl; } } else { // later will dispatch to plugins... cerr << "?cMsg2et...unknown data type: " << dataType << endl; } // return event to ET system if(et_alive(et_system_id)==1) { status=et_event_put(et_system_id,et_attach_id,et_event_ptr); if(status==ET_OK) { nev_to_et++; } else { if(debug) cerr << "?unable to put event, status is: " << status << endl; nev_no_et++; } } } else { cerr << "?cMsg2et...callback received unknown message type: " << type << endl; } } catch (cMsgException e) { cerr << "cMsg error: " + e.toString() << endl; } // done return; } //------------------------------------------------------------------------------- // end class definition //------------------------------------------------------------------------------- }; int main(int argc,char **argv) { // decode command line decode_command_line(argc,argv); // set session name if(mySession.empty())mySession="halldsession"; // post startup message cout << "Process startup: " << name << " in session " << mySession <(new cMsg(dataUDL,name+"/data","cMsg2et data connection")); cMsg* dataConn = new cMsg(dataUDL,name+"/data","cMsg2et data connection"); dataConn->connect(); // create callback for data insert messages and subscribe //dataCallback cb; dataCallback *cb = new dataCallback(); //dataConn->subscribe(name, insertType, cb, NULL, NULL); dataConn->subscribe(name, insertType, cb, NULL); if(debug)cout << "data connection subscription to " << name << "," << insertType << endl; dataConn->start(); // initialize et system init_et(); // create coda object and start processing //cMsg2et c(udl, name, "cMsg2et", mySession); cMsg2et *c = new cMsg2et(udl, name, "cMsg2et", mySession); // launch monitoring thread //thread t(monitoring_thread,c.rcConn); printf("Start thread \n"); int ret = pthread_create (&t, 0, monitoring_thread, c->rcConn); if (ret) { perror ("pthread_create"); } // start processing c->startProcessing(); if(force_start)dataConn->start(); // wait until done, periodically check ET system while(!done) { sleep(sleep_time); //cout << "check ET system... " << endl; // check ET system if(et_ok&&(et_alive(et_system_id)==0)) { lost_connection=true; done=true; } } // stop processing dataConn->stop(); c->stopProcessing(); } catch (CodaException e) { cout << e.toString() << endl; } // shutdown messages if(lost_connection)cerr << endl << "?cMsg2et...lost connection" << endl << endl; cout << "Process shutdown: " << name << endl; // done if(connected)et_forcedclose(et_system_id); exit(EXIT_SUCCESS); } //-------------------------------------------------------------------------- void atExitHandler(void) { cout << "cMsg2et...atExitHandler called" << endl; return; } //------------------------------------------------------------------- void quitCallback(int sig) { cout << "cMsg2et...quitCallback called with signal " << sig << endl; done=true; return; } //------------------------------------------------------------------- void monitoring_thread0(cMsg *conn) { while(!done) { sleep(1); // calc rates and set monitoring string now=time(NULL); if((now-last_time)>rate_time) { int delta=now-last_time; rec_rate=(double)(nev_rec-last_rec)/delta; proc_rate=(double)(nev_to_et-last_proc)/delta; no_et_rate=(double)(nev_no_et-last_no_et)/delta; last_time=now; last_rec=nev_rec; last_proc=nev_to_et; last_no_et=nev_no_et; // set monitoring string stringstream ss; ss << setw(10) << " " << nev_rec << " " << endl << " " << rec_rate << " " << endl << " " << nev_to_et << " " << endl << " " << proc_rate << " " << endl << " " << nev_no_et << " " << endl << " " << no_et_rate << " " << endl << " " << et_ok << " " << endl << ends; //cout << " conn ptr= " << conn << endl; //cout << " ss= " << ss.str() << endl; conn->setMonitoringString(ss.str()); // print monitoring info if requested if(verbose) { cout << " nev_rec " << nev_rec << endl << " rec_rate " << rec_rate << endl << " nev_to_et " << nev_to_et << endl << " proc_rate " << proc_rate << endl << " nev_no_et " << nev_no_et << endl << " no_et_rate " << no_et_rate << endl << " et_ok " << et_ok << endl << endl; } } } } //---------------------------------------------------------------- void init_et(void) { cout << "Will connect to et system: " << et_filename << endl; et_ok=false; et_open_config_init(&openconfig); et_open_config_setcast(openconfig, ET_DIRECT); et_open_config_setwait(openconfig, ET_OPEN_WAIT); if(remoteHost.empty()) { et_open_config_sethost(openconfig, localHost.c_str()); cout << "Use loc host = " << localHost << endl; } else { // et_open_config_sethost(openconfig,ET_HOST_ANYWHERE); et_open_config_sethost(openconfig, remoteHost.c_str()); cout << "Use rem host = " << remoteHost << endl; } return; } //--------------------------------------------------------------------- void connect_et() { int status; sigset_t sigblock; if(debug)cout << "in connect_et" << endl; et_ok=false; cout << "trying to connect to ET system" << endl; // open et system if(et_open(&et_system_id,et_filename.c_str(),openconfig)!=ET_OK) { cerr << "ERROR: Cannot connect to ET - return" << endl; return; } // get max normal event size et_system_geteventsize(et_system_id,&etbuffersize); cout << "INFO: event size = " << etbuffersize << endl; // block signals to THIS thread and any thread created by this thread // needed to keep signals from et threads sigfillset(&sigblock); pthread_sigmask(SIG_BLOCK,&sigblock,NULL); // attach to existing station status=et_station_attach(et_system_id,ET_GRANDCENTRAL,&et_attach_id); if(status!=ET_OK) { et_forcedclose(et_system_id); cerr << "Unable to attach to grandcentral station" << endl; done=true; return; } // unblock signals pthread_sigmask(SIG_UNBLOCK,&sigblock,NULL); // success connected=true; et_ok=true; cout << "...now connected to ET system: " << et_filename << ", station: grandcentral" << endl; return; } //--------------------------------------------------------------------- void decode_command_line(int argc, char**argv) { const string help = "\nusage:\n\n cMsg2et [-udl udl] [-dataUDL dataUDL] [-n name] [-s session] [-et etFileName]\n" " [-rhost remoteHost] [-sleep sleep_time] [-r rate_time] [-verbose] [-force] [-debug]\n"; int i=1; while(i