#include #include #include #include #include #include #include #include #include #include using namespace std; #include "byteswap.h" extern "C" { #include }; #include uint32_t BUFFER_SIZE_WORDS = 100000; string SQLITE_FILENAME = "hd_runlog.sqlite"; string MYSQL_HOST = "hallddb.jlab.org"; string MYSQL_USER = "datmon"; string MYSQL_PASSWORD = ""; string MYSQL_DB = "data_monitoring"; bool WRITE_TO_SCREEN = true; bool WRITE_TO_SQLITE = true; bool WRITE_TO_MYSQL = true; #define _DBG_ cout<<__FILE__<<":"<<__LINE__<<" " #define _DBG__ cout<<__FILE__<<":"<<__LINE__< &finfos); void WriteToSQLiteDB(map &finfos); void WriteToMySQLDB(map &finfos); void UpdateRunsTableinMySQLDB(MYSQL *con, set &runs); //----------------------- // Usage //----------------------- void Usage(void) { cout << endl; cout << "Usage():" << endl; cout << " eviocount file.evio" << endl; cout << endl; cout << " -sqlite Don't write to SQLite file" << endl; cout << " -mysql Don't write to MySQL DB" << endl; cout << " -screen Don't write to screen" << endl; cout << endl; exit(0); } //----------------------- // main //----------------------- int main(int narg, char *argv[]) { // Loop over command line arguments vector filenames; for(int i=1; i finfos; uint32_t *buff = new uint32_t[BUFFER_SIZE_WORDS]; uint32_t buff_size_bytes = BUFFER_SIZE_WORDS*sizeof(uint32_t); for(uint32_t i=0; ibuff_size_bytes ? buff_size_bytes:len)/sizeof(uint32_t); // File pointer is left pointing to end of file. Loop over file // contents staring from back until we find event header. uint64_t last_event=0; time_t tlast_event=0; uint32_t Ntrys=0; while(ifs.good()){ cout << "."; cout.flush(); // Read in next BUFFER_SIZE_WORDS words (working backwards from end) int32_t offset = (int32_t)(Nwords*sizeof(uint32_t)); ifs.seekg(-offset, ifs.cur); ifs.read((char*)buff, Nwords*sizeof(uint32_t)); Nwords = ifs.gcount()/sizeof(uint32_t); if(Nwords<8) break; for(uint32_t i=Nwords-8; i>0; i--){ uint32_t w = buff[i]; // Physics Event Header bits that should be set //if( (w && 0xFF501000) != 0xFF501000 ) continue; if( (w & 0x001050FF) != 0x001050FF ) continue; // Physics Event Header bits that should not be set //if( (w && 0x000E0F00) != 0x00000000 ) continue; if( (w & 0x000F0E00) != 0x00000000 ) continue; // Number of events in block (i.e. M) uint64_t Nevents_in_block = EVIO_SWAP32(w) & 0xFF; // Jump 2 words to Trigger bank w = buff[i+2]; // Built Trigger Bank bits that should be set //if( (w && 0xFF202000) != 0xFF202000 ) continue; if( (w & 0x002020FF) != 0x002020FF ) continue; // First bank in Trigger bank should be 64bit int type =0xa w = buff[i+3]; //if( w&& 0x000A0000 != 0x000A0000 ) continue; if( (w & 0x00000A00) != 0x00000A00 ) continue; last_event = EVIO_SWAP64(*((uint64_t*)&buff[i+4])) + Nevents_in_block - 1; tlast_event = EVIO_SWAP64(*((uint64_t*)&buff[i+6])); // units? offset? break; } if(++Ntrys >= 100) break; // limit how far we look back in file if(last_event != 0) break; ifs.seekg(-offset, ifs.cur); // jump back to before this last read ifs.seekg(8*sizeof(uint32_t), ifs.cur); // Move forward 8 words so we don't miss header due to boundary } cout << endl; // Close file ifs.close(); // If "RunPeriod" appears in filename, cut out all directories // in front of it so that it can more easily be used as a // reltive filename in /mss, /cache, /gluex/raid, ... size_t pos = filename.find("RunPeriod"); if(pos != filename.npos) filename.erase(0, pos); // Calculate end time for file from 250MHz clock timestamp of // last event, timestamp of first event, and start time of "Go" // event. double tdiff = 4.0E-9 * (double)(tlast_event - tfirst_event); time_t end_time = start_time + (time_t)tdiff; // Number of events this file uint32_t N = last_event - first_event + 1; if(N==0) N=1; // Calculate avg. event size in kB uint32_t avg_event_size_kB = (len/N)>>10; // Make reference to DataFileinfo object, creating it in the process DataFileInfo &finfo = finfos[filename]; finfo.filename = filename; finfo.size = len; finfo.Nevents = N; finfo.first_event = first_event; finfo.last_event = last_event; finfo.start_time = start_time; finfo.end_time = end_time; finfo.event_size_kB = avg_event_size_kB; // Extract run number and file number from filename if(filename.length()>=15 && filename.substr(filename.length()-5) == ".evio"){ finfo.run = atoi(filename.substr(filename.length()-15).c_str()); finfo.part = atoi(filename.substr(filename.length()- 8).c_str()); } } cout << endl; cout << "Found info for " << finfos.size() << " files." <::iterator it = finfos.begin(); for(; it!=finfos.end(); it++){ DataFileInfo &dfi = it->second; // Look for the next part file for this run. Since we key the map // by filename, they should be in order so we only need to look // forward. map::iterator it2 = it; for(it2++; it2!=finfos.end(); it2++){ if(it2->second.run != dfi.run) continue; if(it2->second.part != dfi.part+1) continue; if(it2->second.start_time != 0) continue; it2->second.start_time = dfi.end_time; it2->second.end_time += it2->second.start_time; } } if(WRITE_TO_SCREEN) WriteToScreen(finfos); if(WRITE_TO_SQLITE) WriteToSQLiteDB(finfos); if(WRITE_TO_MYSQL ) WriteToMySQLDB(finfos); return 0; } //----------------------- // WriteToScreen //----------------------- void WriteToScreen(map &finfos) { // Print results to screen map::iterator it; for(it=finfos.begin(); it!=finfos.end(); it++){ DataFileInfo &dfi = it->second; string stime = ctime(&dfi.start_time); string etime = ctime(&dfi.end_time); stime[stime.length()-1] = 0; // chop off "\n" etime[stime.length()-1] = 0; // chop off "\n" char str[256]; sprintf(str, "%7d [%9d - %9d]", dfi.Nevents, dfi.first_event, dfi.last_event); cout << dfi.run << ":" << dfi.part << " " << str << " " << stime << " -> " << etime << " " << dfi.filename << endl; } } //----------------------- // WriteToSQLiteDB //----------------------- void WriteToSQLiteDB(map &finfos) { // Open/Create SQLite DB sqlite3 *db; if( sqlite3_open(SQLITE_FILENAME.c_str(), &db) ){ cerr << "Can't open database: " << sqlite3_errmsg(db) << endl; return; } cout << "Writing values to: " << SQLITE_FILENAME << " ...." << endl; // Create tables stringstream ss; ss << "CREATE TABLE IF NOT EXISTS runFiles (\n" \ " filename TEXT,\n" \ " run INTEGER,\n"\ " part INTEGER,\n"\ " size INTEGER,\n"\ " Nevents INTEGER,\n"\ " first_event INTEGER,\n"\ " last_event INTEGER,\n"\ " start_time INTEGER,\n"\ " end_time INTEGER,\n"\ " event_size_kB INTEGER,\n"\ " PRIMARY KEY (run, part));"; char *zErrMsg = 0; if( sqlite3_exec(db, ss.str().c_str(), NULL, 0, &zErrMsg) != SQLITE_OK ){ cerr << "Unable to create table! command follows:" << endl; cerr << ss.str() << endl; cerr << zErrMsg << endl; sqlite3_free(zErrMsg); return ; } ss.str(""); ss.clear(); ss << "CREATE TABLE IF NOT EXISTS runs (\n" \ " run INTEGER,\n"\ " Nparts INTEGER,\n"\ " Nevents INTEGER,\n"\ " start_time INTEGER,\n"\ " end_time INTEGER,\n"\ " event_size_kB INTEGER,\n"\ " PRIMARY KEY (run));"; if( sqlite3_exec(db, ss.str().c_str(), NULL, 0, &zErrMsg) != SQLITE_OK ){ cerr << "Unable to create table! command follows:" << endl; cerr << ss.str() << endl; cerr << zErrMsg << endl; sqlite3_free(zErrMsg); return ; } // Create Triggers to update runs table as entries are made into runFiles ss.str(""); ss.clear(); ss << "CREATE TRIGGER IF NOT EXISTS update_runs AFTER INSERT on runFiles\n"; ss << " BEGIN\n"; ss << " INSERT OR REPLACE INTO runs (run, Nparts, Nevents, start_time, end_time, event_size_kB) "; ss << " VALUES( new.run,"; ss << " (SELECT count(*) FROM runFiles WHERE run=new.run),\n"; ss << " (SELECT SUM(Nevents) FROM runFiles WHERE run=new.run),\n"; ss << " (SELECT start_time FROM runFiles WHERE run=new.run ORDER BY start_time ASC LIMIT 1),\n"; ss << " (SELECT end_time FROM runFiles WHERE run=new.run ORDER BY end_time DESC LIMIT 1),\n"; ss << " (SELECT AVG(event_size_kB)FROM runFiles WHERE run=new.run)\n"; ss << " );\n"; ss << " END;\n" << endl; if( sqlite3_exec(db, ss.str().c_str(), NULL, 0, &zErrMsg) != SQLITE_OK ){ cerr << "Unable to create trigger! command follows:" << endl; cerr << ss.str() << endl; cerr << zErrMsg << endl; sqlite3_free(zErrMsg); return ; } // Loop over run files, inserting info for each map::iterator it = finfos.begin(); for(; it!=finfos.end(); it++){ DataFileInfo &dfi = it->second; stringstream ss; ss << "INSERT OR REPLACE INTO runFiles"; ss << " (filename, run, part, size, Nevents, first_event, last_event,start_time, end_time, event_size_kB)"; ss << " VALUES("; ss << "'" << dfi.filename << "',"; ss << dfi.run << ","; ss << dfi.part << ","; ss << dfi.size << ","; ss << dfi.Nevents << ","; ss << dfi.first_event << ","; ss << dfi.last_event << ","; ss << dfi.start_time << ","; ss << dfi.end_time << ","; ss << dfi.event_size_kB << ")"; if( sqlite3_exec(db, ss.str().c_str(), NULL, 0, &zErrMsg) != SQLITE_OK ){ cerr << "Unable to insert row! command follows:" << endl; cerr << ss.str() << endl; cerr << zErrMsg << endl; sqlite3_free(zErrMsg); return ; } } // Close DB sqlite3_close(db); } //----------------------- // WriteToMySQLDB //----------------------- void WriteToMySQLDB(map &finfos) { // Open connection to MySQL DB MYSQL *con = mysql_init(NULL); if(con == NULL){ cerr << mysql_error(con) << endl; return; } if(mysql_real_connect(con, MYSQL_HOST.c_str(), MYSQL_USER.c_str(), MYSQL_PASSWORD.c_str(), NULL, 0, NULL, 0) == NULL){ cerr << mysql_error(con) << endl; mysql_close(con); return; } cout << "Writing values to: mysql://" << MYSQL_USER << "@" << MYSQL_HOST << endl; // Create DB string sql = string("CREATE DATABASE IF NOT EXISTS ") + MYSQL_DB; if(mysql_query(con, sql.c_str())){ cerr << mysql_error(con) << endl; mysql_close(con); return; } sql = "USE " + MYSQL_DB; mysql_query(con, sql.c_str()); // Create tables stringstream ss; ss << "CREATE TABLE IF NOT EXISTS runFiles (\n" \ " filename TEXT,\n" \ " run INTEGER,\n"\ " part INTEGER,\n"\ " size BIGINT,\n"\ " Nevents INTEGER,\n"\ " first_event INTEGER,\n"\ " last_event INTEGER,\n"\ " start_time INTEGER,\n"\ " end_time INTEGER,\n"\ " event_size_kB INTEGER,\n"\ " PRIMARY KEY (run, part));"; if( mysql_query(con, ss.str().c_str()) ){ cerr << "Unable to create table! command follows:" << endl; cerr << ss.str() << endl; cerr << mysql_error(con) << endl; return ; } ss.str(""); ss.clear(); ss << "CREATE TABLE IF NOT EXISTS runs (\n" \ " run INTEGER,\n"\ " Nparts INTEGER,\n"\ " Nevents INTEGER,\n"\ " start_time INTEGER,\n"\ " end_time INTEGER,\n"\ " event_size_kB INTEGER,\n"\ " PRIMARY KEY (run));"; if( mysql_query(con, ss.str().c_str()) ){ cerr << "Unable to create table! command follows:" << endl; cerr << ss.str() << endl; cerr << mysql_error(con) << endl; return ; } // Drop the trigger if it already exists so we can re-create it without error ss.str(""); ss.clear(); ss << "DROP TRIGGER IF EXISTS update_runs;\n"; if( mysql_query(con, ss.str().c_str()) ){ cerr << "Unable to drop trigger. (Not necessarily a problem). command follows:" << endl; cerr << ss.str() << endl; cerr << mysql_error(con) << endl; cerr << "Continuing in spite of error ...." << endl; } // Create Triggers to update runs table as entries are made into runFiles bool trigger_created = false; ss.str(""); ss.clear(); ss << "CREATE TRIGGER update_runs AFTER INSERT on runFiles FOR EACH ROW\n"; ss << " BEGIN\n"; ss << " INSERT INTO runs (run, Nparts, Nevents, start_time, end_time, event_size_kB) "; ss << " VALUES( new.run,"; ss << " (SELECT count(*) FROM runFiles WHERE run=new.run),\n"; ss << " (SELECT SUM(Nevents) FROM runFiles WHERE run=new.run),\n"; ss << " (SELECT start_time FROM runFiles WHERE run=new.run ORDER BY start_time ASC LIMIT 1),\n"; ss << " (SELECT end_time FROM runFiles WHERE run=new.run ORDER BY end_time DESC LIMIT 1),\n"; ss << " (SELECT AVG(event_size_kB) FROM runFiles WHERE run=new.run)\n"; ss << " ) ON DUPLICATE KEY UPDATE \n"; ss << " Nparts=(SELECT count(*) FROM runFiles WHERE run=new.run),\n"; ss << " Nevents=(SELECT SUM(Nevents) FROM runFiles WHERE run=new.run),\n"; ss << " start_time=(SELECT start_time FROM runFiles WHERE run=new.run ORDER BY start_time ASC LIMIT 1),\n"; ss << " end_time=(SELECT end_time FROM runFiles WHERE run=new.run ORDER BY end_time DESC LIMIT 1),\n"; ss << " event_size_kB=(SELECT AVG(event_size_kB) FROM runFiles WHERE run=new.run);\n"; ss << " END;\n" << endl; if( mysql_query(con, ss.str().c_str()) ){ cerr << "Unable to create trigger! command follows:" << endl; cerr << ss.str() << endl; cerr << mysql_error(con) << endl; cerr << "Trigger was not created so the runs table will not be filled." << endl; cerr << "I will continue with filling the runFiles table though..." << endl; cerr << endl; }else{ trigger_created = true; } // Loop over run files, inserting info for each uint32_t Nsucceeded = 0; set runs; map::iterator it = finfos.begin(); for(; it!=finfos.end(); it++){ DataFileInfo &dfi = it->second; runs.insert(dfi.run); stringstream ss; ss << "REPLACE INTO runFiles"; ss << " (filename, run, part, size, Nevents, first_event, last_event,start_time, end_time, event_size_kB)"; ss << " VALUES("; ss << "'" << dfi.filename << "',"; ss << dfi.run << ","; ss << dfi.part << ","; ss << dfi.size << ","; ss << dfi.Nevents << ","; ss << dfi.first_event << ","; ss << dfi.last_event << ","; ss << dfi.start_time << ","; ss << dfi.end_time << ","; ss << dfi.event_size_kB << ")"; if( mysql_query(con, ss.str().c_str()) ){ cerr << "Unable to create table! command follows:" << endl; cerr << ss.str() << endl; cerr << mysql_error(con) << endl; return ; } Nsucceeded++; } if(!trigger_created) UpdateRunsTableinMySQLDB(con, runs); // Close DB mysql_close(con); cout << "Inserted/Updated " << Nsucceeded << " entries in runFiles." << endl; } //----------------------- // UpdateRunsTableinMySQLDB //----------------------- void UpdateRunsTableinMySQLDB(MYSQL *con, set &runs) { /// This is called if we are not able to use the (much more elegant) /// TRIGGER mechanism. This will take the set of run numbers provided /// and either insert a new entry in the runs table or update the existing /// one. uint32_t Nupdated = 0; uint32_t Ninserted = 0; for(set::iterator it=runs.begin(); it!=runs.end(); it++){ uint32_t run = *it; stringstream ss; ss << "SELECT * FROM runs WHERE run=" << run; if( mysql_query(con, ss.str().c_str()) ){ cerr << "Problem with query:" << endl; cerr << ss.str() << endl; cerr << mysql_error(con) << endl; return ; } MYSQL_RES *res = mysql_store_result(con); ss.str(""); ss.clear(); if(mysql_num_rows(res)==0){ // Insert row ss << "INSERT INTO runs (run, Nparts, Nevents, start_time, end_time, event_size_kB) "; ss << " VALUES( " << run << ",\n"; ss << " (SELECT count(*) FROM runFiles WHERE run="<