// // // $Id$ // $HeadURL$ // #include #include #include #include using namespace std; #include #include #include #include #include #include #include #include #include #ifdef HAVE_EVIO #include #include #include using namespace evio; #endif // HAVE_EVIO // For EPICS #ifdef HAVE_EZCA #include #include #endif #ifndef _DBG #define _DBG_ cout<<__FILE__<<":"<<__LINE__<<" " #define _DBG__ cout<<__FILE__<<":"<<__LINE__< &epics_vals); void InsertEventIntoCDAQ(map &epics_vals); int cdaq_tcp_event_snd( unsigned int *DATA, int lenDATA,int n,int k, unsigned int evtHDR, unsigned int TriggerID ); void InsertEventIntoEVIOfile(map &epics_vals); void InsertEventIntoASCIIfile(map &epics_vals); uint32_t BuildEPICSEvent(uint32_t *buff, map &epics_vals); uint32_t GetEPICSvalues(map &epics_vals); string GetEPICSvalue(epics_channel_t &ch); // Globals et_sys_id sys_id; et_att_id att_id; et_stat_id sta_id; evioChannel *chan; int eviofilehandle = 0; // only one of this or evio_ofs is used ofstream *evio_ofs = NULL; ofstream *ofs=NULL; uint32_t BUFFER_SIZE = 20000000; // in bytes bool WRITE_TO_ET=false; bool WRITE_TO_EVIOFILE=false; bool WRITE_TO_ASCIIFILE=false; bool WRITE_TO_CDAQ=false; bool ADD_NETWORK_TRANSFER_HEADER_ET = true; bool ADD_NETWORK_TRANSFER_HEADER_EVIOFILE = false; bool USE_EVIO_LIBRARY = true; bool DONE =false; //-------------------- // SigHandler //-------------------- void SigHandler(int sig) { cout << "Signal: " << sig << " received. Finishing ...." << endl; DONE = true; } //-------------------- // Usage //-------------------- void Usage(bool longform=false, string mess="") { cout << endl; cout << "Usage:" << endl; cout << " epics2et [options]" << endl; cout << "" << endl; cout << "Periodically read EPICS values and insert them into an ET system or file." << endl; cout << "" << endl; if(longform){ cout << "By default, this will read the configuration file:" << endl; cout << " $DAQ_HOME/config/epics2et.conf" << endl; cout << "The file should have two entries per line. The first being" << endl; cout << "the EPICS PV (=process variable) name and the second the " << endl; cout << "period in seconds in which to read out that variable. "; cout << "Events can be written to an ET system, CDAQ, EVIO file, or ASCII" << endl; cout << "formatted file. Any combination of these output formats can be" << endl; cout << "written simultaneously. Writing to an ET system can be used to insert" << endl; cout << "the events into a CODA-produced data file." << endl; cout << "Writing to CDAQ can be used to insert the events into a CDAQ-produced" << endl; cout << "data file. (Only one of ET or CDAQ should be used.)" << endl; cout << "" << endl; cout << "The ETsystem string is normally of the format:" << endl; cout << "" << endl; cout << "ET:etfilename:station:host:port" << endl; cout << "" << endl; cout << "for example:" << endl; cout << "" << endl; cout << "ET:/tmp/et_sys_hdops:MON:gluonraid1:23914" << endl; cout << "" << endl; cout << "This format is deliberately chosen to match that used by the" << endl; cout << "DAQ plugin and therefore, the online monitoring system. Note that" << endl; cout << "because this program (unlike monitoring programs) is a producer," << endl; cout << "it will always attach to the GRAND CENTRAL station, thereby" << endl; cout << "ignoring the 'station' field. Also, if the port number is not given" << endl; cout << "then the default ET port number of 11111 is used. Finally, if the" << endl; cout << "source does not start with the string 'ET:', then the source is " << endl; cout << "assumed to be an ET system file on the local file system." << endl; cout << "" << endl; cout << "At least one of the -et, -cdaq, -evio, or -ascii options must be given." << endl; cout << "" << endl; } cout << "options:" << endl; cout << " -h Print the short form Usage statement" << endl; cout << " --help Print the long form Usage statement" << endl; cout << " -v turn on verbose mode" << endl; cout << " -et ETsystem Write events to ET system with given name" << endl; cout << " -cdaq Write events to CDAQ system" << endl; cout << " -evio filename Write events to EVIO file with given name" << endl; cout << " -ascii filename Write events to ASCII file with given name" << endl; cout << " (if filename exists, it will be appended to)" << endl; cout << " -c file.conf Use this config file instead of default" << endl; cout << " -eviolib Do not use the EVIO library to write to the EVIO file" << endl; cout << " (if writing an EVIO file). NTH is forced in this case." << endl; cout << " -NTHevio Do NOT write out Network Transfer Header to EVIO file" << endl; cout << " " << (longform ? "(see below)":"(see long form help)") << endl; cout << " +NTHevio Do write out Network Transfer Header to EVIO file" << endl; cout << " " << (longform ? "(see below)":"(see long form help)") << endl; cout << " -NTHet Do NOT write out Network Transfer Header to ET event" << endl; cout << " " << (longform ? "(see below)":"(see long form help)") << endl; cout << " +NTHet Do write out Network Transfer Header to ET event" << endl; cout << " " << (longform ? "(see below)":"(see long form help)") << endl; cout << "" <0) cout << mess << endl << endl; exit(0); } //-------------------- // ParseCommandLineArguments //-------------------- void ParseCommandLineArguments(int narg, char *argv[]) { // If DAQ_HOME environment variable is set, use it to define default config file const char *ptr = getenv("DAQ_HOME"); if(ptr) CONFIG_FILENAME = string(ptr) + "/config/monitoring/epics2et.conf"; // Loop over al lcommand line arguments for(int i=1; i900){ cerr<<"Period out of range for variable \""<is_open(); if(!opened_file) evio_ofs = NULL; } if(!opened_file){ cerr << "ERROR opening EVIO file for output: " << OUTPUT_EVIO_FILENAME << endl; exit(-3); }else{ cout << "Writing EPICs events to EVIO file: " << OUTPUT_EVIO_FILENAME << endl; } } // Optionally open ASCII file for output if(WRITE_TO_ASCIIFILE){ ofs = new ofstream(OUTPUT_ASCII_FILENAME.c_str(), ios_base::app); if(!ofs->is_open()){ cerr << "ERROR opening ASCII file for output: " << OUTPUT_ASCII_FILENAME << endl; exit(-4); }else{ cout << "Writing EPICs events to ASCII file: " << OUTPUT_ASCII_FILENAME << endl; } } //- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // Loop while alive while (!DONE) { if(WRITE_TO_ET){ if(! et_alive(sys_id) ) break; } map epics_vals; if(GetEPICSvalues(epics_vals)){ if( WRITE_TO_ET ) InsertEventIntoET(epics_vals); if( WRITE_TO_CDAQ ) InsertEventIntoCDAQ(epics_vals); if( WRITE_TO_EVIOFILE ) InsertEventIntoEVIOfile(epics_vals); if( WRITE_TO_ASCIIFILE ) InsertEventIntoASCIIfile(epics_vals); } if(DONE) break; // Sleep for 1 second sleep(1); if(WRITE_TO_ET && !DONE){ if (!et_alive(sys_id)) et_wait_for_alive(sys_id); } } //- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // Close connections to output streams if(ofs ) { ofs->close(); delete ofs;} if(eviofilehandle ) evClose(eviofilehandle); if(evio_ofs ) evio_ofs->close(); if(WRITE_TO_ET ) et_close(sys_id); #endif // HAVE_EZCA return 0; } //---------------- // ConnectToET //---------------- void ConnectToET(const char* source_name) { #ifdef HAVE_ET /// Format for ET source strings is: /// /// ET:session:station:host:port /// /// The session is used to form the filename of the ET /// system. For example, if an session of "eb" is specified, /// then a file named "/tmp/et_sys_eb" is assumed to be /// what should be opened. If no session is specified (or /// an empty session name) then "none" is used as the session. /// /// If the station name specified does not exist, it will /// be created. If it does exist, the existing station will /// be used. If no station is specified, then the station /// name "DANA" will be used. Any station created will be /// set to "blocking" *unless* the configuration paramter /// EVIO:ET_STATION_CREATE_BLOCKING is set to "0" /// in which case it will be set to non-blocking. /// /// If the host is specified, then an attempt will be made /// to open that system. If it is not specified, then /// it will attempt to open an ET system on the local machine. /// /// If port is specified, it is used as the TCP port number /// on the remote host to attach to. If the host is not /// specified (i.e. by having two colons and therefore /// an empty string) then the port is ignored. If the /// port is omitted or specified as "0", then the default /// port is used. /// // Split source name into session, station, etc... vector fields; string str = source_name; size_t startpos=0, endpos=0; while((endpos = str.find(":", startpos)) != str.npos){ size_t len = endpos-startpos; fields.push_back(len==0 ? "":str.substr(startpos, len)); startpos = endpos+1; } if(startpos1 ? fields[1]:""; string station = fields.size()>2 ? fields[2]:""; string host = fields.size()>3 ? fields[3]:"localhost"; int port = fields.size()>4 ? atoi(fields[4].c_str()):ET_SERVER_PORT; if(session == "") session = "none"; if(station == "") station = "DANA"; if(host == "") host = "localhost"; string fname = session.at(0)=='/' ? session:(string("/tmp/et_sys_") + session); // Override station name with "GRAND CENTRAL" for display purposes // We are a producer so we always connect to that. We allow station // to be passed so we can accept same format as consumers. station = "GRAND CENTRAL"; // Report to user what we're doing cout << " Opening ET system:" << endl; if(session!=fname) cout << " session: " << session << endl; cout << " station: " << station << endl; cout << " system file: " << fname << endl; cout << " host: " << host << endl; if(port !=0) cout << " port: " << port << endl; // connect to the ET system et_openconfig openconfig; et_open_config_init(&openconfig); if(host != ""){ et_open_config_setcast(openconfig, ET_DIRECT); et_open_config_setmode(openconfig, ET_HOST_AS_LOCAL); // ET_HOST_AS_LOCAL or ET_HOST_AS_REMOTE et_open_config_sethost(openconfig, host.c_str()); et_open_config_setport(openconfig, ET_BROADCAST_PORT); et_open_config_setserverport(openconfig, port); } int err = et_open(&sys_id,fname.c_str(),openconfig); if(err != ET_OK){ cerr << __FILE__<<":"<<__LINE__<<" Problem opening ET system"< BUFFER_SIZE){ cout<<" Events in ET system are larger than currently set buffer size:"< "< &epics_vals) { // Encode values into EVIO buffer uint32_t buff[50000]; BuildEPICSEvent(buff, epics_vals); int nwords = buff[0] + 1; // +1 is for length word if(nwords <=1 ) return; // get new/unused event et_event *pe; int status = et_event_new(sys_id, att_id, &pe, ET_SLEEP, NULL, (nwords+8)*sizeof(uint32_t)); if (status != ET_OK) { printf("et_producer: error in et_event_new\n"); exit(0); } // Get pointer to data of new event char *pdata; et_event_getdata(pe,(void**)&pdata); // When CODA writes events to ET, it adds a network transport header first // If the ADD_NETWORK_TRANSFER_HEADER is true, then add this header uint32_t *iptr = (uint32_t*)pdata; if(ADD_NETWORK_TRANSFER_HEADER_ET){ uint32_t nwords_tot = nwords + 8; // +8 is for network transfer header size // This is worth a note since it took me a couple of days to figure this // out: The 6th word is the Bit Info/Version word. The documentation at // the top of the BlockHeaderV4.java file describes the bits, but the numbers // start from "1", not 0. Therefore to set "bit 10" we really need to add // (1<<9). This bit turns out to be crucial since it tells the Event Recorder // that there are no more events stacked in the ET event. Without it, the ER // will assume there are more, encounter nonsense bytes, and then report that // the buffer is not in EVIO 4 format. The (4<<10) bit should signify that // this is "user data" though it is unclear if that affects anything. The // final "4" indicates this is EVIO version 4. *iptr++ = nwords_tot; // Block Length (inclusive) *iptr++ = 0; // Block Number *iptr++ = 8; // Header Length *iptr++ = 1; // Event count *iptr++ = 0; // Reserved 1 *iptr++ = (4<<10) + (1<<9) + 4; // Bit Info/Version (see note above) *iptr++ = 0; // Reserved 2 *iptr++ = 0xc0da0100; // Magic number // Byte swap NTH header so it will match banks after they are swapped below // n.b. this must be done since the 0xcoda0100 magic word is used to tell // the endianess of both the NTH and the bank contents that follow. uint32_t *itmp = (uint32_t*)pdata; for(uint32_t i=0; i<8; i++) itmp[i] = EVIO_SWAP32(itmp[i]); } // Event Recorder balks if buffer is not big endian so swap it // while copying it into the output ET event buffer. evioswap( buff, 0, iptr); // put event back into the ET system status = et_event_put(sys_id, att_id, pe); if (status != ET_OK) { printf("et_producer: put error\n"); exit(0); } if(VERBOSE)cout<<"epics2et: --- Event Added to ET ("< &epics_vals) { // Write the event to CDAQ system. // Skip first 8 words if writing NTH, otherwise write to top of buffer uint32_t BUFFER[50000]; LENEVENT=8195; const unsigned int EPICS_TRIGGERID= 0xAAAA5555; const unsigned int EVT_Type_EPICS=(0x3&0xFF)<<16; //-- EPICS=0x3 const int RocID=0xFF; // epics ID BUFFER[0]=1&0xff ; //-- number of ROCs=1 (max 255 !) BUFFER[0] |= EVT_Type_EPICS; int RocID=0xFF; // epics ID unsigned int MODID=(RocID&0xFF)<<24; //-- ModID 8-bit IP based BUFFER[0] |= MODID; BUFFER[1]=EPICS_TRIGGERID; BUFFER[2]=LENEVENT; //-- hdr=(i & 0xF) << 24; uint32_t *iptr = &BUFFER[3]; BuildEPICSEvent(iptr, epics_vals); // unsigned int evtTrigID=BUFFER[1]; // int evtModID=(BUFFER[0]>>24)&0xf; // int evtSize=BUFFER[0]&0xfffff; // // printf("==> SEND:: Trg=%d(%d,%d) Mod=%d(%d) siz=%d(%d), rate=%.1f data=%.3f GB/s\n" // ,evtTrigID,itrg,BUFFER[1],evtModID,i,evtSize,LENEVENT,rate,LENEVENT*4*rate*nmod/1073741824.); rc=tcp_event_snd(BUFFER,LENEVENT,nmod,i,hdr,itrg); if (rc<0) { printf(" ERROR send \n"); sleep(1); } } //-------------------- // cdaq_tcp_event_snd //-------------------- int cdaq_tcp_event_snd( unsigned int *DATA, int lenDATA,int n,int k, unsigned int evtHDR, unsigned int TriggerID ) { static char hostname[100]; static int sd; static struct sockaddr_in pin; struct hostent *hp; static int TCP_FLAG,HEADER[10]; int nleft,nsent; static unsigned int time0,time1; char* snd; // 0=initial -1 -no connection -2 error send if (PORT==0) { //-- set HOST and PORT to default: localhost:20249 strncpy(HOST,"localhost",128); PORT=20249; printf("==> TCPclient:: set DEFAULT!! HOST=%s PORT=%d \n",HOST,PORT); } if(TCP_FLAG==0) { TCP_FLAG=-2; time((time_t*)&time0); strcpy(hostname,HOST); printf("TCPclient:: go find out about the desired host machine \n"); if ((hp = gethostbyname(hostname)) == 0) { perror("gethostbyname"); return -1; } printf( "IP=%u.%u.%u.%u \n", (unsigned char) hp->h_addr_list[0][0], (unsigned char) hp->h_addr_list[0][1], (unsigned char) hp->h_addr_list[0][2], (unsigned char) hp->h_addr_list[0][3]); //-------- fill in the socket structure with host information memset(&pin, 0, sizeof(pin)); pin.sin_family = AF_INET; pin.sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr; pin.sin_port = htons(PORT); //-------- grab an Internet domain socket if ((sd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { perror("socket"); return -1; } printf("TCPclient:: try to connect to %s port=%d\n",hostname,PORT); //------- connect to PORT on HOST if (connect(sd,(struct sockaddr *) &pin, sizeof(pin)) == -1) { perror("connect"); return -1; } printf("TCPclient:: CONNECTED to %s port=%d local_sock=%d port=%d\n",hostname,PORT,sd,ntohs(pin.sin_port)); TCP_FLAG=1; //--- server OK, send DATA } else if (TCP_FLAG==-1) { //--- no server (error connect) time((time_t*)&time1); //---- timer for retry printf("TCPclient:: t1=%d t0=%d t1-t0=%d \n",time1,time0,time1-time0); if ((time1-time0) > T_WAIT ) { time0=time1; printf("TCPclient:: %d RE-try to connect to %s\n",time1-time0,hostname); TCP_FLAG=0; }; return -1; } else if (TCP_FLAG==-2) { //--- server disappeared (error send) printf("TCPclient:: close socket... %s\n",hostname); close(sd); time((time_t*)&time0); TCP_FLAG=-1; return -1; }; //-------------------------------------------------- //------------ ENDIF TCP_FLAG ------------------- //-------------------------------------------------- HEADER[0]=0x5; //--- buffered for evb HEADER[1]=0xAABBCCDD; HEADER[2]=lenDATA; HEADER[3]=evtHDR; HEADER[4]=TriggerID; HEADER[5]=n; HEADER[6]=k; HEADER[7]=k; #ifdef DEBUG printf("send HEADER size=%d\n",sizeof(HEADER)); #endif if (send(sd, (char*) HEADER, sizeof(HEADER), 0) == -1) { perror("send"); TCP_FLAG=-2; return -1; } #ifdef DEBUG printf("send DATA lenDATA=%d bytes (%d words) nmod=%d mod=%d trigger=%d\n",lenDATA*4,lenDATA,n,k, TriggerID); #endif nleft=lenDATA*4; snd=(char*)DATA; while(nleft>0){ if(DEBUG>3) printf("try to send = %d of %d\n",PACKSIZE,nleft); if (nleft &epics_vals) { // Write the event to output EVIO file. // Skip first 8 words if writing NTH, otherwise write to top of buffer uint32_t buff[50000]; uint32_t *iptr = ADD_NETWORK_TRANSFER_HEADER_EVIOFILE ? &buff[8]:buff; BuildEPICSEvent(iptr, epics_vals); int nwords = iptr[0] + 1; // +1 is for length word if(nwords <=1 ) return; // Optionally prepend NTH uint32_t nwords_tot = nwords; if(ADD_NETWORK_TRANSFER_HEADER_EVIOFILE){ iptr = buff; nwords_tot += 8; // +8 is for network transfer header size *iptr++ = nwords_tot; // Block Length (inclusive) *iptr++ = 0; // Block Number *iptr++ = 8; // Header Length *iptr++ = 1; // Event count *iptr++ = 0; // Reserved 1 *iptr++ = (4<<10) + (1<<9) + 4; // Bit Info/Version (Bit info in bits 11-14. 4=User event) version=4 *iptr++ = 0; // Reserved 2 *iptr++ = 0xc0da0100; // Magic number } if(USE_EVIO_LIBRARY){ if(eviofilehandle) evWrite(eviofilehandle, buff); }else{ if(evio_ofs) evio_ofs->write((const char*)buff, nwords_tot*sizeof(uint32_t)); } if(VERBOSE)cout<<"epics2et: --- Event Added to File ("< &epics_vals) { // Write the event to output ASCII file. time_t t = atol(epics_vals["timestamp"].c_str()); *ofs << "+++++++ New Event: " << ctime(&t); map::iterator iter=epics_vals.begin(); for(; iter!=epics_vals.end(); iter++){ const string &name = iter->first; const string &val = iter->second; int len = 35 - name.length(); if(len<1) len =1; string pad(len,' '); *ofs << name << pad << val < &epics_vals) { // Store the EPICS event in EVIO as a bank of SEGMENTs // The first segment is a 32-bit unsigned int for the current // time. This is followed by additional SEGMENTs that are // 8-bit unsigned integer types, one for each variable being // written. All variables are written as strings in the form: // // varname=value // // where "varname" is the EPICS variable name and "value" its // value in string form. If there are multiple elements for // the PV, then value will be a comma separated list. The // contents of "value" are set in GetEPICSvalue() while // the "varname=value" string is formed here. buff[0] = 0; // initialize to zero in case we exit early // If no values to write then return now if(epics_vals.size() == 0) return 0; // Outermost EVIO header is a bank of segments. uint32_t *iptr = &buff[1]; // skip length word // Overall bank header indicating it contains SEGMENTs *iptr++ = (0x60<<16) + (0xD<<8) + (0x1<<0); // Time word (unsigned 32bit SEGMENT) *iptr++ = (0x61<<24) + (0x1<<16) + (1<<0); *iptr++ = (uint32_t)atol(epics_vals["timestamp"].c_str()); // Loop over PVs, writing each as a SEGMENT to the buffer map::iterator iter=epics_vals.begin(); for(; iter!=epics_vals.end(); iter++){ const string &name = iter->first; const string &val = iter->second; string str = name + "=" + val; uint32_t Nbytes = str.size()+1; // +1 is for terminating 0 uint32_t Nwords = (Nbytes+3)/4; // bytes needed for padding uint32_t Npad = (Nwords*4) - Nbytes; *iptr++ = (0x62<<24) + (Npad<<22) + (0x7<<16) + (Nwords<<0); // 8bit unsigned char segment unsigned char *ichar = (unsigned char*)iptr; for(unsigned int j=0; j &epics_vals) { // This routine will loop over the list of EPICs PVs and // try to get each one. Values that it successfully retreieves // will be copied into the caller-supplied "epics_vals" map. // In addition, the current time to associate with these // readings is copied into the map with the key "timestamp". // Get current time in seconds since 1970 uint32_t now = time(NULL); // Loop over the variable list and see if anyone is due to be written out for(int i=0;i EPICS_CHANNELS[i].last_read+EPICS_CHANNELS[i].period ){ string val = GetEPICSvalue(EPICS_CHANNELS[i]); if(val.length()>0){ epics_vals[EPICS_CHANNELS[i].name] = val; EPICS_CHANNELS[i].last_read = now; } } } // Don't add timestamp if no values! if(epics_vals.empty()) return 0; // Add timestamp char tstr[256]; sprintf(tstr, "%d", now); epics_vals["timestamp"] = tstr; // Return number of values return epics_vals.size(); } //-------------------- // GetEPICSvalue //-------------------- string GetEPICSvalue(epics_channel_t &ch) { // Alloocate array to hold all values uint32_t Nelem = ch.num_elements; float *fvals = new float[Nelem]; stringstream ss; // holds values in string form #ifdef HAVE_EZCA // Get the values int ret = ezcaGet((char*)(ch.name.c_str()), ezcaFloat, Nelem, fvals); // Form string of value(s) obtained) if(ret == EZCA_OK){ for(uint32_t i=0; i0) ss << ","; ss << fvals[i]; } }else{ cerr << "Error retrieving variable: " << ch.name << " (code=" << hex << ret << dec << ")" << endl; } #endif // HAVE_EZCA delete[] fvals; return ss.str(); }