Source code for lightlab.equipment.visa_bases.visa_object

import visa as pyvisa
import time
from lightlab import visalogger as logger
from .driver_base import InstrumentSessionBase

# Maximum number of additional attempts VISAObject.open makes after the
# first attempt to open the resource has failed.
OPEN_RETRIES = 5

# Line-ending characters; VISAObject.termination accepts CR, LF, CR + LF or ''.
CR = '\r'
LF = '\n'


class VISAObject(InstrumentSessionBase):
    ''' Abstract class for something that communicates via messages
        (GPIB/USB/Serial/TCPIP/etc.).

        It handles message-based sessions in a way that provides a notion
        of object permanence to the connection with a particular address.

        It acts like a ``pyvisa`` message-based session, but it is not a
        subclass; it is a wrapper. It only contains one session at a time.
        That means VISAObject can offer extra opening, closing, session
        management, and error reporting features.

        This class relies on pyvisa to work
    '''

    mbSession = None  # the wrapped pyvisa session, or None while closed
    resMan = None  # pyvisa ResourceManager, created lazily by open()
    _open_retries = 0  # retries used by the open() call in progress
    _termination = CR + LF  # write termination applied to every new session
    __timeout = None  # cached timeout value; None means "not read yet"

    def __init__(self, address=None, tempSess=False):
        '''
            Args:
                tempSess (bool): If True, the session is opened and closed every time there is a command
                address (str): The full visa address
        '''
        self.tempSess = tempSess
        self.resMan = None
        self.mbSession = None
        self.address = address
        self._open_retries = 0
        self.__timeout = None

    def open(self):
        ''' Open the connection, retrying up to ``OPEN_RETRIES`` times.

            Does nothing if a session is already open. After each failed
            attempt the method sleeps ``0.5 * retry_number`` seconds before
            trying again.

            Raises:
                RuntimeError: if no address has been set.
                pyvisa.VisaIOError: if the resource still cannot be opened
                    after all retries.
        '''
        if self.mbSession is not None:
            return
        if self.address is None:
            raise RuntimeError("Attempting to open connection to unknown address.")
        if self.resMan is None:
            self.resMan = pyvisa.ResourceManager()

        self._open_retries = 0
        try:
            while True:
                try:
                    self.mbSession = self.resMan.open_resource(self.address)
                    break
                except pyvisa.VisaIOError as err:
                    logger.warning('There was a problem opening the VISA %s... Error code: %s',
                                   self.address, err.error_code)
                    if self._open_retries >= OPEN_RETRIES:
                        logger.error(err)
                        raise
                    self._open_retries += 1
                    time.sleep(0.5 * self._open_retries)
                    logger.warning('Trying again... (try = %s/%s)',
                                   self._open_retries, OPEN_RETRIES)

            # Use the stored value directly: reading the ``termination``
            # property here would first overwrite ``_termination`` with the
            # fresh session's own value, discarding whatever was configured
            # while the session was closed.
            self.mbSession.write_termination = self._termination
            if not self.tempSess:
                logger.debug('Opened %s', self.address)
            if self._open_retries != 0:
                logger.warning('Found it!')
        finally:
            # Always start the next open() with a full retry budget,
            # whether this one succeeded or failed.
            self._open_retries = 0

    def close(self):
        ''' Close the current session, if any.

            Raises:
                pyvisa.VisaIOError: if the underlying session fails to close;
                    in that case ``mbSession`` is left untouched.
        '''
        if self.mbSession is None:
            return
        try:
            self.mbSession.close()
        except pyvisa.VisaIOError:
            logger.error('There was a problem closing the VISA %s', self.address)
            raise
        self.mbSession = None
        if not self.tempSess:
            logger.debug('Closed %s', self.address)

    def write(self, writeStr):
        ''' Send ``writeStr`` to the instrument, opening the session if needed.

            With ``tempSess`` the session is closed again afterwards, even if
            the write failed. Any exception from the write is logged and
            re-raised.
        '''
        try:
            self.open()
            try:
                self.mbSession.write(writeStr)
            except Exception:
                logger.error('Problem writing to %s', self.address)
                raise
            logger.debug('%s - W - %s', self.address, writeStr)
        finally:
            if self.tempSess:
                self.close()

    def query(self, queryStr, withTimeout=None):
        ''' Send ``queryStr`` and return the reply with trailing whitespace stripped.

            Args:
                queryStr (str): the message to send
                withTimeout: if not None, a timeout applied to this query
                    only (same units as :attr:`timeout`); the previous
                    timeout is restored afterwards, even if the query fails.

            Returns:
                (str): the stripped reply

            With ``tempSess`` the session is closed again afterwards. Any
            exception from the query is logged and re-raised.
        '''
        retStr = None
        try:
            self.open()
            logger.debug('%s - Q - %s', self.address, queryStr)
            toutOrig = self.timeout
            try:
                if withTimeout is not None:
                    self.timeout = withTimeout
                try:
                    retStr = self.mbSession.query(queryStr)
                finally:
                    # Restore on failure too, so a timed-out query does not
                    # leave the temporary timeout in effect.
                    if withTimeout is not None:
                        self.timeout = toutOrig
            except Exception:
                logger.error('Problem querying to %s', self.address)
                raise
            retStr = retStr.rstrip()
            logger.debug('Query Read - %s', retStr)
        finally:
            if self.tempSess:
                self.close()
        return retStr

    def instrID(self):
        r"""Returns the \*IDN? string"""
        return self.query('*IDN?')

    @property
    def timeout(self):
        ''' The session timeout (the ``VI_ATTR_TMO_VALUE`` VISA attribute).

            The value is read from the session once and cached. If no session
            is open, one is opened to read it (and closed again when
            ``tempSess`` is set). Setting requires an open session.
            NOTE(review): units are presumably milliseconds, judging by
            ``wait(bigMsTimeout)`` -- confirm against the VISA documentation.
        '''
        if self.__timeout is None:
            wasClosed = self.mbSession is None
            try:
                self.open()  # no-op when a session is already open
                self.__timeout = self.mbSession.get_visa_attribute(
                    pyvisa.constants.VI_ATTR_TMO_VALUE)
            finally:
                if wasClosed and self.tempSess:
                    self.close()
        return self.__timeout

    @timeout.setter
    def timeout(self, newTimeout):
        if self.mbSession is None:
            raise RuntimeError(
                'This mbSession is None. Perhaps you are setting timeout when the session is closed temporarily')
        self.mbSession.set_visa_attribute(
            pyvisa.constants.VI_ATTR_TMO_VALUE, newTimeout)
        self.__timeout = newTimeout

    def wait(self, bigMsTimeout=10000):
        ''' Send ``*OPC?`` and block until it is answered, using
            ``bigMsTimeout`` as the timeout for that query.
        '''
        self.query('*OPC?', withTimeout=bigMsTimeout)

    def LLO(self):
        ''' Not implemented for this session type. '''
        raise NotImplementedError()

    def LOC(self):
        ''' Not implemented for this session type. '''
        raise NotImplementedError()

    def clear(self):
        ''' Call ``clear()`` on the session.

            If no session was open, one is opened for the call and closed
            again afterwards, regardless of ``tempSess``.
        '''
        if self.mbSession is None:
            try:
                self.open()
                self.mbSession.clear()
            finally:
                self.close()
        else:
            self.mbSession.clear()

    def query_raw_binary(self):
        ''' Not implemented for this session type. '''
        raise NotImplementedError()

    def spoll(self):
        ''' Not implemented for this session type. '''
        raise NotImplementedError()

    @property
    def termination(self):
        ''' The write termination: one of ``CR``, ``LF``, ``CR + LF`` or ``''``.

            While a session is open the value is read back from it. A value
            set while closed is stored and applied the next time a session
            is opened.

            Raises:
                ValueError: when set to anything other than the allowed values.
        '''
        if self.mbSession is not None:
            self._termination = self.mbSession.write_termination
        return self._termination

    @termination.setter
    def termination(self, value):
        if value not in (CR, LF, CR + LF, ''):
            raise ValueError("Termination must be one of these: CR, CRLF, LF, ''")
        self._termination = value
        if self.mbSession is not None:
            self.mbSession.write_termination = value