''' Created on Dec 4, 2015 Class to facilitate working with a list of EPICS PVs. Inherits from list class to extend the functionality of list. @author: Hovanes Egiyan ''' import time import sys import math from java.lang import Thread, Runnable from EpicsPV import EpicsPV class EpicsPVSet(dict): ''' Class to facilitate working with a list of EPICS PVs. Inherits from list class to extend the functionality of list. ''' sleepTime = 0.06 # Sleep time maxAttempts = 1000 # Number of attempts to connect this PV def __init__(self, pvNames, varType="double", connectType = "ASYNC" ): ''' Constructor ''' # First call the constructor of super class # to create an empty dictionary. super(EpicsPVSet, self).__init__({}) # Loop through names and create EpicsPV objects. for pvName in pvNames: self[pvName] = EpicsPV( pvName, varType, connectType ) return def destroy(self): ''' Disconnect all PVs and clear the dictionary ''' self.disconnect() self.clear() return def connect(self): ''' Connect the PVs ''' for pvName in self.keys(): self[pvName].connect() return def disconnect(self): ''' Disconnect all PVs ''' for pvName in self.keys(): self[pvName].disconnect() return def __del__(self): self.disconnect() super(EpicsPVSet, self).__del__() return def __repr__(self): for pvName in self.keys(): msg += self[pvName].__repr__() return msg def __str__(self): for pvName in self.keys(): msg += self[pvName].__str__() return msg def areConnected(self): allConnected = True for pvName in self.keys(): if( not self[pvName].isConnected() ) : return False return True def ensureConnected(self): ''' Wait for all PVs to get connected. Return false if it takes too long. Return True when all PVs are connected ''' if self.areConnected(): return True # Keep trying to connect attemptNumber = 0 while( not self.areConnected() and (attemptNumber < EpicsPVSet.maxAttempts) ): time.sleep( EpicsPVSet.sleepTime ) attemptNumber += 1 # If the number of maximum attempts is exceeded raise an exception if( attemptNumber >= EpicsPVSet.maxAttempts ): return False return True def readValues(self): ''' Read all PVs in this set ''' for pvName in sorted( self.keys() ): self[pvName].readValue() return def setValue(self, value): ''' Set the same value to all PVs in this set ''' if self.ensureConnected(): for pvName in self.keys(): self[pvName].setValue( value ) return def setValueAsync(self, value ): ''' Set single value to all PVs asynchronously ''' for pvName in self.keys(): self[pvName].setValueAsync( value ) return def setValues(self, valueMap): ''' Set values based on the map of value ''' if self.ensureConnected(): for pvName in self.keys(): if pvName in valueMap.keys(): self[pvName].setValue( valueMap[pvName] ) return def setValuesAsync(self, valueMap): ''' Set values based on the map of value ''' for pvName in self.keys(): if pvName in valueMap.keys(): self[pvName].setValueAsync( valueMap[pvName] ) return def printItems(self): for pvName in sorted(self): print self[pvName] return class testEpicsPVSet(Runnable): ''' Class for testing the EpicsPVSet in a separate thread so that when launched from CSS it does not block the main thread ''' def __init__(self, nPV, val ): self.numberOfPVs = nPV self.value = val return def run(self): pvNameList = [] for iPV in range(0, self.numberOfPVs ) : pvNameList.append( "testEpicsPV:" + str(iPV) ) pvSet = EpicsPVSet( pvNameList, "double", "ASYNC" ) pvSet.connect() pvSet.setValueAsync(self.value) pvSet.readValues() # time.sleep(10) pvSet.printItems() pvSet.destroy() return if __name__ == '__main__': thread = Thread( testEpicsPVSet( 2000, 1100000 ) ) thread.start()