/* * acrootManager.cpp * * Created on: Mar 24, 2014 * Author: yqiang * * Modified on: * Nov 19, 2014: added beam current and gain, changed gROOTSPY_MUTEX to rwlock, yqiang * */ #include "acrootManager.hh" #include "acrootFile.hh" acrootManager* acrootManager::selfPtr = 0; Int_t acrootManager::prmCompressFactor = 1; // no compression std::queue acrootManager::prmBuffer; UInt_t acrootFile::prfTreeBuffSize = static_cast(1 << 16); UInt_t acrootFile::prfAutoSaveSize = static_cast(1 << 30); string acrootFile::prfBranchName = "record"; // constructor acrootManager::acrootManager() : prmPrefix(""), prmSuffix(""), prmThread(0), prmStop(true), prmFile(0), prmDir( ""), prmFileName("nofile"), prmFileLimit(1e9), prmFileSize(0) { // reinitialize self pointer if (selfPtr != 0) delete selfPtr; selfPtr = this; // clear FIFO while (prmBuffer.size() > 0) prmBuffer.pop(); // Enable RootSpy // DRootSpy *spy = new DRootSpy(); // Create mutex for buffer operation if (prmMutex) delete prmMutex; pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); prmMutex = new pthread_mutex_t; pthread_mutex_init(prmMutex, &attr); return; } // destructor acrootManager::~acrootManager() { if (prmFile != 0 && prmFile->IsOpen()) { prmFile->Close(); prmFile = 0; } selfPtr = 0; return; } void acrootManager::StartDAQ(string dir, string prefix, string suffix, long nlim) { prmDir = dir; prmPrefix = prefix; prmSuffix = suffix; prmFileLimit = nlim; prmStop = false; pthread_create(&prmThread, NULL, WriteThread, (void*) this); return; } void acrootManager::NewRootFile() { // new filename TDatime dtTime; TString rsTime(dtTime.AsSQLString()); rsTime.ReplaceAll(" ", "_"); rsTime.ReplaceAll(":", ""); rsTime.ReplaceAll("-", ""); prmFileName = prmPrefix + string(rsTime.Data()) + prmSuffix + ".root"; string fullpath = prmDir + prmFileName; prmFile = new acrootFile(fullpath.c_str(), "RECREATE", "AC ROOT File", prmCompressFactor); } void acrootManager::WriteFile(const char* channelName, struct timespec* timeStamp, float* buffPtr, int nElm, float* beamCurrent, short* gain) { recordBuffer tmprecord(string(channelName), acrootRecord(*timeStamp, nElm, buffPtr, *beamCurrent, *gain)); // push to buffer pthread_mutex_lock(prmMutex); prmBuffer.push(tmprecord); pthread_mutex_unlock(prmMutex); return; } void acrootManager::CloseFile() { prmStop = true; if (prmThread) { pthread_join(prmThread, NULL); prmThread = 0; } } void *WriteThread(void *argument) { // sleep timer struct timespec sleeptime; sleeptime.tv_sec = 0; sleeptime.tv_nsec = (long int) (0.01 * 1.e9); acrootManager *prmPtr = (acrootManager*) argument; // CLose Existing Root file if (prmPtr->prmFile != 0 && prmPtr->prmFile->IsOpen()) prmPtr->prmFile->Close(); while (!prmPtr->prmStop) { // read buffer if non-empty if (prmPtr->prmBuffer.size()) { pthread_mutex_lock(prmPtr->prmMutex); recordBuffer tmprecord = prmPtr->prmBuffer.front(); prmPtr->prmBuffer.pop(); pthread_mutex_unlock(prmPtr->prmMutex); // check if file open if (prmPtr->prmFile == 0 || !prmPtr->prmFile->IsOpen()) { //pthread_rwlock_rdlock(gROOTSPY_RW_LOCK); prmPtr->NewRootFile(); //pthread_rwlock_unlock(gROOTSPY_RW_LOCK); } // write to disk //pthread_rwlock_rdlock (gROOTSPY_RW_LOCK); prmPtr->prmFile->FillTree(tmprecord.treename, tmprecord.record); //pthread_rwlock_unlock(gROOTSPY_RW_LOCK); // check file size, close if larger than tolerance prmPtr->prmFileSize = prmPtr->prmFile->GetEND(); if (prmPtr->prmFileSize >= prmPtr->prmFileLimit) { //pthread_rwlock_rdlock(gROOTSPY_RW_LOCK); prmPtr->prmFile->Close(); //pthread_rwlock_unlock(gROOTSPY_RW_LOCK); } } else nanosleep(&sleeptime, &sleeptime); } // clear buffer while (prmPtr->prmBuffer.size()) { pthread_mutex_lock(prmPtr->prmMutex); recordBuffer tmprecord = prmPtr->prmBuffer.front(); prmPtr->prmBuffer.pop(); pthread_mutex_unlock(prmPtr->prmMutex); // check if file open if (prmPtr->prmFile == 0 || !prmPtr->prmFile->IsOpen()) { // pthread_rwlock_rdlock (gROOTSPY_RW_LOCK); prmPtr->NewRootFile(); // pthread_rwlock_unlock(gROOTSPY_RW_LOCK); } // write to disk //pthread_rwlock_rdlock (gROOTSPY_RW_LOCK); prmPtr->prmFile->FillTree(tmprecord.treename, tmprecord.record); //pthread_rwlock_unlock(gROOTSPY_RW_LOCK); } //pthread_rwlock_rdlock (gROOTSPY_RW_LOCK); if (prmPtr->prmFile != 0 && prmPtr->prmFile->IsOpen()) prmPtr->prmFile->Close(); prmPtr->prmFile = 0; //pthread_rwlock_unlock(gROOTSPY_RW_LOCK); prmPtr->prmFileName = string("nofile"); prmPtr->prmFileSize = 0; return NULL; } extern "C" { #include "acrootManager.h" void StartRootDAQ(const char* dir, const char* prefix, const char* suffix, long nlim) { string strdir(dir); string strpfx(prefix); string strsfx(suffix); acrootManager::GetInstance()->StartDAQ(strdir, strpfx, strsfx, nlim); return; } void CloseRootFile() { acrootManager::GetInstance()->CloseFile(); return; } void WriteRootFile(const char* channelName, struct timespec* timeStamp, float* buffPtr, int nElm, float* beamCurrent, short* gain) { acrootManager::GetInstance()->WriteFile(channelName, timeStamp, buffPtr, nElm, beamCurrent, gain); return; } int GetBufferSize() { return acrootManager::GetInstance()->prmBuffer.size(); } const char* GetFileName() { return acrootManager::GetInstance()->prmFileName.c_str(); } long GetFileSize() { return acrootManager::GetInstance()->prmFileSize; } } ;