/*----------------------------------------------------------------------------*
   GlueX file to ET producer

   This program reads events from one or more EVIO (CODA) files and then
   inserts them into an ET system.

   11/20/2015
   I did some profiling of this program to see how it might be optimized.
   In its current state (svn revision 19684) it can write 27.5kB events from
   run 2931 into a local ET system at about 30kHz. Roughly 1/3 of this time is
   spent reading (from file cache in RAM). About 1/3 is spent byte swapping,
   and about 1/4 to 1/5 is spent in memcpy. This could be optimized to get
   better performance, but will require major refactoring (or possibly
   rewriting from scratch). At best, one could improve the speed by about a
   factor of 3. Rewriting this now is not the highest priority so I'm leaving
   this note to remind me what my thoughts are so I can take it up later.

   The optimized program should work with at least 4 threads:

     Thread 1: Read from file into large buffers

     Thread 2: Make list of pointers to individual EVIO events within the
               large buffers. Events spanning more than one buffer will need
               to have the parts copied into a separate, single buffer.

     Thread 3: Byte swap the single event buffers into a separate set of
               buffers.

     Thread 4: Copy events from the byte-swapped buffers into ET events

   A fifth thread may also be needed to print the event ticker.

   Since the data will flow through multiple layers of buffers handled by
   multiple threads, back pressure will need to be implemented. This can be
   done using pthread condition waits so upstream threads can sleep while
   waiting for downstream threads to finish consuming existing buffers.
*----------------------------------------------------------------------------*/ #include #include #include #include #include using namespace std; #include #include #include #include #include #include #include #include #include //#define max_event_size 100000 uint32_t max_event_size=20000000; // Globals et_sys_id id; et_att_id attach; string et_fname(""); useconds_t DELAY = 0; vector source_files; bool QUIT = false; bool USE_HDEVIO = true; bool READ_EVIO = true; bool WRITE_ET = true; uint64_t WRITE_ET_PRESCALE = 0; bool LOOP=false; bool ADD_NETWORK_TRANSFER_HEADER = true; bool FORCE_SWAP=false; uint32_t BLOCK_SIZE=1; int PORT = 0; int VERBOSE = 0; HDEVIO *hdevio = NULL; #if defined(__APPLE__) const char *HOST = "127.0.0.1"; #else const char *HOST = "localhost"; #endif // Routines int InsertEventsIntoET(vector &buffs, vector &buff_sizes, uint32_t &NwordsET); void DumpBinary(const uint32_t *iptr, const uint32_t *iend=NULL, uint32_t MaxWords=0, const uint32_t *imark=NULL); void Usage(void); int SIGINT_RECEIVED = 0; //----------------------------------------------------------------- // ctrlCHandle //----------------------------------------------------------------- void ctrlCHandle(int x) { SIGINT_RECEIVED++; cerr<1000000){ cout << "BLOCK_SIZE > 1M ("< buffs; for(uint32_t i=0; i buff_sizes; // Outer loop for when LOOP is true if(LOOP){ cout<<"Entering infinite loop over events. Hit ctl-C to quit ..."<ResizeFileBuffer(1024*1024*1); //hdevio->disable_bank_swap = true; if(!hdevio->is_open){ cerr<<"Error opening \""<readNoFileBuff(iptr, max_event_size, false); if(hdevio->err_code == HDEVIO::HDEVIO_EOF) break; if(hdevio->err_code != HDEVIO::HDEVIO_OK){ cout << endl << "HEDVIO ERROR: " << hdevio->err_code << endl; if(!LOOP) QUIT = true; // only quit if not looping break; } }else{ if(evRead(handle, iptr, max_event_size)!=S_SUCCESS) break; } // Event size in words should be first word in buffer event_size = (hdevio->swap_needed ? 
swap32(*iptr):*iptr) + 1; // Number of L1 events should be in 2nd word uint32_t header = (hdevio->swap_needed ? swap32(iptr[1]):iptr[1]); NeventsL1 += header&0xFF; }else{ event_size = 4980; // dummy event size (~19.4kB) } if(event_size>max_event_size)event_size = max_event_size; buff_sizes.push_back( event_size ); if(VERBOSE>=3) DumpBinary(iptr, &iptr[event_size], 256); if(buff_sizes.size() == BLOCK_SIZE){ uint32_t nwordsET=0; if(WRITE_ET) { bool skip_et = false; if( (WRITE_ET_PRESCALE!=0) && (NeventsEVIO%WRITE_ET_PRESCALE!=0) ) skip_et = true; if(!skip_et) { InsertEventsIntoET(buffs, buff_sizes, nwordsET); NeventsET++; NwordsET += nwordsET; } } buff_sizes.clear(); } // Print ticker indicating rate and average event sizes NeventsEVIO++; NwordsEVIO += event_size; time_t t = time(NULL); time_t delta_t = t - last_time; if(delta_t>=3){ float rateL1 = (float)NeventsL1/(float)delta_t/1000.0; float rateEVIO = (float)NeventsEVIO/(float)delta_t/1000.0; float rateET = (float)NeventsET/(float)delta_t/1000.0; float avgsizeL1 = (float)NwordsEVIO*4.0/(float)NeventsL1/1024.0; float avgsizeEVIO = (float)NwordsEVIO*4.0/(float)NeventsEVIO/1024.0; float avgsizeET = (float)NwordsET*4.0/(float)NeventsET/1024.0; cout << " L1/EVIO/ET rate: " << rateL1 << " kHz/ " << rateEVIO << " kHz/ " << rateET << " kHz Avg. 
L1/EVIO/ET evt size: " << avgsizeL1 << "kB/ " << avgsizeEVIO << "kB/ " << avgsizeET << "kB \r"; cout.flush(); NeventsL1 = 0; NeventsEVIO = 0; NwordsEVIO = 0; NeventsET = 0; NwordsET = 0; last_time = time(NULL); } // Set buffer for next evio event read iptr = (uint32_t*)buffs[buff_sizes.size() ]; } cout << endl; // Close EVIO file if(handle) evClose(handle); if(hdevio) {hdevio->PrintStats(); delete hdevio;} } if(QUIT) break; }while(LOOP); uint32_t nwordsET; if(!buff_sizes.empty()) InsertEventsIntoET(buffs, buff_sizes, nwordsET); // Free memory used for event buffers for(uint32_t i=0; i &buffs, vector &buff_sizes, uint32_t &NwordsET) { et_event *pe; int status; char *pdata; // Make sure ET system is still alive. Wait for it if not. if (!et_alive(id)) { et_wait_for_alive(id); } // Get total size of ET buffer needed for all events uint32_t nwords = 0; for(uint32_t i=0; iswap_block((uint32_t*)pdata, 8, (uint32_t*)pdata); hdevio->swap_block((uint32_t*)pdata, 8, (uint32_t*)pdata); } // Copy contents into new event for(uint32_t i=0; i=3) cout << "swapping " << buff_sizes[i] << " words to " << hex << iptr << dec << endl; hdevio->swap_bank(iptr, (uint32_t*)buffs[i], buff_sizes[i]); if(VERBOSE>=3) cout << "err_mess: " << hdevio->err_mess.str() << endl; }else{ if(VERBOSE>=3) cout << "copying " << buff_sizes[i] << " words to " << hex << iptr << dec << endl; memcpy( iptr, (char*)buffs[i], buff_sizes[i]*sizeof(int)); } iptr = &iptr[buff_sizes[i]]; } if(VERBOSE>=3){ iptr = (uint32_t*)pdata; DumpBinary(iptr, &iptr[iptr[1]], 256); } // put event back into the ET system status = et_event_put(id, attach, pe); if (status != ET_OK) { printf("et_producer: put error\n"); exit(0); } // Delay a little (if specified) to lower the event rate if(DELAY != 0)usleep(DELAY); return 0; } //---------------- // DumpBinary //---------------- void DumpBinary(const uint32_t *iptr, const uint32_t *iend, uint32_t MaxWords, const uint32_t *imark) { /// This is used for debugging. 
It will print to the screen the words /// starting at the address given by iptr and ending just before iend /// or for MaxWords words, whichever comes first. If iend is NULL, /// then MaxWords will be printed. If MaxWords is zero then it is ignored /// and only iend is checked. If both iend==NULL and MaxWords==0, then /// only the word at iptr is printed. cout << "Dumping binary: istart=" << hex << iptr << " iend=" << iend << " MaxWords=" << dec << MaxWords << endl; if(iend==NULL && MaxWords==0) MaxWords=1; if(MaxWords==0) MaxWords = (uint32_t)0xffffffff; uint32_t Nwords=0; while(iptr!=iend && Nwords=MaxWords) break; stringstream iptr_hex; iptr_hex << hex << "0x" << *iptr; string mark = (iptr==imark ? "*":" "); line1 << setw(12) << iptr_hex.str() << mark; line2 << setw(12) << *iptr << mark; } cout << line1.str() << endl; cout << line2.str() << endl; cout << endl; } } //------------------------ // Usage //------------------------ void Usage(void) { cout<