// et2et // Transfers ET buffers from one ET system to another // based on et_2_et.c by Carl Timmer // ejw, 4-sep-2013 #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace std; #define _DBG__ cout<<__FILE__<<":"<<__LINE__<<" " << endl; // for ET #include "et.h" using namespace std; // misc vars string ET_FILENAME1, ET_STATION1, ET_HOST1; string ET_FILENAME2, ET_STATION2, ET_HOST2; int ET_PORT1, ET_PORT2; int ET_CHUNK = 20; // number buffers to get from ET at one go int ET_NBUFFERS = 20; // number of buffers to tranfer at one go uint64_t ET_NEVENTS_TRANSFERRED = 0; int rate_time = 2; // seconds between printing ticker (0 means don't print) // prototypes void ParseETstring(const string &etstr, string &etfname, string &stationname, string &host, int &tcp_port); void Usage(void); void ParseCommandLineArguments(int argc, char **argv); //------------------------------------------------------------------------------- //------------------------------------------------------------------------------- //---------------------------------------------------------------- // monitoring_thread //---------------------------------------------------------------- static void* monitoring_thread(void* ptr) { time_t now; time_t last_time = time(NULL); uint64_t last_ntransferred = ET_NEVENTS_TRANSFERRED; double proc_rate; while(true) { // sleep if(rate_time>0) { sleep(rate_time); } else { sleep(1); } now=time(NULL); int delta = now-last_time; if( (rate_time>0) && (delta>rate_time) ) { proc_rate=(double)(ET_NEVENTS_TRANSFERRED-last_ntransferred)/delta; last_time=now; last_ntransferred = ET_NEVENTS_TRANSFERRED; cout << ET_NEVENTS_TRANSFERRED << " events transferred"; cout << " (" << proc_rate << " Hz) \r"; cout.flush(); } } return NULL; // 2/13/2015 DL } //---------------------------------------------------------------- // main //---------------------------------------------------------------- int main(int 
argc,char **argv) { int status; int ntransferred=0; et_statconfig sconfig; et_openconfig openconfig,openconfig2; et_bridgeconfig bconfig; et_att_id att_from, att_to; et_stat_id stat_from; et_sys_id id_from, id_to; // decode command line ParseCommandLineArguments(argc,argv); // launch monitoring thread pthread_t thr; pthread_create(&thr, NULL, monitoring_thread, NULL); // open first ET system et_open_config_init(&openconfig); et_open_config_sethost(openconfig, ET_HOST1.c_str()); et_open_config_settcp(openconfig, 2000000, 130000, 0); et_open_config_setcast(openconfig, ET_DIRECT); et_open_config_setserverport(openconfig, ET_PORT1); if (et_open(&id_from, ET_FILENAME1.c_str(), openconfig) != ET_OK) { printf("%s:%d -- et_open problem for system 1\n", ET_HOST1.c_str(), ET_PORT1); exit(1); } et_open_config_destroy(openconfig); printf("Opened et system 1\n"); // open second ET system et_open_config_init(&openconfig2); et_open_config_sethost(openconfig2, ET_HOST2.c_str()); et_open_config_settcp(openconfig2, 2000000, 130000, 0); et_open_config_setcast(openconfig2, ET_DIRECT); et_open_config_setserverport(openconfig2, ET_PORT2); if (et_open(&id_to, ET_FILENAME2.c_str(), openconfig2) != ET_OK) { printf("%s:%d -- et_open problem for system 2\n", ET_HOST2.c_str(), ET_PORT2); exit(1); } et_open_config_destroy(openconfig2); printf("Opened et system 2\n"); et_station_config_init(&sconfig); et_station_config_setuser(sconfig, ET_STATION_USER_MULTI); et_station_config_setrestore(sconfig, ET_STATION_RESTORE_OUT); et_station_config_setprescale(sconfig, 1); et_station_config_setcue(sconfig, 5); et_station_config_setselect(sconfig, ET_STATION_SELECT_ALL); et_station_config_setblock(sconfig, ET_STATION_NONBLOCKING); // set debug level et_system_setdebug(id_from, ET_DEBUG_INFO); et_system_setdebug(id_to, ET_DEBUG_INFO); // create station if ((status = et_station_create(id_from, &stat_from, ET_STATION1.c_str(), sconfig)) < ET_OK) { if (status == ET_ERROR_EXISTS) { /* my_stat contains pointer 
to existing station */; printf("%s:%d -- station \"%s\" already exists, reusing...\n", ET_HOST1.c_str(), ET_PORT1, ET_STATION1.c_str()); } else { printf("%s:%d -- error in station creation of \"%s\"\n", ET_HOST1.c_str(), ET_PORT1, ET_STATION1.c_str()); cerr << et_perror(status) << endl; exit(EXIT_FAILURE); } } et_station_config_destroy(sconfig); // attach to stations if (et_station_attach(id_from, stat_from, &att_from) < 0) { printf("%s:%d -- \"%s\" error in station attach\n", ET_HOST1.c_str(), ET_PORT1, ET_STATION1.c_str()); exit(EXIT_FAILURE); } if (et_station_attach(id_to, ET_GRANDCENTRAL, &att_to) < 0) { printf("%s:%d -- error attaching to GrandCentral station\n", ET_HOST2.c_str(), ET_PORT2); exit(EXIT_FAILURE); } // set up bridge et_bridge_config_init(&bconfig); et_bridge_config_setchunkfrom(bconfig, ET_CHUNK); et_bridge_config_setchunkto(bconfig, ET_CHUNK); // transfer events status=ET_OK; while (status == ET_OK) { status = et_events_bridge(id_from, id_to, att_from, att_to, bconfig, ET_NBUFFERS, &ntransferred); ET_NEVENTS_TRANSFERRED += (uint64_t)ntransferred; } if(status != ET_OK) cerr << et_perror(status) << endl; // done et_bridge_config_destroy(bconfig); et_forcedclose(id_from); et_forcedclose(id_to); return(EXIT_SUCCESS); } //---------------------------------------------------------------- // ParseETstring // // Parse the given ET specification string by splitting on // colons (:) and copying the results into the given references. 
// If there are not exactly 5 tokens, then the program exits
// with an error
//----------------------------------------------------------------
void ParseETstring(const std::string &etstr, std::string &etfname, std::string &stationname, std::string &host, int &tcp_port)
{
	// Token layout used below:
	//   [0] ignored   [1] ET file name   [2] station name   [3] host   [4] TCP port
	std::stringstream ss(etstr);
	std::string item;
	std::vector<std::string> tokens;
	while(std::getline(ss, item, ':')) tokens.push_back(item);

	if(tokens.size()!=5){
		std::cerr << "Unable to parse ET specification string: \"" << etstr << "\"" << std::endl;
		std::cerr << "Expected 5 tokens but got " << tokens.size() << std::endl;
		exit(-1);
	}

	// atoi does no validation: a port token with no leading digits yields 0.
	tcp_port = atoi(tokens[4].c_str());
	host = tokens[3];
	stationname = tokens[2];
	etfname = tokens[1];
}

// NOTE(review): the rest of the file (Usage and the code after it) is truncated
// in this copy; the remnant is kept verbatim on the line below.
//---------------------------------------------------------------- void Usage(void) { cout << endl; cout << " Usage:" < posargs; // positional arguments for(int i=1; i