// $Id$ // // File: rs_cmsg.cc // Created: Thu Aug 27 13:40:02 EDT 2009 // Creator: davidl (on Darwin harriet.jlab.org 9.8.0 i386) // #include #include #include #include #include #include #include #include // RR added to include cmhbook conversion functions #include "CMHbookToROOTLib.h" #include "RootSpy.h" #include "rs_cmsg.h" #include "rs_info.h" #include "tree_info_t.h" using namespace std; // See http://www.jlab.org/Hall-D/software/wiki/index.php/Serializing_and_deserializing_root_objects class MyTMessage: public TMessage { public: MyTMessage(void *buf, Int_t len) : TMessage(buf, len) { } }; //--------------------------------- // rs_cmsg (Constructor) //--------------------------------- rs_cmsg::rs_cmsg(string &udl, string &name) { // Connect to cMsg system string myDescr = "Access ROOT objects in a running program"; cMsgSys = new cMsg(udl, name, myDescr); // the cMsg system object, where try { // all args are of type string cMsgSys->connect(); } catch (cMsgException e) { cout << endl << endl << endl << endl << "_______________ ROOTSPY unable to connect to cMsg system! __________________" << endl; cout << e.toString() << endl; cout << endl << endl << endl << endl; return; } // Create a unique name for ourself char hostname[256]; gethostname(hostname, 256); char str[512]; sprintf(str, "%s_%d", hostname, getpid()); myname = string(str); cout << "---------------------------------------------------" << endl; cout << " cMsg name: \"" << name << "\"" << endl; cout << "rootspy name: \"" << myname << "\"" << endl; cout << "---------------------------------------------------" << endl; // Subscribe to generic rootspy info requests subscription_handles.push_back( cMsgSys->subscribe("rootspy", "*", this, NULL)); // Subscribe to rootspy requests specific to us subscription_handles.push_back(cMsgSys->subscribe(myname, "*", this, NULL)); // Start cMsg system cMsgSys->start(); // Broadcast request for available servers PingServers(); } //--------------------------------- // ~rs_cmsg (Destructor) //--------------------------------- rs_cmsg::~rs_cmsg() { // Unsubscribe for (unsigned int i = 0; i < subscription_handles.size(); i++) { cMsgSys->unsubscribe(subscription_handles[i]); } // Stop cMsg system cMsgSys->stop(); } //--------------------------------- // PingServers //--------------------------------- void rs_cmsg::PingServers(void) { cMsgMessage whosThere; whosThere.setSubject("rootspy"); whosThere.setType(myname); whosThere.setText("who's there?"); cMsgSys->send(&whosThere); } //--------------------------------- // RequestHists //--------------------------------- void rs_cmsg::RequestHists(string servername) { cMsgMessage listHists; listHists.setSubject(servername); listHists.setType(myname); listHists.setText("list hists"); cMsgSys->send(&listHists); } //--------------------------------- // RequestHistogram //--------------------------------- void rs_cmsg::RequestHistogram(string servername, string hnamepath) { cMsgMessage requestHist; requestHist.setSubject(servername); requestHist.setType(myname); requestHist.setText("get hist"); requestHist.add("hnamepath", hnamepath); cMsgSys->send(&requestHist); } //--------------------------------- // RequestTreeInfo //--------------------------------- void rs_cmsg::RequestTreeInfo(string servername) { cMsgMessage treeinfo; treeinfo.setSubject(servername); treeinfo.setType(myname); treeinfo.setText("tree info"); cMsgSys->send(&treeinfo); } //--------------------------------- // FinalHistogram //--------------------------------- void rs_cmsg::FinalHistogram(string servername, vector hnamepaths) { cMsgMessage finalhist; finalhist.setSubject(servername); finalhist.setType(myname); finalhist.setText("provide final"); finalhist.add("hnamepaths", hnamepaths); cMsgSys->send(&finalhist); cerr << "final hist request sent" << endl; } //--------------------------------- // callback //--------------------------------- void rs_cmsg::callback(cMsgMessage *msg, void *userObject) { if (!msg) return; // The convention here is that the message "type" always constains the // unique name of the sender and therefore should be the "subject" to // which any reponse should be sent. string sender = msg->getType(); if (sender == myname) { delete msg; return; } // no need to process messages we sent! // Look for an entry for this server in RS_INFO map. // If it is not there then add it. RS_INFO->Lock(); if (RS_INFO->servers.find(sender) == RS_INFO->servers.end()) { RS_INFO->servers[sender] = server_info_t(sender); cout << "Added \"" << sender << "\" to server list." << endl; } else { // Update lastHeardFrom time for this server RS_INFO->servers[sender].lastHeardFrom = time(NULL); } RS_INFO->Unlock(); // The actual command is always sent in the text of the message if (msg->getText() == "null") { delete msg; return; } string cmd = msg->getText(); // Dispatch command bool handled_message = false; //=========================================================== if (cmd == "who's there?") { // We don't actually respond to these, only servers handled_message = true; } //=========================================================== if (cmd == "I am here") { // We don't really need to do anything here since any message // from the server automatically updates the list and lastHeardForm // time above. handled_message = true; } //=========================================================== if (cmd == "hists list") { RegisterHistList(sender, msg); handled_message = true; } //=========================================================== if (cmd == "histogram") { RegisterHistogram(sender, msg); handled_message = true; } if (cmd == "tree info") { RegisterTreeInfo(sender, msg); handled_message = true; } //=========================================================== if (!handled_message) { _DBG_ << "Received unknown message -- Subject:" << msg->getSubject() << " Type:" << msg->getType() << " Text:" << msg->getText() << endl; } delete msg; } //--------------------------------- // RegisterHistList //--------------------------------- void rs_cmsg::RegisterHistList(string server, cMsgMessage *msg) { /// Copy list of histograms from cMsg into RS_INFO structures. bool good_response = true; // Get pointers to STL containers that hold the histogram information vector *hist_names; vector *hist_types; vector *hist_paths; vector *hist_titles; try { // all args are of type string hist_names = msg->getStringVector("hist_names"); hist_types = msg->getStringVector("hist_types"); hist_paths = msg->getStringVector("hist_paths"); hist_titles = msg->getStringVector("hist_titles"); } catch (cMsgException e) { _DBG_ << "Poorly formed response for \"hists list\". Ignoring. 1" << endl; return; } // Make sure we have valid pointers if (!hist_names) good_response = false; if (!hist_types) good_response = false; if (!hist_paths) good_response = false; if (!hist_titles) good_response = false; // Make sure containers all have the same number of entries if (good_response) { if (hist_names->size() != hist_types->size()) good_response = false; if (hist_names->size() != hist_paths->size()) good_response = false; if (hist_names->size() != hist_titles->size()) good_response = false; } // If the response is incomplete for any reason, then alert user and return. if (!good_response) { _DBG_ << "Poorly formed response for \"hists list\". Ignoring. 2" << endl; return; } // Looks like we got a good response. Loop over histograms and add them to // list of hdef_t objects kept in RS_INFO. If there is already an entry // for a histogram, verify that the definition matches this new one. for (unsigned int i = 0; i < hist_names->size(); i++) { // Get name string name = (*hist_names)[i]; // Get path without the preceeding root: string path = (*hist_paths)[i]; // RR added to read cmhbook hist list hdef_t hdef(name, path); if ((*hist_names)[0] != "cmhbook") { size_t pos = path.find_first_of("/"); if (pos != string::npos) path = path.substr(pos); // Create temporary hdef_t object if ((*hist_types)[i].find("TH1") != string::npos) hdef.type = hdef_t::oneD; else if ((*hist_types)[i].find("TH2") != string::npos) hdef.type = hdef_t::twoD; else if ((*hist_types)[i].find("TH3") != string::npos) hdef.type = hdef_t::threeD; else hdef.type = hdef_t::noneD; } else { if ((*hist_types)[i] == "1D") { hdef.type = hdef_t::oneD; } else if ((*hist_types)[i] == "2D") { hdef.type = hdef_t::twoD; } } hdef.title = (*hist_titles)[i]; hdef.active = true; // Look for entry in RS_INFO RS_INFO->Lock(); if (RS_INFO->histdefs.find(hdef.hnamepath) == RS_INFO->histdefs.end()) { RS_INFO->histdefs[hdef.hnamepath] = hdef; } else { // Need code to verify hdefs are same!! } // Make sure this server is in list of this hdef's servers map &servers = RS_INFO->histdefs[hdef.hnamepath].servers; if (servers.find(server) == servers.end()) servers[server] = true; // Make sure this hdef is in list of this servers hdef's vector &hnamepaths = RS_INFO->servers[server].hnamepaths; if (find(hnamepaths.begin(), hnamepaths.end(), hdef.hnamepath) == hnamepaths.end()) hnamepaths.push_back(hdef.hnamepath); RS_INFO->Unlock(); } } //--------------------------------- // RegisterTreeInfo //--------------------------------- void rs_cmsg::RegisterTreeInfo(string server, cMsgMessage *msg) { /// Copy tree information from cMsg into RS_INFO structures. bool good_response = true; // Get pointers to STL containers that hold the tree information vector tree_names; vector tree_paths; vector tree_titles; vector > tree_branch_info; for (unsigned int i = 0; i < msg->getUserInt(); i++) { stringstream ss; ss << i; string tree_name = ss.str() + "_tree_info"; cMsgMessage *submsg = msg->getMessage(tree_name); try { // all args are of type string tree_names.push_back(submsg->getString("tree_name")); tree_paths.push_back(submsg->getString("tree_path")); tree_titles.push_back(submsg->getString("tree_title")); vector *branch_info = submsg->getStringVector( "tree_branch_info"); tree_branch_info.push_back(*branch_info); } catch (cMsgException e) { _DBG_ << "Poorly formed response for \"tree info\". Ignoring." << endl; return; } } // Make sure containers all have the same number of entries if (good_response) { if (tree_names.size() != tree_paths.size()) good_response = false; if (tree_names.size() != tree_titles.size()) good_response = false; if (tree_names.size() != tree_branch_info.size()) good_response = false; } // If the response is incomplete for any reason, then alert user and return. if (!good_response) { _DBG_ << "Poorly formed response for \"tree info\". Ignoring." << endl; return; } // Now that the message is valid make tree_info_t // objects out of the tree information and // store them in the server_info_t's tree vector. vector &rs_trees = RS_INFO->servers[server].trees; for (unsigned int i = 0; i < msg->getUserInt(); i++) { string name = tree_names[i]; string path = tree_paths[i]; string tnamepath = name + "/" + path; vector branch_info = tree_branch_info[i]; rs_trees.push_back(tree_info_t(server, tnamepath, branch_info)); } } //--------------------------------- // RegisterHistogram //--------------------------------- void rs_cmsg::RegisterHistogram(string server, cMsgMessage *msg) { // Get hnamepath from message string hnamepath = msg->getString("hnamepath"); // Lock RS_INFO mutex while working with RS_INFO RS_INFO->Lock(); // Get pointer to hdef_t map::iterator hdef_iter = RS_INFO->histdefs.find(hnamepath); if (hdef_iter == RS_INFO->histdefs.end()) { _DBG_ << "No hdef_t object for hnamepath=\"" << hnamepath << "\"!" << endl; _DBG_ << "Throwing away histogram." << endl; RS_INFO->Unlock(); return; } hdef_t *hdef = &(hdef_iter->second); // Get pointer to server_info_t map::iterator server_info_iter = RS_INFO->servers.find(server); if (server_info_iter == RS_INFO->servers.end()) { _DBG_ << "No server_info_t object for server=\"" << server << "\"!" << endl; _DBG_ << "Throwing away histogram." << endl; RS_INFO->Unlock(); return; } server_info_t *server_info = &(server_info_iter->second); // Get pointer to hinfo_t hid_t hid(server, hnamepath); hinfo_t *hinfo = NULL; map::iterator hinfo_iter = RS_INFO->hinfos.find(hid); if (hinfo_iter == RS_INFO->hinfos.end()) { // hinfo_t object doesn't exist. Add one to RS_INFO RS_INFO->hinfos[hid] = hinfo_t(server, hnamepath); hinfo_iter = RS_INFO->hinfos.find(hid); } hinfo = &(hinfo_iter->second); // Lock ROOT mutex while working with ROOT objects pthread_rwlock_wrlock (ROOT_MUTEX); // RR added to convert cmhbook Histograms to ROOT TH1 *h; if (msg->payloadContainsName ("type")) { // RR added do decipher between cmhbook Histograms and ROOT string type = msg->getString("type"); if (type == "cmhbook") { string csv = msg->getString("csv"); string convert = msg->getString("convert"); if (convert == "TH1I") { TH1I *cmhist = fromCSVToROOTHist(csv); h = dynamic_cast(cmhist); } else if (convert == "TH2I") { TH2I *cmhist = fromCSVToROOTHist(csv); h = dynamic_cast(cmhist); } else if (convert == "TH1F") { TH1F *cmhist = fromCSVToROOTHist(csv); h = dynamic_cast(cmhist); } else if (convert == "TH2F") { TH2F *cmhist = fromCSVToROOTHist(csv); h = dynamic_cast(cmhist); } if (!h) { _DBG_ << "Object received of type \"" << type << "\" is not converted properly to a TH1 type" << endl; pthread_rwlock_unlock(ROOT_MUTEX); RS_INFO->Unlock(); return; } } } else { // Get ROOT object from message and cast it as a TNamed* pthread_rwlock_wrlock(ROOT_MUTEX); MyTMessage *myTM = new MyTMessage(msg->getByteArray(), msg->getByteArrayLength()); TNamed *namedObj = (TNamed*) myTM->ReadObject(myTM->GetClass()); if (!namedObj) { _DBG_ << "No valid object returned in histogram message." << endl; pthread_rwlock_unlock(ROOT_MUTEX); RS_INFO->Unlock(); return; } // Cast this as a histogram pointer h = dynamic_cast(namedObj); if (!h) { _DBG_ << "Object received of type \"" << namedObj->ClassName() << "\" is not a TH1 type" << endl; pthread_rwlock_unlock(ROOT_MUTEX); RS_INFO->Unlock(); return; } } // Update hinfo hinfo->received = time(NULL); if (hinfo->hist) { // Subtract old histo from sum if (hdef->sum_hist) hdef->sum_hist->Add(hinfo->hist, -1.0); // Delete old histo delete hinfo->hist; hinfo->hist = NULL; } // Set pointer to hist in hinfo to new histogram hinfo->hist = h; // Change ROOT TDirectory of new histogram to server's hinfo->hist->SetDirectory(server_info->dir); // Adds the new histogram to the hists map in hdef_t map::iterator hinfo_it = hdef->hists.find(server); // first we have to check for and delete any older versions // of the hist. if (hinfo_it != hdef->hists.end()) { hdef->hists.erase(hinfo_it); } // Now we add the newer version to the map hdef->hists.insert(pair(server, (hinfo_iter->second))); // Add new histogram to sum and flag it as modified _DBG_ << "Adding " << h->GetEntries() << " from " << server << " to hist" << endl; if (hdef->sum_hist) { // Reset sum histo first if showing only one histo at a time if (RS_INFO->viewStyle == rs_info::kViewByServer) hdef->sum_hist->Reset(); hdef->sum_hist->Add(h); } else { string sum_hist_name = string(h->GetName()) + "__sum"; hdef->sum_hist = (TH1*) h->Clone(sum_hist_name.c_str()); hdef->sum_hist->SetDirectory(RS_INFO->sum_dir); } //would want to update the hdef_t time stamp here (Justin B) _DBG_ << hdef->sum_hist << endl; hdef->sum_hist_modified = true; //Justin B hdef->sum_hist_present = true; // Unlock mutexes pthread_rwlock_unlock(ROOT_MUTEX); RS_INFO->Unlock(); }