// NOTE(review): This copy of jc_cmsg.cc appears to be damaged.
//  - Its newlines have been collapsed. On the line below, everything after
//    the leading "//" is therefore a single comment to the compiler.
//  - Text running from a '<' that is followed by a letter up to the next '>'
//    has been removed, the way an HTML tag stripper would do it. This took
//    out the four #include targets, the object prefix of the connect() and
//    unsubscribe() calls, the catch-block body, and whatever preceded the
//    subscribe() call.
// That text cannot be recovered from here. Restore the file from version
// control before changing any code.
//
// What the line below contains:
//  - The file header, the includes, and "#if HAVE_CMSG". Its matching
//    #endif is not in view.
//  - JC_CMSG: a global pointer to the live jc_cmsg. The constructor sets it
//    to `this` near its end, and the destructor resets it to NULL.
//  - A forward declaration of PingServersThread, the background poller
//    thread entry point.
//  - jc_cmsg::jc_cmsg(myUDL, myName, myDescr), which:
//      * initializes `mutex`;
//      * creates cMsgSys = new cMsg(myUDL, myName, myDescr);
//      * calls connect() inside a try block that catches cMsgException,
//        setting is_connected = true on success;
//      * calls subscribe(myname, "*", this, NULL) (the call wrapping it was
//        stripped; presumably it stores the handle in subscription_handles);
//      * starts cMsg;
//      * arms ITIMER_REAL with a 1000000 s value and interval, but only if no
//        ITIMER_REAL countdown is already running;
//      * records start_time = GetTime();
//      * sets verbose = 0, timeout = 0.25 (seconds) and done = false;
//      * publishes JC_CMSG = this and starts PingServersThread.
//  - jc_cmsg::~jc_cmsg(), which:
//      * sets done = true and joins the ping thread;
//      * unsubscribes each handle in subscription_handles;
//      * calls cMsgSys->stop() and resets JC_CMSG.
//
// NOTE(review) issues visible in the surviving text:
//  - cMsgException is caught by value. Catch it by const reference instead.
//  - The destructor never deletes cMsgSys, which the constructor allocates
//    with new. It also never calls pthread_mutex_destroy(&mutex).
//  - `done` is written here and read by the ping thread without taking
//    `mutex`. Unless jc_cmsg.h declares it atomic (not visible here), this is
//    a data race -- TODO confirm.
//  - The return code of pthread_create is ignored. If the lost catch body
//    returns before pthread_create, the destructor would pthread_join an
//    uninitialized ping_servers_thread -- confirm against the original.
//  - When ITIMER_REAL expires, the process receives SIGALRM. The first
//    expiry is after 1000000 s, about 11.6 days. The default action for
//    SIGALRM terminates the process, so confirm a handler is installed
//    elsewhere.
//  - POSIX leaves the interaction of setitimer() with sleep() unspecified,
//    and the ping thread calls sleep(2).
// $Id$ // // File: jc_cmsg.cc // Created: Sun Dec 27 23:31:21 EST 2009 // Creator: davidl (on Darwin harriet.jlab.org 9.8.0 i386) // #include #include #include #include using namespace std; #include "jc_cmsg.h" #if HAVE_CMSG jc_cmsg *JC_CMSG = NULL; void* PingServersThread(void *arg); //--------------------------------- // jc_cmsg (Constructor) //--------------------------------- jc_cmsg::jc_cmsg(string myUDL, string myName, string myDescr) { // Initialize mutex pthread_mutex_init(&mutex, NULL); // Connect to cMsg system cMsgSys = new cMsg(myUDL,myName,myDescr); // the cMsg system object, where try { // all args are of type string cout << "CMSG UDL: " << myUDL << endl; cout << "Connecting to cMsg server using name \"" << myName << "\"" <connect(); is_connected = true; } catch (cMsgException e) { cout<subscribe(myname, "*", this, NULL)); // Start cMsg system cMsgSys->start(); // Start high resolution timer (if not already started) struct itimerval start_tmr; getitimer(ITIMER_REAL, &start_tmr); if(start_tmr.it_value.tv_sec==0 && start_tmr.it_value.tv_usec==0){ struct itimerval value, ovalue; value.it_interval.tv_sec = 1000000; value.it_interval.tv_usec = 0; value.it_value.tv_sec = 1000000; value.it_value.tv_usec = 0; setitimer(ITIMER_REAL, &value, &ovalue); } // Get starting time of high res timer start_time = GetTime(); verbose = 0; // initialize to minimal messages timeout = 0.25; // seconds done = false; JC_CMSG = this; // Launch thread to continually ping servers so we keep an updated list pthread_create(&ping_servers_thread, NULL, PingServersThread, NULL); } //--------------------------------- // ~jc_cmsg (Destructor) //--------------------------------- jc_cmsg::~jc_cmsg() { // Stop PingServersThread done = true; pthread_join(ping_servers_thread, NULL); // Unsubscribe for(unsigned int i=0; iunsubscribe(subscription_handles[i]); } // Stop cMsg system cMsgSys->stop(); JC_CMSG = NULL; } //--------------------------------- // SendCommand 
// NOTE(review): The lines below have the same damage as the rest of this copy
// of the file:
//  - newlines are collapsed;
//  - spans from '<' + letter up to the next '>' are stripped.
// As a result, most of the code sits inside "//" comments and several spans
// are missing. Only comments have been added here. Restore the file before
// editing any code.
//
// Contents of the next line:
//  - jc_cmsg::SendCommand(cmd, subject) sends a cMsgMessage with the given
//    subject, type = myname and text = cmd. It echoes the command when
//    verbose > 0.
//  - A stripped span follows. It runs from that echo to "...->done) break;",
//    and any functions that lay in between are lost.
//  - Next is the tail of what is presumably PingServersThread. Its loop
//    breaks when `done` is set (the object prefix was stripped, presumably
//    JC_CMSG). Otherwise it:
//      * sends "get threads", "host status" and
//        "list configuration parameters" to subject "janactl";
//      * records last_threadinfo_time (without holding `mutex`);
//      * sleeps 2 s.
//  - jc_cmsg::callback(msg, userObject) begins:
//      * NULL messages are ignored.
//      * The sender name is apparently taken from msg->getType(), and
//        messages whose sender is myname are dropped.
//      * Under `mutex`, it records the receive time and the sender host, and
//        checks whether a level is already known for the sender. If no level
//        is known, it asks that sender for its "command line".
//      * It then dispatches on msg->getText(). "who's there?" and
//        "I am here" need no further work.
//---------------------------------  void jc_cmsg::SendCommand(string cmd, string subject) { cMsgMessage msg; msg.setSubject(subject); msg.setType(myname); msg.setText(cmd); cMsgSys->send(&msg); if(verbose>0) cout<<"Sent command: "<done) break; // Ping Servers to see who's still attached //JC_CMSG->PingServers(); // Request rate info from all processes JC_CMSG->SendCommand("get threads", "janactl"); JC_CMSG->SendCommand("host status", "janactl"); JC_CMSG->SendCommand("list configuration parameters", "janactl"); JC_CMSG->last_threadinfo_time = JC_CMSG->GetTime(); // Sleep for a couple of seconds sleep(2); } return NULL; } //--------------------------------- // callback //--------------------------------- void jc_cmsg::callback(cMsgMessage *msg, void *userObject) { if(!msg)return; if(verbose>0) cout<<"Received message -- Subject:"<getSubject()<<" Type:"<getType()<<" Text:"<getText()<getType(); if(sender == myname){delete msg; return;} // no need to process messages we sent! // Always record times of messages received double now = GetTime(); pthread_mutex_lock(&mutex); last_msg_received_time[sender] = now; hosts[sender] = msg->getSenderHost(); bool request_command_line = (levels.find(sender)==levels.end()); pthread_mutex_unlock(&mutex); // We extract the monitoring level from the command line so // if we've never received it, then ask for it now. if(request_command_line) SendCommand("command line", sender); // The actual command is always sent in the text of the message string cmd = msg->getText(); // Dispatch command //=========================================================== if(cmd=="who's there?"){ delete msg; return; // We don't actually respond to these, only servers } //=========================================================== if(cmd=="I am here"){ // No need to do anything here since sender and time are recorded // for all messages automatically. 
// NOTE(review) on the remaining branches of callback(), which run through the
// line that starts with "<< endl; exit(-2);":
//  - "thread info" size check:
//      * The check ends with `nevents->size() != nevents->size()`, which is
//        always false.
//      * A `nevents` vector shorter than `threads` is therefore not rejected.
//        The copy loop (bounded, presumably, by threads->size()) can then
//        read (*nevents)[i] out of range.
//      * Fix: the comparison should be `threads->size() != nevents->size()`.
//  - "thread info" vector ownership:
//      * This branch never deletes the four vectors it gets from
//        getUint64Vector() and getDoubleVector(), on either return path.
//      * The other branches do delete what getStringVector() returns.
//      * If these getters return caller-owned copies, this leaks on every
//        reply -- confirm with the cMsg API.
//  - "configuration parameter list":
//      * It calls getStringVector() without the payloadContainsName() check
//        that the later branches use.
//      * It calls exit(-2) when the two vectors differ in length. "sources
//        list", "host info" and "host status" do the same, so one malformed
//        reply from any remote process terminates this program.
//      * config_params is keyed by parameter name only, so replies from
//        different processes overwrite each other. config_params_responder
//        records only the last sender.
//  - "sources list", "host info" and "host status":
//      * All three repeat the same fetch/validate/pair-up code and differ
//        only in payload names and destination map. A shared helper would
//        remove this duplication.
//      * The "If no active sources" comment was copied into the two host
//        branches.
//  - "command line":
//      * The level is the text between "/hdmon" and the next "." in the
//        command. If either marker is missing, the level is "--".
//      * If the reply carries no "command" field, levels[sender] is never
//        set. Every later message from that sender then triggers another
//        "command line" request.
//      * Each reply is appended to commandLines, and no code visible here
//        removes entries.
//      * "Exract" in this branch's comment should read "Extract".
delete msg; return; } //=========================================================== if(cmd=="thread info"){ // Extract data from message vector *threads = msg->getUint64Vector("threads"); vector *instantaneous_rates = msg->getDoubleVector("instantaneous_rates"); vector *average_rates = msg->getDoubleVector("average_rates"); vector *nevents = msg->getUint64Vector("nevents"); if(threads->size() != instantaneous_rates->size() || threads->size() != average_rates->size() || nevents->size() != nevents->size()){ delete msg; return; } vector my_thrinfos; for(unsigned int i=0; isize(); i++){ thrinfo_t t; t.thread = (*threads)[i]; t.Nevents = (*nevents)[i]; t.rate_instantaneous = (*instantaneous_rates)[i]; t.rate_average = (*average_rates)[i]; my_thrinfos.push_back(t); } pthread_mutex_lock(&mutex); thrinfos[sender] = my_thrinfos; pthread_mutex_unlock(&mutex); delete msg; return; } //=========================================================== if(cmd=="configuration parameter list"){ // Extract data from message // msg->payloadPrint(); vector *names = msg->getStringVector("names"); vector *vals = msg->getStringVector("vals"); if(names->size() != vals->size()){ cerr << "ERROR: names and vals vector sizes don't match!!" << endl; exit(-2); } pthread_mutex_lock(&mutex); config_params_responder = sender; for(unsigned int i=0; isize(); i++){ config_params[(*names)[i]] = (*vals)[i]; if( (*names)[i] == "HDLOG_NUM_RESPAWNS" ) hostNrespawns[sender] = atoi((*vals)[i].c_str()); } pthread_mutex_unlock(&mutex); delete names; delete vals; delete msg; return; } //=========================================================== if(cmd=="sources list"){ // Extract data from message vector *classNames = NULL; vector *sourceNames = NULL; if(msg->payloadContainsName("classNames")) classNames = msg->getStringVector("classNames"); if(msg->payloadContainsName("sourceNames")) sourceNames = msg->getStringVector("sourceNames"); // If no active sources then the pointers will be NULL. 
This is not // necessarily an error. vector > mysources; if(classNames != NULL && sourceNames!=NULL){ if(classNames->size() != sourceNames->size()){ cerr << "ERROR: classNames and sourceNames vector sizes don't match!!" << endl; exit(-2); } for(unsigned int i=0; isize(); i++){ mysources.push_back(make_pair((*classNames)[i], (*sourceNames)[i])); } } pthread_mutex_lock(&mutex); sources[sender] = mysources; pthread_mutex_unlock(&mutex); delete classNames; delete sourceNames; delete msg; return; } //=========================================================== if(cmd=="host info"){ // Extract data from message vector *keys = NULL; vector *vals = NULL; if(msg->payloadContainsName("keys")) keys = msg->getStringVector("keys"); if(msg->payloadContainsName("vals")) vals = msg->getStringVector("vals"); // If no active sources then the pointers will be NULL. This is not // necessarily an error. vector > myhostInfos; if(keys != NULL && vals!=NULL){ if(keys->size() != vals->size()){ cerr << "ERROR: keys and vals vector sizes don't match!!" << endl; exit(-2); } for(unsigned int i=0; isize(); i++){ myhostInfos.push_back(make_pair((*keys)[i], (*vals)[i])); } } pthread_mutex_lock(&mutex); hostInfos[sender] = myhostInfos; pthread_mutex_unlock(&mutex); delete keys; delete vals; delete msg; return; } //=========================================================== if(cmd=="host status"){ // Extract data from message vector *keys = NULL; vector *vals = NULL; if(msg->payloadContainsName("keys")) keys = msg->getStringVector("keys"); if(msg->payloadContainsName("vals")) vals = msg->getStringVector("vals"); // If no active sources then the pointers will be NULL. This is not // necessarily an error. vector > myhostStatus; if(keys != NULL && vals!=NULL){ if(keys->size() != vals->size()){ cerr << "ERROR: keys and vals vector sizes don't match!!" 
<< endl; exit(-2); } for(unsigned int i=0; isize(); i++){ myhostStatus.push_back(make_pair((*keys)[i], (*vals)[i])); } } pthread_mutex_lock(&mutex); hostStatus[sender] = myhostStatus; pthread_mutex_unlock(&mutex); delete keys; delete vals; delete msg; return; } //=========================================================== if(cmd=="command line"){ // Extract data from message if(msg->payloadContainsName("command")){ string command = msg->getString("command"); // Exract level string level = "--"; size_t pos_start = command.find("/hdmon"); if(pos_start!=string::npos){ pos_start += string("/hdmon").length(); size_t pos_end = command.find(".", pos_start); if(pos_end!=string::npos){ level = command.substr(pos_start, pos_end-pos_start); } } pthread_mutex_lock(&mutex); commandLines.push_back(pair(sender, command)); levels[sender] = level; pthread_mutex_unlock(&mutex); } delete msg; return; } //=========================================================== delete msg; } //--------------------------------- // ListRemoteProcesses //--------------------------------- void jc_cmsg::ListRemoteProcesses(void) { // First, ping all remote servers PingServers(); cout<<"Waiting "<::iterator iter=last_msg_received_time.begin(); for(; iter!=last_msg_received_time.end(); iter++){ cout<<" "<first<<" ("<<(last_ping_time-iter->second)<<" sec.)"<::iterator iter = last_msg_received_time.begin(); for(; iter!= last_msg_received_time.end(); iter++){ // Do not report info for nodes we haven't heard from for "max_time" seconds // (n.b. the value of GetTime() is counting down) if(now < (iter->second-max_time)) continue; if(Nnodes >= max_nodes){ cout << "Space for node info allocated by PYTHON is too small!! 
(" << max_nodes << " NodeInfo_t's allocated)" << endl; cout << "List returned will be truncated" << endl; break; } // Copy unique process name NodeInfo_t *node = &nodes[Nnodes++]; const string &name = iter->first; strcpy(node->name, name.c_str()); // Copy node name string host(""); map::iterator iter = hosts.find(name); if(iter != hosts.end()) host = iter->second; strcpy(node->node, host.c_str()); // Level if(levels.find(name) != levels.end()){ strcpy(node->level, levels[name].c_str()); }else{ strcpy(node->level, "--"); } // Get pointer to detailed thread info recorded from last read node->Nthreads = 0; node->Nevents = 0; node->rate = 0.0; node->cpu = 0.0; map >::iterator titer = thrinfos.find(name); if(titer != thrinfos.end()){ vector &tinfos = titer->second; node->Nthreads = tinfos.size(); for(unsigned int i=0; iNevents += tinfos[i].Nevents; node->rate += tinfos[i].rate_instantaneous; } // janactl will report the last instantaneous rate calculated from when // the last event wss processed. If events are not being processed because // the run is stalled, it will continue to report a non-zero rate. It does // this because it makes sense if the rate is very low (below 1 Hz). For // us though, we want to display 0 Hz. We detect this situation by keeping // track of the last Nevents reported by each node. If it has not increased, // then the reported rate is discarded and a rate of 0Hz for the node is // reported. 
// NOTE(review) on ListRemoteProcesses() and on the NodeInfo_t-filling
// routine. Both begin on the line that closes callback() above. The
// routine's signature and opening lines were stripped, and it ends on the
// next line with pthread_mutex_unlock(&mutex).
//  - ListRemoteProcesses():
//      * It calls PingServers().
//      * After a stripped span, it prints every key of
//        last_msg_received_time together with last_ping_time minus that
//        key's receive time.
//  - What the NodeInfo_t routine does:
//      * It skips senders not heard from within max_time. Per its own
//        comment, GetTime() counts down.
//      * It stops at max_nodes.
//      * It fills name, node, level, Nthreads, Nevents, rate, cpu and
//        Nrespawns.
//  - Unbounded copies: strcpy() copies the process name, host name and level
//    into node->name, node->node and node->level with no length check. The
//    array sizes live in jc_cmsg.h (not visible). Use a bounded copy.
//  - Shadowing: the declaration `map::iterator iter = hosts.find(name);`
//    shadows the loop's own `iter`. Rename it for clarity.
//  - Stalled-run check:
//      * It zeroes the rate whenever Nevents equals the value seen on the
//        previous call.
//      * thrinfos changes only when a "thread info" reply arrives. The ping
//        thread asks every ~2 s.
//      * Calling this routine more often than replies arrive therefore
//        reports 0 Hz for busy nodes between replies -- confirm the caller's
//        polling period.
//  - Default inserts: hostNrespawns[name] and last_nevents[name] insert
//    default entries for names not seen before.
//  - Typo: "wss" in the stalled-run comment should read "was".
// GetThreadInfo() begins on the next line. It sends "get threads" to
// `subject` and records last_threadinfo_time.
if(last_nevents[name] == node->Nevents) node->rate = 0.0; last_nevents[name] = node->Nevents; }else{ } // Get CPU usage from hostStatus map > >::iterator hsiter = hostStatus.find(name); if(hsiter != hostStatus.end()){ // Loop over elements of reported host status looking for cpu_usage vector > &hs = hsiter->second; vector >::iterator it; for(it=hs.begin(); it!=hs.end(); it++){ string &key = it->first; string &val = it->second; if(key == "cpu_usage"){ node->cpu = atof(val.c_str()); break; } } } // Get number of respawns from hostNrespawns which is filled from config. parameters node->Nrespawns = hostNrespawns[name]; } pthread_mutex_unlock(&mutex); } //--------------------------------- // GetThreadInfo //--------------------------------- void jc_cmsg::GetThreadInfo(string subject) { SendCommand("get threads", subject); last_threadinfo_time = GetTime(); // If subject is janactl then assume we're sending this to multiple recipients // so we must wait the full "timeout". Otherwise, we want to continue as soon // as we get the first response. To make this happen, we sleep for 100 us increments // at a time. 
// NOTE(review) on the next line. It holds, in order:
//  - the rest of GetThreadInfo();
//  - a configuration-parameter lister, a sources lister, and a routine that
//    copies one source description into `src` (the names and openings of
//    all three were stripped);
//  - the empty stubs ListSourceTypes, ListFactories and ListPlugins;
//  - the start of ListCommandLines.
// Issues:
//  - Wrong unit in a comment: GetThreadInfo's header comment says it sleeps
//    "100 us" per iteration, but time_to_sleep_per_iteration is 0.1 s. That
//    is 100000 us, i.e. 100 ms.
//  - Unlocked wait loop: GetThreadInfo's loop reads thrinfos.size() without
//    holding `mutex`, while callback() modifies thrinfos under it. This is a
//    data race, so lock around the check. The later wait loops on this line,
//    whose conditions were partly stripped, appear to follow the same
//    pattern.
//  - Stale early exit:
//      * The single-subject early exit fires as soon as thrinfos is
//        non-empty.
//      * Older entries already satisfy that test. (Presumably they are
//        refreshed by the "get threads" requests from the ping thread.)
//      * So it does not wait for `subject`'s own reply. Nothing visible in
//        GetThreadInfo clears thrinfos first.
//  - Unguarded dereference in the source-copying routine:
//      * It dereferences sources.begin() with no visible emptiness check.
//        Any guard would be in the stripped span. This is undefined behavior
//        if `sources` is empty.
//      * It reports only the last source of the first map entry.
//      * It leaves `src` untouched when the text does not fit in `len`.
double time_slept = 0.0; double time_to_sleep_per_iteration = 0.1; do{ usleep((int)(time_to_sleep_per_iteration*1.0E6)); time_slept += time_to_sleep_per_iteration; if(subject!="janactl" && thrinfos.size()>0)break; }while(time_slept >::iterator iter=thrinfos.begin(); for(; iter!=thrinfos.end(); iter++){ cout<first<<":"< &thrinfo = iter->second; for(unsigned int i=0; i0)break; }while(time_slept::iterator iter=config_params.begin(); size_t max_len = 0; for(; iter!=config_params.end(); iter++){ size_t len = iter->first.length(); if(len > max_len) max_len = len; } max_len++; // Print results cout<first.length(); if(max_len > len) cout<second<0)break; }while(time_slept > >::iterator iter; size_t max_len = 0; for(iter = sources.begin(); iter!=sources.end(); iter++){ size_t len = iter->first.length(); if(len > max_len) max_len = len; } max_len++; // Print results cout<first; const vector > &mysources = iter->second; for(unsigned int i=0; i "<< className << " : " << sourceName << endl; } } cout<0)break; }while(time_slept"); vector > &my_sources = sources.begin()->second; if(my_sources.size()>0){ pair &my_source = my_sources[my_sources.size()-1]; string &className = my_source.first; string &sourceName = my_source.second; source = sourceName + " (type=" + className + ")"; } // unlock mutex pthread_mutex_unlock(&mutex); if(source.length() < len) strcpy(src, source.c_str()); } //--------------------------------- // ListSourceTypes //--------------------------------- void jc_cmsg::ListSourceTypes(string subject) { } //--------------------------------- // ListFactories //--------------------------------- void jc_cmsg::ListFactories(string subject) { } //--------------------------------- // ListPlugins //--------------------------------- void jc_cmsg::ListPlugins(string subject) { } //--------------------------------- // ListCommandLines //--------------------------------- void jc_cmsg::ListCommandLines(string subject) { /// Get command line used to launch remote process(es) 
// NOTE(review) on the next line. It holds, in order:
//  - the rest of ListCommandLines();
//  - a routine whose opening was stripped. It copies the first commandLines
//    entry whose responder equals `sname` into `cmd`, but only when the text
//    fits in `len`;
//  - the start of GetHostInfo().
// Issues:
//  - ListCommandLines polls commandLines.size() without holding `mutex`.
//  - commandLines accumulates every "command line" reply, and callback()
//    also requests one from each newly seen sender. Two problems follow,
//    unless the stripped span clears the vector:
//      * older entries satisfy the single-subject early exit;
//      * the listing can show duplicate or stale lines.
SendCommand("command line", subject); // Wait for timeout seconds for all clients to respond. Sleep for // 100 ms increments at a time while waiting. double time_slept = 0.0; double time_to_sleep_per_iteration = 0.1; do{ usleep((int)(time_to_sleep_per_iteration*1.0E6)); time_slept += time_to_sleep_per_iteration; if(subject!="janactl" && commandLines.size()>0)break; }while(time_slept >::iterator iter; for(iter = commandLines.begin(); iter!=commandLines.end(); iter++){ const string &responder = iter->first; const string &commandLine = iter->second; cout << " " << responder << " : " << commandLine << endl; } cout< >::iterator iter; for(iter = commandLines.begin(); iter!=commandLines.end(); iter++){ const string &responder = iter->first; const string &commandLine = iter->second; if(responder == sname){ command = commandLine; break; } } // unlock mutex pthread_mutex_unlock(&mutex); if(command.length() < len) strcpy(cmd, command.c_str()); } //--------------------------------- // GetHostInfo //--------------------------------- void jc_cmsg::GetHostInfo(string subject) { /// The remote process will repond with two vectors representing /// key/value pairs which are then just printed. This allows the /// remote process to decide what info it sends. SendCommand("host info", subject); // Wait for timeout seconds for all clients to respond. Sleep for // 100 ms increments at a time while waiting. 
// NOTE(review) on GetHostInfo(), which continues past the end of this view:
//  - Its wait loop reads hostInfos.size() without holding `mutex`.
//  - Its single-subject early exit has the same stale-entry problem noted
//    above for commandLines.
//  - "repond" in its header comment should read "respond".
double time_slept = 0.0; double time_to_sleep_per_iteration = 0.1; do{ usleep((int)(time_to_sleep_per_iteration*1.0E6)); time_slept += time_to_sleep_per_iteration; if(subject!="janactl" && hostInfos.size()>0)break; }while(time_slept > >::iterator iter; size_t max_len = 0; size_t max_key_len = 0; for(iter = hostInfos.begin(); iter!=hostInfos.end(); iter++){ size_t len = iter->first.length(); if(len > max_len) max_len = len; const vector > &myinfos = iter->second; for(unsigned int i=0; i max_key_len) max_key_len = len; } } max_len++; max_key_len++; // Print results cout<first; const vector > &myinfos = iter->second; cout << " " << responder << ":" << endl; cout<<" -----------------------------"<