/*
 * pxirootManager.cpp
 *
 *  Created on: Mar 24, 2014
 *      Author: yqiang
 *
 * Singleton-style manager that queues incoming records in a FIFO and writes
 * them to size-limited ROOT files from a dedicated pthread, plus a small
 * C-linkage facade over it.
 */

#include "pxirootManager.hh"
#include "pxirootFile.hh"

// ---------------------------------------------------------------------------
// Static member definitions
// ---------------------------------------------------------------------------
pxirootManager* pxirootManager::selfPtr = 0;
Int_t pxirootManager::prmCompressFactor = 0; // no compression
std::queue<recordBuffer> pxirootManager::prmBuffer;

UInt_t pxirootFile::prfTreeBuffSize = static_cast<UInt_t>(1 << 16);
//UInt_t pxirootFile::prfAutoSaveSize = static_cast<UInt_t>(1 << 30);
// the documentation is not clear: positive number means number of events
// negative number means number of bytes
UInt_t pxirootFile::prfAutoSaveSize = static_cast<UInt_t>(1800);
string pxirootFile::prfBranchName = "record";

/**
 * Constructor.
 *
 * Replaces any previously registered instance (it is deleted), empties the
 * shared record FIFO and creates the error-checking mutex that guards it.
 */
pxirootManager::pxirootManager() :
		prmPrefix(""), prmSuffix(""), prmThread(0), prmStop(true), prmFile(0),
		prmDir(""), prmFileName("nofile"), prmFileLimit(1e9), prmFileSize(0) {
	// reinitialize self pointer
	if (selfPtr != 0)
		delete selfPtr;
	selfPtr = this;

	// clear FIFO
	while (!prmBuffer.empty())
		prmBuffer.pop();

	// Enable RootSpy
	//	DRootSpy *spy = new DRootSpy();

	// Create mutex for buffer operation.
	// prmMutex is not set by the initializer list, so it must not be read
	// (or deleted) before this assignment; the destructor releases it.
	pthread_mutexattr_t attr;
	pthread_mutexattr_init(&attr);
	pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
	prmMutex = new pthread_mutex_t;
	pthread_mutex_init(prmMutex, &attr);
	// the attribute object is only needed while initialising the mutex
	pthread_mutexattr_destroy(&attr);
}

/**
 * Destructor.
 *
 * Stops and joins the writer thread first so it can no longer touch this
 * object, then releases the ROOT file object and the buffer mutex.
 */
pxirootManager::~pxirootManager() {
	// stop the writer before tearing anything down
	CloseFile();

	if (prmFile != 0) {
		if (prmFile->IsOpen())
			prmFile->Close();
		delete prmFile;
		prmFile = 0;
	}

	if (prmMutex != 0) {
		pthread_mutex_destroy(prmMutex);
		delete prmMutex;
		prmMutex = 0;
	}

	// only unregister ourselves, never a newer instance
	if (selfPtr == this)
		selfPtr = 0;
}

/**
 * Configure output naming and start the writer thread.
 *
 * @param dir     directory part of the output path (concatenated verbatim in
 *                front of the file name, no separator is inserted)
 * @param prefix  text placed before the timestamp in the file name
 * @param suffix  text placed after the timestamp in the file name
 * @param nlim    file size limit; the current file is closed once the value
 *                returned by GetEND() reaches it
 *
 * If a writer thread is already running it is stopped and joined first, so
 * that two writers never operate on the same manager.
 */
void pxirootManager::StartDAQ(string dir, string prefix, string suffix,
		long nlim) {
	// finish any previous session before reconfiguring
	if (prmThread)
		CloseFile();

	prmDir = dir;
	prmPrefix = prefix;
	prmSuffix = suffix;
	prmFileLimit = nlim;
	prmStop = false;

	if (pthread_create(&prmThread, NULL, WriteThread, this) != 0) {
		// no thread was started: keep the state consistent so that
		// CloseFile() does not try to join an invalid handle
		prmThread = 0;
		prmStop = true;
	}
}

/**
 * Open a new output file named <prefix><YYYYMMDD_HHMMSS><suffix>.root inside
 * prmDir and make it the current file.
 *
 * The previous file object, if any, is deleted first; callers only invoke
 * this when there is no current file or the current file is closed.
 */
void pxirootManager::NewRootFile() {
	// build the file name from the current date/time
	TDatime now;
	TString stamp(now.AsSQLString());
	stamp.ReplaceAll(" ", "_");
	stamp.ReplaceAll(":", "");
	stamp.ReplaceAll("-", "");
	prmFileName = prmPrefix + string(stamp.Data()) + prmSuffix + ".root";
	string fullpath = prmDir + prmFileName;

	// release the previous file object instead of leaking it on every rollover
	delete prmFile;
	prmFile = new pxirootFile(fullpath.c_str(), "RECREATE", "PXI ROOT File",
			prmCompressFactor);
}

/**
 * Queue one record for the writer thread.
 *
 * @param channelName  name of the tree the record is filled into
 * @param timeStamp    time stamp of the record
 * @param buffPtr      pointer to the sample data
 * @param nElm         number of elements in buffPtr
 *
 * Calls with a null channelName or timeStamp are ignored, since both are
 * dereferenced while building the record.
 */
void pxirootManager::WriteFile(const char* channelName,
		struct timespec* timeStamp, float* buffPtr, int nElm) {
	if (channelName == 0 || timeStamp == 0)
		return;

	recordBuffer tmprecord(string(channelName),
			pxirootRecord(*timeStamp, nElm, buffPtr));

	// push to buffer
	pthread_mutex_lock(prmMutex);
	prmBuffer.push(tmprecord);
	pthread_mutex_unlock(prmMutex);
}

/**
 * Ask the writer thread to stop and wait for it to finish.
 *
 * The thread drains the FIFO and closes the file before exiting.
 * Does nothing beyond setting the stop flag when no thread is running.
 */
void pxirootManager::CloseFile() {
	// NOTE(review): prmStop is written here and read by the writer thread
	// without synchronisation; its declaration lives in the header.
	prmStop = true;
	if (prmThread) {
		pthread_join(prmThread, NULL);
		prmThread = 0;
	}
}

/**
 * Writer thread entry point.
 *
 * @param argument  the owning pxirootManager, passed as void*
 * @return always NULL
 *
 * While the manager is running, records are popped from the FIFO and filled
 * into the current file; the file is closed when it reaches prmFileLimit and
 * a new one is opened for the next record. Once prmStop is set, the remaining
 * records are written (without the size check) and the thread closes the
 * file, resets the file name to "nofile" and the file size to 0, and exits.
 */
void *WriteThread(void *argument) {
	// polling interval while the FIFO is empty: 1 ms
	struct timespec sleeptime;
	sleeptime.tv_sec = 0;
	sleeptime.tv_nsec = 1000000L;

	pxirootManager *prmPtr = static_cast<pxirootManager*>(argument);

	// Close existing ROOT file
	if (prmPtr->prmFile != 0 && prmPtr->prmFile->IsOpen())
		prmPtr->prmFile->Close();

	for (;;) {
		// sample the stop flag before looking at the FIFO, so that records
		// queued before the stop request are still written
		const bool stopping = prmPtr->prmStop;

		// the emptiness check and the pop happen under the same lock;
		// producers push concurrently from other threads
		pthread_mutex_lock(prmPtr->prmMutex);
		if (prmPtr->prmBuffer.empty()) {
			pthread_mutex_unlock(prmPtr->prmMutex);
			if (stopping)
				break;
			// pass no "remaining" pointer: writing it back into sleeptime
			// would shorten every later sleep after an interrupted call
			nanosleep(&sleeptime, NULL);
			continue;
		}
		recordBuffer tmprecord = prmPtr->prmBuffer.front();
		prmPtr->prmBuffer.pop();
		pthread_mutex_unlock(prmPtr->prmMutex);

		// (the RootSpy lock gROOTSPY_RW_LOCK used to wrap the ROOT calls
		// below; it is currently disabled)

		// check if file open
		if (prmPtr->prmFile == 0 || !prmPtr->prmFile->IsOpen())
			prmPtr->NewRootFile();

		// write to disk
		prmPtr->prmFile->FillTree(tmprecord.treename, tmprecord.record);

		// check file size, close if larger than tolerance
		// (only while running; the final drain never rolls the file over)
		if (!stopping) {
			prmPtr->prmFileSize = prmPtr->prmFile->GetEND();
			if (prmPtr->prmFileSize >= prmPtr->prmFileLimit)
				prmPtr->prmFile->Close();
		}
	}

	// shut down: close and release the file, reset the reported state
	if (prmPtr->prmFile != 0) {
		if (prmPtr->prmFile->IsOpen())
			prmPtr->prmFile->Close();
		delete prmPtr->prmFile;
		prmPtr->prmFile = 0;
	}
	prmPtr->prmFileName = string("nofile");
	prmPtr->prmFileSize = 0;

	return NULL;
}

// ---------------------------------------------------------------------------
// C-linkage facade
// ---------------------------------------------------------------------------
extern "C" {
#include "pxirootManager.h"

/** Start the writer; see pxirootManager::StartDAQ for the parameters. */
void StartRootDAQ(const char* dir, const char* prefix, const char* suffix,
		long nlim) {
	string strdir(dir);
	string strpfx(prefix);
	string strsfx(suffix);
	pxirootManager::GetInstance()->StartDAQ(strdir, strpfx, strsfx, nlim);
}

/** Stop the writer thread and wait until it has finished. */
void CloseRootFile() {
	pxirootManager::GetInstance()->CloseFile();
}

/** Queue one record; see pxirootManager::WriteFile for the parameters. */
void WriteRootFile(const char* channelName, struct timespec* timeStamp,
		float* buffPtr, int nElm) {
	pxirootManager::GetInstance()->WriteFile(channelName, timeStamp, buffPtr,
			nElm);
}

/**
 * Number of records currently waiting in the FIFO.
 * NOTE(review): read without taking the buffer mutex, so the value is only
 * an approximate snapshot while producers or the writer are active.
 */
int GetBufferSize() {
	return static_cast<int>(pxirootManager::GetInstance()->prmBuffer.size());
}

/**
 * Name of the current output file ("nofile" when none is open).
 * The pointer refers to the manager's own string and is only valid until the
 * writer thread changes the file name.
 */
const char* GetFileName() {
	return pxirootManager::GetInstance()->prmFileName.c_str();
}

/** Size of the current output file as last recorded by the writer thread. */
long GetFileSize() {
	return pxirootManager::GetInstance()->prmFileSize;
}
}