#include #include #include #include #include #include #include #include #include #include using namespace std; #include #include #include #include #include #include "rsmon_cmsg.h" #include #include #include #include #include #include #include bool DONE =false; uint32_t DELAY=400000; // in microseconds string ROOTSPY_UDL = "cMsg://127.0.0.1/cMsg/rootspy"; string CMSG_NAME; int VERBOSE=0; bool REDRAW_SCREEN = true; // for debugging string FOCUS_NODE = ""; bool INCLUDE_ROOTSPY_STATS = true; bool CONNECT_AS_SERVER=false; bool PING_SERVERS = false; bool RESPOND_TO_PINGS = true; int REQUESTS_SENT=0; int REQUESTED_HISTOGRAMS=0; int REQUESTS_SATISFIED=0; int RECEIVED_HISTOGRAMS=0; double START_TIME = 0; map LAST_REQUEST_TIME; enum RSMON_MODE{ MODE_OBSERVE, MODE_SPEED_TEST, MODE_MESS_SIZES }; rsmon_cmsg *RSMON_CMSG = NULL; rs_cmsg *RS_CMSG = NULL; rs_xmsg *RS_XMSG = NULL; rs_info *RS_INFO = NULL; pthread_rwlock_t *ROOT_MUTEX = NULL; RSMON_MODE MODE = MODE_OBSERVE; void Usage(void); void ParseCommandLineArguments(int narg, char *argv[]); void sigHandler(int sig) { DONE = true; } void UpdateHistoRequests(double now); #define ESC (char)0x1B //#define ESC endl<<"ESC" #define CLEAR() {cout< &lines, uint32_t Nhdefs, uint32_t Nhinfos, uint32_t Ntdefs) { if(!REDRAW_SCREEN){ // for debugging cout << "---------- Skipping screen redraw of " << lines.size() << " content lines ----------" << endl; return; } // Get size of terminal struct winsize w; ioctl(STDOUT_FILENO, TIOCGWINSZ, &w); int Nrows = w.ws_row; int Ncols = w.ws_col; HOME(); CLEAR(); RESET(); PRINTAT(1, 1, string(Ncols,'-')); PRINTCENTERED(2, "ROOTSpy Monitor"); PRINTAT(1, 3, string(Ncols,'-')); string mode_str = "UNKNOWN"; switch(MODE){ case MODE_OBSERVE : mode_str="OBSERVE ONLY" ; break; case MODE_SPEED_TEST: mode_str="SPEED TEST" ; break; case MODE_MESS_SIZES: mode_str="MESSAGE SIZES"; break; } MOVETO( 3, 4); cout << "MODE: " << mode_str; MOVETO( 3, 5); cout << " Number of histograms published: " << Nhdefs; MOVETO( 3, 6); cout << "Number of histograms downloaded: " << Nhinfos; MOVETO( 3, 7); cout << " Number of trees published: " << Ntdefs; MOVETO(41, 4); cout << "Number of hists requested: " << REQUESTED_HISTOGRAMS; MOVETO(41, 5); cout << "Number of hists received: " << RECEIVED_HISTOGRAMS; MOVETO(41, 6); cout << "Number of requests issued: " << REQUESTS_SENT; MOVETO(41, 7); cout << "Number of requests received: " << REQUESTS_SATISFIED; PRINTAT(1, 8, string(Ncols,'.')); for(unsigned int i=0; i=narg) || next[0]=='-'){ cout << "Argument \""<Lock(); map &hdefs = RS_INFO->histdefs; map &hinfos = RS_INFO->hinfos; vector hists_to_request; map::iterator hdef_iter = hdefs.begin(); for(; hdef_iter!= hdefs.end() ; hdef_iter++){ hdef_t &hdef = hdef_iter->second; string &hnamepath = hdef.hnamepath; // Loop over servers who can provide this histogram map::iterator it = hdef.servers.begin(); for(; it!=hdef.servers.end(); it++){ string server = it->first; hid_t hid(server, hnamepath); map::iterator it_lrt = LAST_REQUEST_TIME.find(hid); if(it_lrt != LAST_REQUEST_TIME.end()){ // Time we last sent a request double lrt = it_lrt->second; // Look to see if we've received this histogram from this server map::iterator it_hinfo = hinfos.find(hid); if(it_hinfo != hinfos.end()){ double received = it_hinfo->second.received - START_TIME; if( lrt10.0 ) continue; } }else{ // If we've never received this histogram and // it has been less than 2 seconds, don't send // another request if((now-lrt)<2.0) continue; } } hists_to_request.push_back(hid); } } RS_INFO->Unlock(); // Sort list by server so we can send single request to each // server and they can return all histograms in single message map > hnamepaths_by_server; for(unsigned int i=0; i >::iterator hbs_iter = hnamepaths_by_server.begin(); for(; hbs_iter!=hnamepaths_by_server.end(); hbs_iter++){ string server = hbs_iter->first; vector &hnamepaths = hbs_iter->second; if( RS_CMSG) RS_CMSG->RequestHistograms(server, hnamepaths); if( RS_XMSG) RS_XMSG->RequestHistograms(server, hnamepaths); REQUESTS_SENT++; for(uint32_t i=0; iRequestHistogram(server, hnamepath); LAST_REQUEST_TIME[hid_t(server, hnamepath)] = now; REQUESTS_SENT++; } #endif } //------------------------------ // MakeRSMON //------------------------------ void MakeRSMON(void) { // Check if an RSMON object is needed based on the current mode. If // so and it does not already exist create one of the appropriate type. if(RSMON_CMSG==NULL && (RS_CMSG==NULL || RS_CMSG->IsOnline())){ RSMON_CMSG = new rsmon_cmsg(CMSG_NAME, (RS_CMSG==NULL ? NULL:RS_CMSG->GetcMsgPtr())); RSMON_CMSG->focus_node = FOCUS_NODE; RSMON_CMSG->respond_to_pings = RESPOND_TO_PINGS; } } //------------------------------ // main //------------------------------ int main(int narg, char *argv[]) { ParseCommandLineArguments(narg, argv); // Create rs_info object RS_INFO = new rs_info(); ROOT_MUTEX = new pthread_rwlock_t; pthread_rwlock_init(ROOT_MUTEX, NULL); // Create cMsg object char hostname[256]; gethostname(hostname, 256); char str[512]; sprintf(str, "rs_%s_%d", hostname, getpid()); CMSG_NAME = string(str); cout << "Full UDL is " << ROOTSPY_UDL << endl; if(CONNECT_AS_SERVER){ if( ROOTSPY_UDL.find("xMsg://") == 0){ RS_XMSG = new rs_xmsg(ROOTSPY_UDL, CMSG_NAME); RS_XMSG->verbose = VERBOSE; RS_XMSG->program_name = "RSMonitor"; } else { RS_CMSG = new rs_cmsg(ROOTSPY_UDL, CMSG_NAME); RS_CMSG->verbose = VERBOSE; RS_CMSG->program_name = "RSMonitor"; } } signal(SIGINT, sigHandler); // Loop forever Getting all Histograms START_TIME = rs_cmsg::GetTime(); double next_update = 1.0; // time relative to start_time while(!DONE){ // Get Current time double now = rs_cmsg::GetTime() - START_TIME; // measure time relative to program start switch(MODE){ case MODE_OBSERVE: case MODE_MESS_SIZES: if(RSMON_CMSG==NULL && (RS_CMSG==NULL || RS_CMSG->IsOnline())){ RSMON_CMSG = new rsmon_cmsg(CMSG_NAME, (RS_CMSG==NULL ? NULL:RS_CMSG->GetcMsgPtr())); RSMON_CMSG->focus_node = FOCUS_NODE; RSMON_CMSG->respond_to_pings = RESPOND_TO_PINGS; } break; case MODE_SPEED_TEST: UpdateHistoRequests(now); break; } // Update screen occasionally if( now >= next_update ){ // Container to hold lines to print in main content area of screen vector lines; // Fill in lines based on what mode we're running in switch(MODE){ case MODE_OBSERVE: if(RSMON_CMSG) RSMON_CMSG->FillLines(now, lines); break; case MODE_MESS_SIZES: if(RSMON_CMSG) RSMON_CMSG->FillLinesMessageSizes(now, lines); break; case MODE_SPEED_TEST: RS_INFO->Lock(); map &hinfos = RS_INFO->hinfos; for(map::iterator iter=hinfos.begin(); iter!=hinfos.end(); iter++){ hinfo_t &hinfo = iter->second; if(now + START_TIME - hinfo.received > 4.0) continue; // don't print lines for hists that are no longer being updated hdef_t &hdef = RS_INFO->histdefs[hinfo.hnamepath]; string line = PrintToString(hinfo, hdef); lines.push_back(line); } RS_INFO->Unlock(); break; } // Get some statistics to display RS_INFO->Lock(); uint32_t Nhdefs = RS_INFO->histdefs.size(); uint32_t Nhinfos = RS_INFO->hinfos.size(); uint32_t Ntdefs = RS_INFO->treedefs.size(); RS_INFO->Unlock(); // Redraw the screen RedrawScreen(lines, Nhdefs, Nhinfos, Ntdefs); // Do any follow-up based on mode switch(MODE){ case MODE_OBSERVE: case MODE_MESS_SIZES: if(RS_CMSG) { if(PING_SERVERS) RS_CMSG->PingServers(); } else { if(PING_SERVERS) RSMON_CMSG->PingServers(); } break; case MODE_SPEED_TEST: // Request new histogram list from all servers if(RS_CMSG) RS_CMSG->RequestHists("rootspy"); // Request new tree list from all servers if(RS_CMSG) RS_CMSG->RequestTreeInfo("rootspy"); break; } next_update = now + 1.0; // update again in 1s } // sleep a bit in order to limit how fast the histos are filled if(DELAY != 0)usleep(DELAY); } cout << endl; cout << "Ending" << endl; if(RSMON_CMSG) delete RSMON_CMSG; if(RS_CMSG) delete RS_CMSG; return 0; }