// $Id$ // hdmon_ctl.cc // Ported from cMsg to ZeroMQ pub/sub messaging with class name hdmon_ctl // Created: [Date] by [Your Name] #include "hdmon_ctl.h" #include #include #include #include #include #include #include #include #include #include #include #ifdef __APPLE__ #define HAVE_SYSCTL #include #else #define HAVE_PROC #endif using namespace std; extern vector PROG_ARGS; // A simple helper to serialize a vector into a comma-separated string. template string serializeVector(const vector& vec) { ostringstream oss; oss << "["; for (size_t i = 0; i < vec.size(); ++i) { if (i > 0) oss << ", "; oss << vec[i]; } oss << "]"; return oss.str(); } //--------------------------------------------------------------------- // hdmon_ctl (Constructor) //--------------------------------------------------------------------- hdmon_ctl::hdmon_ctl(JApplication *japp) : context(1), subscriber(context, ZMQ_SUB), publisher(context, ZMQ_PUB), running(true) { this->japp = japp; // Create a unique name for ourself based on hostname and PID. char hostname[256]; gethostname(hostname, 256); char str[512]; sprintf(str, "%s_%d", hostname, getpid()); myname = string(str); // Default endpoints (client mode: use connect on both sockets). // By default we assume the proxy is running on localhost: // - The proxy binds an XSUB socket on port 11246 (for publishers to connect) // - The proxy binds an XPUB socket on port 11247 (for subscribers to connect) pub_endpoint = "tcp://localhost:11246"; // For sending messages (publisher connects to proxy's XSUB) sub_endpoint = "tcp://localhost:11247"; // For receiving messages (subscriber connects to proxy's XPUB) // Check for the HDMON_UDL environment variable. // Expected format: "ZMQ://host:pub-port:sub-port/hdmonctl" const char* udl_env = getenv("HDMON_UDL"); if (udl_env != NULL) { std::string udl_str(udl_env); // Expected format: "ZMQ://host:pub-port:sub-port/topic" const std::string prefix = "ZMQ://"; if (udl_str.compare(0, prefix.size(), prefix) == 0) { // Remove the "ZMQ://" prefix. std::string rest = udl_str.substr(prefix.size()); // Find the '/' separating the address part and the topic. size_t pos = rest.find('/'); if (pos != std::string::npos) { std::string addr_part = rest.substr(0, pos); // The topic is rest.substr(pos + 1) but we ignore it here. // Now, addr_part is expected to be "host:pub-port:sub-port" size_t pos1 = addr_part.find(":"); size_t pos2 = addr_part.find(":", pos1 + 1); if (pos1 != std::string::npos && pos2 != std::string::npos) { std::string host = addr_part.substr(0, pos1); std::string pub_port_str = addr_part.substr(pos1 + 1, pos2 - pos1 - 1); std::string sub_port_str = addr_part.substr(pos2 + 1); int pub_port_val = atoi(pub_port_str.c_str()); int sub_port_val = atoi(sub_port_str.c_str()); // In our design, the hdmon_ctl publisher socket will connect to the proxy's XSUB, // and the subscriber socket will connect to the proxy's XPUB. pub_endpoint = "tcp://" + host + ":" + std::to_string(pub_port_val); sub_endpoint = "tcp://" + host + ":" + std::to_string(sub_port_val); } } } } // Set configuration parameters. These defaults can be overridden via the command line. JParameterManager *parms = japp->GetJParameterManager(); parms->SetDefaultParameter("HDMON:SUB_ENDPOINT", sub_endpoint); parms->SetDefaultParameter("HDMON:PUB_ENDPOINT", pub_endpoint); parms->SetDefaultParameter("HDMON:Name", myname); parms->SetDefaultParameter("HDMON:VERBOSE", VERBOSE); parms->SetDefaultParameter("HDMON:LEVEL", hdmon_level); // Log the connection information. LOG << "---------------------------------------------------" << LOG_END; LOG << "hdmon_ctl name: \"" << myname << "\"" << LOG_END; LOG << "Will connect as subscriber to: " << sub_endpoint << LOG_END; LOG << "Will connect as publisher to: " << pub_endpoint << LOG_END; LOG << "---------------------------------------------------" << LOG_END; // Set up the subscriber socket: connect and subscribe to the generic topic "hdmonctl" // and to our unique name. try { subscriber.connect(sub_endpoint); subscriber.set(zmq::sockopt::subscribe, "hdmonctl"); subscriber.set(zmq::sockopt::subscribe, myname); } catch (const zmq::error_t &e) { LOG << "Error setting up subscriber socket: " << e.what() << LOG_END; return; } // Set up the publisher socket: connect (do not bind, since the proxy is binding). try { publisher.connect(pub_endpoint); } catch (const zmq::error_t &e) { LOG << "Error setting up publisher socket: " << e.what() << LOG_END; return; } // Start the background thread to receive messages. recv_thread = std::thread([this](){ while (running) { zmq::pollitem_t items[] = { { static_cast(subscriber), 0, ZMQ_POLLIN, 0 } }; // Poll with a 100ms timeout to allow checking the running flag. zmq::poll(items, 1, std::chrono::milliseconds(100)); if(items[0].revents & ZMQ_POLLIN) { try { // Expect three message parts: subject, sender, and command text. zmq::message_t part; if(!subscriber.recv(part, zmq::recv_flags::none)) continue; string subject(static_cast(part.data()), part.size()); if(!subscriber.recv(part, zmq::recv_flags::none)) continue; string sender(static_cast(part.data()), part.size()); if(!subscriber.recv(part, zmq::recv_flags::none)) continue; string cmd(static_cast(part.data()), part.size()); if(VERBOSE > 0) { LOG << "Received message -- Subject: " << subject << " Sender: " << sender << " Text: " << cmd << LOG_END; } processMessage(subject, sender, cmd); } catch (const zmq::error_t &e) { LOG << "ZMQ error in recvLoop: " << e.what() << LOG_END; } } } }); // Broadcast our presence (similar to sending an "I am here" message). sendMessage("hdmonctl", myname, "I am here"); // Start periodic thread info updates to the "hdmongui" topic every 2 seconds. update_thread = std::thread([this,japp](){ while (running) { // Retrieve thread info from japp. auto NThreads = japp->GetNThreads(); auto InstantaneousRate = japp->GetInstantaneousRate(); auto IntegratedRate = japp->GetIntegratedRate(); auto NEventsProcessed = japp->GetNEventsProcessed(); ostringstream oss; oss << "thread info\n" << "threads: " << NThreads << "\n" << "instantaneous_rates: " << InstantaneousRate << "\n" << "average_rates: " << IntegratedRate << "\n" << "nevents: " << NEventsProcessed << "\n" << "level: " << this->hdmon_level; sendMessage("hdmongui", myname, oss.str()); std::this_thread::sleep_for(std::chrono::seconds(2)); } }); } //--------------------------------------------------------------------- // ~hdmon_ctl (Destructor) //--------------------------------------------------------------------- hdmon_ctl::~hdmon_ctl() { running = false; if(recv_thread.joinable()){ recv_thread.join(); } if(update_thread.joinable()){ update_thread.join(); } subscriber.close(); publisher.close(); // The context is cleaned up automatically. } //--------------------------------------------------------------------- // sendMessage: Sends a multipart message (subject, sender, text) via ZeroMQ. //--------------------------------------------------------------------- void hdmon_ctl::sendMessage(const string &subject, const string &sender, const string &text) { try { if( VERBOSE > 0 ){ LOG << "hdmon sending message: subject=\"" << subject << "\" mess=\"" << text << "\"" << LOG_END; } zmq::message_t m_subject(subject.data(), subject.size()); zmq::message_t m_sender(sender.data(), sender.size()); zmq::message_t m_text(text.data(), text.size()); publisher.send(m_subject, zmq::send_flags::sndmore); publisher.send(m_sender, zmq::send_flags::sndmore); publisher.send(m_text, zmq::send_flags::none); } catch (const zmq::error_t &e) { LOG << "Error sending message: " << e.what() << LOG_END; } } //--------------------------------------------------------------------- // processMessage: Contains the logic previously implemented in the cMsg callback. //--------------------------------------------------------------------- void hdmon_ctl::processMessage(const string &subject, const string &sender, const string &cmd) { // In our convention, the “sender” is the unique name of the sender. // We reply to the sender by sending a message with subject = sender. auto respond = [this, &sender](const string &responseText) { sendMessage(sender, myname, responseText); }; if(cmd == "who's there?"){ respond("I am here"); return; } if(cmd == "kill"){ LOG << "Killing application ..." << LOG_END; exit(-1); return; } if(cmd == "quit"){ LOG << "Quitting application ..." << LOG_END; japp->Quit(); return; } if(cmd == "pause"){ LOG << "Pausing event processing ..." << LOG_END; japp->Stop(); return; } if(cmd == "resume"){ LOG << "Resuming event processing ..." << LOG_END; japp->Run(); return; } if(cmd == "get threads"){ auto NThreads = japp->GetNThreads(); auto InstantaneousRate = japp->GetInstantaneousRate(); auto IntegratedRate = japp->GetIntegratedRate(); auto NEventsProcessed = japp->GetNEventsProcessed(); ostringstream oss; oss << "thread info\n" << "threads: " << NThreads << "\n" << "instantaneous_rates: " << InstantaneousRate << "\n" << "average_rates: " << IntegratedRate << "\n" << "nevents: " << NEventsProcessed << "\n" << "level: " << this->hdmon_level; respond(oss.str()); return; } if(cmd.find("killthread ") == 0){ // This was available in JANA1, but dropped in JANA2 respond("Individual thread killing no longer supported"); return; } if(cmd.find("set nthreads ") == 0){ stringstream ss(cmd.substr(13)); int Nthreads = 0; ss >> Nthreads; if(Nthreads > 0){ japp->Scale(Nthreads); respond("OK"); } else { respond("BAD value for nthreads"); } return; } if(cmd.find("list configuration parameters") == 0){ respond("listing configuration parameters not currently supported"); return; } if(cmd.find("list sources") == 0){ respond("listing sources not currently supported"); return; } if(cmd.find("host info") == 0){ vector keys, vals; #ifdef HAVE_PROC HostInfoPROC(keys, vals); #endif #ifdef HAVE_SYSCTL HostInfoSYSCTL(keys, vals); #endif if(keys.size() != vals.size()){ LOG << "keys.size() != vals.size() when returning host info!!" << LOG_END; keys.clear(); vals.clear(); } for(size_t i = 0; i < keys.size(); i++){ if(keys[i] == "") keys[i] = ""; if(vals[i] == "") vals[i] = ""; } ostringstream oss; oss << "host info\n"; for (size_t i = 0; i < keys.size(); ++i) oss << keys[i] << ": " << vals[i] << "\n"; respond(oss.str()); return; } if(cmd.find("host status") == 0){ vector keys, vals; #ifdef HAVE_PROC HostStatusPROC(keys, vals); #endif #ifdef HAVE_SYSCTL HostStatusSYSCTL(keys, vals); #endif if(keys.size() != vals.size()){ LOG << "keys.size() != vals.size() when returning host status!!" << LOG_END; keys.clear(); vals.clear(); } for(size_t i = 0; i < keys.size(); i++){ if(keys[i] == "") keys[i] = ""; if(vals[i] == "") vals[i] = ""; } ostringstream oss; oss << "host status\n"; for (size_t i = 0; i < keys.size(); ++i) oss << keys[i] << ": " << vals[i] << "\n"; respond(oss.str()); return; } if(cmd.find("command line") == 0){ ostringstream oss; oss << "command line\n"; for(auto &arg : PROG_ARGS) oss << arg << " "; respond(oss.str()); return; } LOG << "Received unknown command: " << cmd << LOG_END; } //--------------------------------------------------------------------- // HostStatusPROC (Linux): Reads /proc/stat to compute CPU usage. //--------------------------------------------------------------------- void hdmon_ctl::HostStatusPROC(vector &keys, vector &vals) { #ifdef HAVE_PROC static time_t last_time = 0; static double last_user = 0.0, last_nice = 0.0, last_sys = 0.0, last_idle = 0.0; static double delta_user = 0.0, delta_nice = 0.0, delta_sys = 0.0, delta_idle = 1.0; time_t now = time(NULL); if(now > last_time){ ifstream ifs("/proc/stat"); if(ifs.is_open()){ string cpu; double user, nice, sys, idle; ifs >> cpu >> user >> nice >> sys >> idle; ifs.close(); delta_user = user - last_user; delta_nice = nice - last_nice; delta_sys = sys - last_sys; delta_idle = idle - last_idle; last_user = user; last_nice = nice; last_sys = sys; last_idle = idle; last_time = now; } } double norm = delta_user + delta_nice + delta_sys + delta_idle; double user_percent = 100.0 * delta_user / norm; double nice_percent = 100.0 * delta_nice / norm; double sys_percent = 100.0 * delta_sys / norm; double idle_percent = 100.0 * delta_idle / norm; double cpu_usage = 100.0 - idle_percent; char buf[256]; sprintf(buf, "%5.1f", user_percent); keys.push_back("user"); vals.push_back(buf); sprintf(buf, "%5.1f", nice_percent); keys.push_back("nice"); vals.push_back(buf); sprintf(buf, "%5.1f", sys_percent); keys.push_back("sys"); vals.push_back(buf); sprintf(buf, "%5.1f", idle_percent); keys.push_back("idle"); vals.push_back(buf); sprintf(buf, "%5.1f", cpu_usage); keys.push_back("cpu_usage"); vals.push_back(buf); #endif } //--------------------------------------------------------------------- // HostInfoPROC (Linux): Reads /proc/cpuinfo, /proc/meminfo, etc. //--------------------------------------------------------------------- void hdmon_ctl::HostInfoPROC(vector &keys, vector &vals) { #ifdef HAVE_PROC ifstream ifs("/proc/cpuinfo"); if(!ifs.is_open()) return; vector> procinfo; map pinfo; while(!ifs.eof()){ char cline[1024]; ifs.getline(cline, 1024); if(ifs.gcount() < 1) break; string line(cline); size_t pos = line.find(":"); if(pos == string::npos) continue; string key = line.substr(0, pos-1); string val = (pos+1)==line.length() ? "" : line.substr(pos+2); pos = key.find_last_not_of(" \t"); if(pos != string::npos) key = key.substr(0, pos+1); if(key == "processor"){ if(!pinfo.empty()) procinfo.push_back(pinfo); pinfo.clear(); } pinfo[key] = val; } ifs.close(); if(!pinfo.empty()) procinfo.push_back(pinfo); map real_cores; for(size_t i = 0; i < procinfo.size(); i++){ map &p = procinfo[i]; int physical_id = atoi(p["physical id"].c_str()); int cpu_cores = atoi(p["cpu cores"].c_str()); real_cores[physical_id] = cpu_cores; } if(!procinfo.empty()){ keys.push_back("CPU Brand"); vals.push_back(procinfo[0]["model name"]); } FILE *pipe = popen("uname -m", "r"); if(pipe){ char buff[128]; if(fgets(buff, 128, pipe) != NULL){ buff[strlen(buff)-1] = 0; // trim newline keys.push_back("Machine type"); vals.push_back(buff); } pclose(pipe); } char buf[256]; if(!procinfo.empty()){ sprintf(buf, "%3.1f GHz", atof(procinfo[0]["cpu MHz"].c_str())/1000.0); keys.push_back("Clock Speed"); vals.push_back(buf); } ifs.open("/proc/meminfo"); int mem_kB = 0; if(ifs.is_open()){ char buff[4096]; memset(buff, 0, 4096); ifs.read(buff, 4095); ifs.close(); string sbuff(buff); size_t pos = sbuff.find("MemTotal:"); if(pos != string::npos) mem_kB = atoi(&buff[pos+11]); } int mem_GB = (int)round(0.531161471 + (double)mem_kB * 9.65808E-7); sprintf(buf, "%d GB", mem_GB); keys.push_back("RAM"); vals.push_back(buf); int Ncores_physical = 0; for(auto &p : real_cores) Ncores_physical += p.second; sprintf(buf, "%d", Ncores_physical); keys.push_back("Ncores (physical)"); vals.push_back(buf); sprintf(buf, "%lu", procinfo.size()); keys.push_back("Ncores (logical)"); vals.push_back(buf); #endif } //--------------------------------------------------------------------- // HostStatusSYSCTL (Mac OS X) //--------------------------------------------------------------------- void hdmon_ctl::HostStatusSYSCTL(vector &keys, vector &vals) { // Implement as needed for Mac OS X. } //--------------------------------------------------------------------- // HostInfoSYSCTL (Mac OS X) //--------------------------------------------------------------------- void hdmon_ctl::HostInfoSYSCTL(vector &keys, vector &vals) { #ifdef HAVE_SYSCTL stringstream ss; char buff[256]; size_t bufflen = 256; // Model bzero(buff, bufflen); sysctlbyname("hw.model", buff, &bufflen, NULL, 0); keys.push_back("Model"); vals.push_back(buff); // CPU Brand bufflen = 256; bzero(buff, bufflen); if(sysctlbyname("machdep.cpu.brand_string", buff, &bufflen, NULL, 0) == 0){ keys.push_back("CPU Brand"); vals.push_back(buff); } // Machine type bufflen = 256; bzero(buff, bufflen); sysctlbyname("hw.machine", buff, &bufflen, NULL, 0); keys.push_back("Machine type"); vals.push_back(buff); // Clock Speed uint64_t cpufrequency = 0; bufflen = sizeof(cpufrequency); sysctlbyname("hw.cpufrequency", &cpufrequency, &bufflen, NULL, 0); double fcpufrequency = ((double)cpufrequency) / 1.0E9; ss.str(""); ss << fcpufrequency << " GHz"; keys.push_back("Clock Speed"); vals.push_back(ss.str()); // RAM uint64_t memsize = 0; bufflen = sizeof(memsize); sysctlbyname("hw.memsize", &memsize, &bufflen, NULL, 0); double fmemsize = ((double)memsize) / 1024.0 / 1024.0 / 1024.0; ss.str(""); ss << fmemsize << " GB"; keys.push_back("RAM"); vals.push_back(ss.str()); // Ncores (physical) uint32_t Ncores = 0; bufflen = sizeof(Ncores); sysctlbyname("hw.physicalcpu", &Ncores, &bufflen, NULL, 0); ss.str(""); ss << Ncores; keys.push_back("Ncores (physical)"); vals.push_back(ss.str()); // Ncores (logical) Ncores = 0; bufflen = sizeof(Ncores); sysctlbyname("hw.logicalcpu", &Ncores, &bufflen, NULL, 0); ss.str(""); ss << Ncores; keys.push_back("Ncores (logical)"); vals.push_back(ss.str()); #endif }