// // Created by hdsys on 11/21/19. // #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace std; #include "hdRDMAcontrol.h" extern bool QUIT; extern void *HDRDMA_ZEROMQ_CONTEXT; extern int HDRDMA_RET_VAL; extern std::mutex HDRDMA_RECV_FNAMES_MUTEX; extern std::set HDRDMA_RECV_FNAMES; extern atomic BYTES_RECEIVED_TOT; extern atomic NFILES_RECEIVED_TOT; extern double HDRDMA_LAST_RATE_10sec; extern double HDRDMA_LAST_RATE_1min; extern double HDRDMA_LAST_RATE_5min; //------------------------------------------------------------- // hdRDMAcontrol //------------------------------------------------------------- hdRDMAcontrol::hdRDMAcontrol(int port):port(port) { cout << "Launching hdRDMAcontrol thread ..." << endl; thr = new std::thread( &hdRDMAcontrol::ServerLoop, this ); } //------------------------------------------------------------- // ~hdRDMAcontrol //------------------------------------------------------------- hdRDMAcontrol::~hdRDMAcontrol(void) { done = true; if(thr != nullptr) { cout << "Joining hdRDMAcontrol thread ..." << endl; thr->join(); cout << "hdRDMAcontrol thread joined." << endl; } } //------------------------------------------------------------- // ServerLoop //------------------------------------------------------------- void hdRDMAcontrol::ServerLoop(void) { cout << "hdRDMAcontrol::Publish called" << endl; pthread_setname_np( pthread_self(), "hdRDMAcontrol" ); #if HAVE_ZEROMQ char bind_str[256]; sprintf( bind_str, "tcp://*:%d", port ); void *responder = zmq_socket( HDRDMA_ZEROMQ_CONTEXT, ZMQ_REP ); int rc = zmq_bind( responder, bind_str); if( rc != 0 ){ cout << "Unable to bind zeroMQ control socket " << port << "!" << endl; perror("zmq_bind"); return; } // All messages will include host char host[256]; gethostname( host, 256 ); // Loop until told to quit while( !done && ! QUIT){ // Listen for request (non-blocking) char buff[2048]; auto rc = zmq_recv( responder, buff, 2048, ZMQ_DONTWAIT); if( rc< 0 ){ if( (errno==EAGAIN) || (errno==EINTR) ){ std::this_thread::sleep_for(std::chrono::milliseconds(250)); continue; }else{ cerr << "ERROR listening on control socket: errno=" << errno << endl; done = true; continue; } } // Split request into tokens std::vector vals; istringstream iss( std::string(buff, rc) ); copy( istream_iterator(iss), istream_iterator(), back_inserter(vals) ); // Response stringstream ss; if( vals.empty()){ ss << ""; }else if( vals[0] == "quit" ){ QUIT = true; done = true; HDRDMA_RET_VAL = vals.size()>1 ? atoi(vals[1].c_str()):-1; // allow remote user to set return value. default to -1 so system service will restart ss << "OK"; }else if( vals[0] == "reset_counters" ){ BYTES_RECEIVED_TOT = 0; NFILES_RECEIVED_TOT = 0; HDRDMA_LAST_RATE_10sec = 0.0; HDRDMA_LAST_RATE_1min = 0.0; HDRDMA_LAST_RATE_5min = 0.0; ss << "OK"; }else if( vals[0] == "get_file_size" ){ // mulitple files may be specified if( vals.size()<2){ ss << ""; }else{ for( uint32_t i=1; i