#include #include #include #include #include //#include #include #include #include #include #include #include #include using namespace std; using std::chrono::steady_clock; using std::chrono::duration; using std::chrono::duration_cast; int VERBOSE=1; bool QUIT = false; const char *HDRDMA_VERSION = "1.0.2"; int HDRDMA_RET_VAL = 0; bool HDRDMA_IS_SERVER = false; bool HDRDMA_IS_CLIENT = false; int HDRDMA_LOCAL_PORT = 10470; int HDRDMA_REMOTE_PORT = 10470; std::string HDRDMA_REMOTE_ADDR = "gluon47.jlab.org"; int HDRDMA_CONNECTION_TIMEOUT = 10; // seconds string HDRDMA_SRCFILENAME = ""; std::string HDRDMA_DSTFILENAME = ""; bool HDRDMA_DELETE_AFTER_SEND = false; bool HDRDMA_CALCULATE_CHECKSUM = false; bool HDRDMA_MAKE_PARENT_DIRS = false; string HDRDMA_USERNAME = ""; string HDRDMA_GROUPNAME = ""; int HDRDMA_ZEROMQ_STATS_PORT = 10471; // port to publish zeroMQ messages about our stats to. 0=don't publish int HDRDMA_ZEROMQ_CONTROL_PORT = 10472; // port to publish zeroMQ messages about our stats to. 
0=don't publish string HDRDMA_COMMAND_HOST; string HDRDMA_COMMAND; void *HDRDMA_ZEROMQ_CONTEXT = nullptr; // will be set by hdRDMAstats or hdRDMAcontrol (whichever is called first) uint64_t HDRDMA_BUFF_LEN_GB = 0; // defaults differ for server and client modes uint64_t HDRDMA_NUM_BUFF_SECTIONS = 0; // so these are set in ParseCommandLineArguments std::mutex HDRDMA_RECV_FNAMES_MUTEX; std::set HDRDMA_RECV_FNAMES; atomic BYTES_RECEIVED_TOT(0); atomic NFILES_RECEIVED_TOT(0); double HDRDMA_LAST_RATE_10sec=0.0; double HDRDMA_LAST_RATE_1min=0.0; double HDRDMA_LAST_RATE_5min=0.0; #define _DBG__ cout<<__FILE__<<":"<<__LINE__<(now - t_last_10sec); auto duration_1min = duration_cast(now - t_last_1min); auto duration_5min = duration_cast(now - t_last_5min); auto delta_t_10sec = duration_10sec.count(); auto delta_t_1min = duration_1min.count(); auto delta_t_5min = duration_5min.count(); if( delta_t_10sec >= 10.0 ){ double GB_received = (BYTES_RECEIVED_TOT - last_bytes_received_10sec)/1000000000; double rate = GB_received/delta_t_10sec; if( VERBOSE>2 )cout << "=== [10 sec avg.] " << rate << " GB/s -- " << (double)BYTES_RECEIVED_TOT/1.0E12 << " TB total received" << endl; t_last_10sec = now; last_bytes_received_10sec = BYTES_RECEIVED_TOT; HDRDMA_LAST_RATE_10sec= rate; } if( delta_t_1min >= 60.0 ){ double GB_received = (BYTES_RECEIVED_TOT - last_bytes_received_1min)/1000000000; double rate = GB_received/delta_t_1min; if( VERBOSE>1 )cout << "=== [ 1 min avg.] " << rate << " GB/s" << endl; t_last_1min = now; last_bytes_received_1min = BYTES_RECEIVED_TOT; HDRDMA_LAST_RATE_1min= rate; } if( delta_t_5min >= 300.0 ){ double GB_received = (BYTES_RECEIVED_TOT - last_bytes_received_5min)/1000000000; double rate = GB_received/delta_t_5min; if( VERBOSE>0 )cout << "=== [ 5 min avg.] 
" << rate << " GB/s" << endl; t_last_5min = now; last_bytes_received_5min = BYTES_RECEIVED_TOT; HDRDMA_LAST_RATE_5min= rate; } std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) ); } // Stop server from listening (if one is) hdrdma.StopListening(); // Stop stats reporting if( hdrdmastats != nullptr ) delete hdrdmastats; if( hdrdmacontrol != nullptr ) delete hdrdmacontrol; // At this point we have shut most things down but there are still a few // threads running that are stuck in blocking calls that are just not // easy to kill. Thus, we harshly kill the entire process to force the // issue. // n.b. we set an exit value of -1 so the system service will re-start // automatically (if we're being run that way). _exit(HDRDMA_RET_VAL); #ifdef HAVE_ZEROMQ zmq_ctx_destroy( HDRDMA_ZEROMQ_CONTEXT ); // we'll never get here due to above call. This would block forever anyway. #endif // HAVE_ZEROMQ } // Connect to remote peer if we are in client mode. // This will attempt to connect to an hdRDMA object listening on the // specified host/port. If a connection is made then the RDMA transfer // information will be exchanged and stored in the hdRDMA object and // be made available for transfers. If the connection cannot be made // then it will exit the program with an error message. if( HDRDMA_IS_CLIENT ){ // Create an hdRDMA object hdRDMA hdrdma; hdrdma.Connect( HDRDMA_REMOTE_ADDR, HDRDMA_REMOTE_PORT ); hdrdma.SendFile( HDRDMA_SRCFILENAME, HDRDMA_DSTFILENAME, HDRDMA_DELETE_AFTER_SEND, HDRDMA_CALCULATE_CHECKSUM, HDRDMA_MAKE_PARENT_DIRS ); } // Send control command to remote hdrdmacp process being run in server mode if( !HDRDMA_COMMAND_HOST.empty() ) cout << SendControlCommand(HDRDMA_COMMAND_HOST, HDRDMA_COMMAND) << endl; return HDRDMA_RET_VAL; } //------------------------------------------------------------- // SendControlCommand // // Send a control command to a remote hdrdmacp process running in // server mode. The response is returned in the form of a string. 
//-------------------------------------------------------------
// Parameters:
//   host     name or address of the machine running the server;
//            combined with HDRDMA_ZEROMQ_CONTROL_PORT to form the
//            "tcp://host:port" endpoint.
//   command  text sent verbatim as a single zeroMQ request.
//
// Returns the reply text (up to the first NUL byte, at most 20480
// bytes), or a fixed error string if zeroMQ support is not compiled
// in or any zeroMQ call reports failure. Exceptions are not
// propagated to the caller.
//
// n.b. a REQ socket blocks in zmq_recv until a reply arrives, so
// this call does not return if nothing ever answers on the endpoint.
std::string SendControlCommand(std::string host, std::string command)
{
#ifndef HAVE_ZEROMQ
	return std::string("Unable to send control command due to zeroMQ support not being compiled in.");
#else
	try{
		const std::string error_msg("Error sending zmq control message");

		// Build the endpoint as a std::string so an arbitrarily long
		// host name cannot overflow a fixed-size buffer.
		const std::string endpoint = "tcp://" + host + ":" + std::to_string(HDRDMA_ZEROMQ_CONTROL_PORT);

		HDRDMA_ZEROMQ_CONTEXT = zmq_ctx_new();
		if( HDRDMA_ZEROMQ_CONTEXT == nullptr ) return error_msg;

		void *requester = zmq_socket( HDRDMA_ZEROMQ_CONTEXT, ZMQ_REQ );
		if( requester == nullptr ){
			zmq_ctx_destroy( HDRDMA_ZEROMQ_CONTEXT );
			return error_msg;
		}

		std::string response = error_msg;
		std::vector<char> buff( 20480, 0 );
		if( zmq_connect( requester, endpoint.c_str() ) == 0
			&& zmq_send( requester, command.c_str(), command.size(), 0 ) >= 0 ){

			// zmq_recv returns the size of the full message, which may
			// exceed the buffer (the excess is truncated), so clamp it
			// before touching the buffer contents.
			const int nbytes = zmq_recv( requester, buff.data(), buff.size(), 0 );
			if( nbytes >= 0 ){
				const std::size_t nvalid = ( static_cast<std::size_t>(nbytes) > buff.size() )
					? buff.size()
					: static_cast<std::size_t>(nbytes);
				response.assign( buff.data(), nvalid );

				// The reply has always been treated as a C string, so
				// keep cutting it at the first NUL byte.
				const std::size_t nul = response.find('\0');
				if( nul != std::string::npos ) response.erase( nul );
			}
		}

		zmq_close( requester );
		zmq_ctx_destroy( HDRDMA_ZEROMQ_CONTEXT );

		return response;
	}catch(...){
		return std::string("Error sending zmq control message");
	}
#endif // HAVE_ZEROMQ
}

//-------------------------------------------------------------
// GetTotalSystemRAMGB
//
// Returns the total system memory in GB. This is only used
// when automatically determining amount to allocate when in
// server mode.
//-------------------------------------------------------------
// Scans /proc/meminfo line by line for the "MemTotal:" entry and
// converts its value (reported in kB) to whole GB by dividing by
// 1E6 and dropping the fraction.
//
// Returns 0 if the file cannot be opened, the entry is missing,
// or no number follows the key.
int GetTotalSystemRAMGB(void)
{
	static const std::string key("MemTotal:");

	// If the open fails the first getline fails too and we fall
	// through to the "unknown" result below.
	std::ifstream ifs("/proc/meminfo");
	std::string line;
	while( std::getline(ifs, line) ){
		const auto pos = line.find(key);
		if( pos == std::string::npos ) continue;

		// strtoull skips the padding after the key itself and uses a
		// 64-bit type, so the kB figure cannot overflow the way an
		// int would on very large memory machines.
		const char *first = line.c_str() + pos + key.size();
		char *last = nullptr;
		const unsigned long long kB = std::strtoull(first, &last, 10);
		if( last == first ) return 0; // no digits after the key

		return static_cast<int>( kB/1000000ULL );
	}

	return 0;
}

//------------------------------------------------------------- // ParseCommandLineArguments //------------------------------------------------------------- void ParseCommandLineArguments( int narg, char *argv[] ) { if( narg ==1 ) { Usage(); exit(0); } std::vector vfnames; for( int i=1; i8 ) HDRDMA_BUFF_LEN_GB = 8; } if( HDRDMA_NUM_BUFF_SECTIONS == 0 ){ HDRDMA_NUM_BUFF_SECTIONS = 4*HDRDMA_BUFF_LEN_GB; if( HDRDMA_NUM_BUFF_SECTIONS<8 ) HDRDMA_NUM_BUFF_SECTIONS = 8; } } } //------------------------------------------------------------- // Usage //------------------------------------------------------------- void Usage(bool show_extended) { cout << endl; cout << "Hall-D RDMA file copy server/client" <