Source code for lightlab.equipment.visa_bases.prologix_gpib

# prologix patch, to be inserted somewhere in visa_bases
import socket
import time
import re
from lightlab import visalogger as logger
from .driver_base import InstrumentSessionBase, TCPSocketConnection


class PrologixResourceManager(TCPSocketConnection):
    '''Controls a Prologix GPIB-ETHERNET Controller v1.2
    manual: http://prologix.biz/downloads/PrologixGpibEthernetManual.pdf

    Basic usage:

    .. code-block:: python

        p = PrologixResourceManager('prologix.school.edu')

        p.connect()  # connects to socket and leaves it open
        p.startup()  # configures prologix to communicate via gpib
        p.send('++addr 23')  # talks to address 23
        p.send('command value')  # sends the command and does not expect to read anything
        p.query('command')  # sends a command but reads stuff back, this might hang if buffer is empty
        p.disconnect()

    The problem with the above is that if there is any error with startup,
    send or query, the disconnect method will not be called. So we coded
    a context manager called ``connected``, to be used as such:

    .. code-block:: python

        p = PrologixResourceManager('prologix.school.edu')

        with p.connected():
            p.startup()
            p.send('++addr 23')  # talks to address 23
            p.send('command value')  # sends the command and does not expect to read anything
            p.query('command')  # sends a command but reads stuff back

    If we try to send a message without the context manager, then we
    should connect and disconnect right before.

    .. code-block:: python

        p = PrologixResourceManager('prologix.school.edu')

        p.send('++addr 23')  # opens and close socket automatically

    .. warning::

        If a second socket is opened from the same computer while the
        first was online, the first socket will stop responding and
        Prologix will send data to the just-opened socket.

    .. todo::

        Make this class a singleton to mitigate the issue above.
    '''

    # TODO: make this class a singleton:
    # https://howto.lintel.in/python-__new__-magic-method-explained/

    port = 1234  #: port that the Prologix GPIB-Ethernet controller listens to.
    _socket = None

    def __init__(self, ip_address, timeout=2):
        """
        Args:
            ip_address (str): hostname or ip address of the Prologix controller
            timeout (float): timeout in seconds for establishing socket
                connection to socket server, default 2.
        """
        self.timeout = timeout
        self.ip_address = ip_address
        super().__init__(ip_address, self.port, timeout=timeout, termination='\n')

    def startup(self):
        ''' Sends the startup configuration to the controller.
        Just in case it was misconfigured.
        '''
        # A single ``connected`` scope wraps all five commands.
        with self.connected():
            self.send('++auto 0')  # do not read-after-write
            self.send('++mode 1')  # controller mode
            self.send('++read_tmo_ms 2000')  # timeout in ms
            self.send('++eos 0')  # append CR+LF after every GPIB
            self.send('++savecfg 0')  # Disable saving of configuration parameters in EPROM

    def query(self, query_msg, msg_length=2048):
        r''' Sends a query and receives a string from the controller.
        Auto-connects if necessary.

        Args:
            query_msg (str): query message.
            msg_length (int): maximum message length. If the received
                message does not end with a '\n', it triggers another
                socket recv command with the same message length.

        Returns:
            str: everything received, up to and including the final '\n'.

        Raises:
            RuntimeError: if a read returns no data before the terminating
                '\n' has been received.
        '''
        chunks = []
        with self.connected():
            self._send(self._socket, query_msg)
            while True:
                chunk = self._recv(self._socket, msg_length)
                if not chunk:
                    # An empty read can never complete the message; retrying
                    # would spin forever. For a TCP socket this normally
                    # means the peer closed the connection
                    # (NOTE(review): confirm against ``_recv``).
                    raise RuntimeError(
                        'Connection closed before a full response was received')
                chunks.append(chunk)
                if chunk.endswith('\n'):
                    break
        return ''.join(chunks)
def _is_valid_hostname(hostname):
    '''Validates whether a hostname is valid, e.g. abc.example.com

    Args:
        hostname (str): candidate hostname; one trailing dot is tolerated.

    Returns:
        bool: True if every dot-separated label is 1 to 63 characters of
        letters, digits or hyphens, does not start or end with a hyphen,
        the last label is not all-numeric and the total length is at
        most 253 characters.
    '''
    # from https://stackoverflow.com/questions/2532053/validate-a-hostname-string
    if not hostname:
        # An empty string has no last character to inspect and is never valid.
        return False
    if hostname[-1] == ".":
        # strip exactly one dot from the right, if present
        hostname = hostname[:-1]
    if len(hostname) > 253:
        return False

    labels = hostname.split(".")

    # the TLD must be not all-numeric
    if re.fullmatch(r"[0-9]+", labels[-1]):
        return False

    # fullmatch (instead of match + '$') also rejects a trailing newline.
    allowed = re.compile(r"(?!-)[a-z0-9-]{1,63}(?<!-)", re.IGNORECASE)
    return all(allowed.fullmatch(label) for label in labels)


def _is_valid_ip_address(ip_address):
    '''Validates whether it is a true ip address, like 10.34.134.13

    The string is accepted only if resolving it yields the very same
    string back.
    '''
    try:
        return socket.gethostbyname(ip_address) == ip_address
    except (socket.gaierror, UnicodeError):
        return False


def _validate_hostname(hostname):
    '''Returns True if ``hostname`` is a valid hostname or ip address.'''
    return _is_valid_hostname(hostname) or _is_valid_ip_address(hostname)


def _sanitize_address(address):
    '''Takes in an address of the form
    'prologix://prologix_ip_address/gpib_primary_address[:gpib_secondary_address]'
    and returns prologix_ip_address, gpib_primary_address, gpib_secondary_address.
    If secondary address is not given, gpib_secondary_address = None

    Raises:
        RuntimeError: if the scheme is not ``prologix://``, the '/'
            separating host and GPIB address is missing, the host is not
            a valid hostname/ip address, or the GPIB address is not made
            of integers.
    '''
    scheme = 'prologix://'
    if not address.startswith(scheme):
        raise RuntimeError('invalid address: {}'.format(address))

    ip_address, separator, gpib_address = address[len(scheme):].partition('/')
    if not separator:
        # Without this guard a missing '/' surfaced as a bare unpacking
        # ValueError instead of the RuntimeError used for every other
        # malformed address.
        raise RuntimeError('invalid address: {}'.format(address))

    if not _validate_hostname(ip_address):
        raise RuntimeError("invalid ip address: '{}'".format(ip_address))

    primary, has_secondary, secondary = gpib_address.partition(':')
    try:
        gpib_pad = int(primary)
        gpib_sad = int(secondary) if has_secondary else None
    except ValueError as err:
        raise RuntimeError(
            "invalid gpib format '{}', should be like '10[:0]'".format(gpib_address)) from err

    return ip_address, gpib_pad, gpib_sad
class PrologixGPIBObject(InstrumentSessionBase):
    '''Instrument session that talks to a GPIB instrument through a
    Prologix GPIB-Ethernet controller.

    The controller is driven through a :class:`PrologixResourceManager`
    created from the host part of the address.
    '''

    # ESC (ASCII 27) is the escape character named in the Prologix manual.
    _ESC = chr(27)
    # Translation table escaping ESC, '+', LF and CR in a single pass.
    _ESCAPE_TABLE = str.maketrans({
        _ESC: _ESC + _ESC,
        '+': _ESC + '+',
        '\n': _ESC + '\n',
        '\r': _ESC + '\r',
    })
    # Mapping between the controller's ``++eos`` code and the termination.
    _EOS_TO_TERMINATION = {0: '\r\n', 1: '\r', 2: '\n', 3: ''}

    def __init__(self, address=None, tempSess=False):
        '''
            Args:
                tempSess (bool): If True, the session is opened and closed every time there is a command
                address (str): The full visa address in the form:
                    prologix://prologix_ip_address/gpib_primary_address:gpib_secondary_address

            Raises:
                RuntimeError: if ``address`` is not a string or is malformed.
        '''
        if not isinstance(address, str):
            raise RuntimeError("Invalid address: {}".format(address))
        self.tempSess = tempSess
        self.address = address
        self.ip_address, self.gpib_pad, self.gpib_sad = _sanitize_address(address)
        self._prologix_rm = PrologixResourceManager(self.ip_address)
        self._open_retries = 0
        self.__timeout = 2  # 2 seconds is the default

    def _prologix_gpib_addr_formatted(self):
        '''Returns the GPIB address as used by ``++addr``/``++spoll``:
        'PAD' or 'PAD SAD' when a secondary address is set.'''
        if self.gpib_sad is None:
            return '{:d}'.format(self.gpib_pad)
        return '{:d} {:d}'.format(self.gpib_pad, self.gpib_sad)

    def _prologix_escape_characters(self, string):
        r'''Escapes ESC, +, \n, \r characters with ESC. Refer to Prologix Manual.

        The previous implementation prefixed with chr(17) (DC1), which is
        not the ESC character (ASCII 27) the controller expects.
        '''
        # A single translate pass means an inserted ESC is never re-escaped.
        return string.translate(self._ESCAPE_TABLE)

    def spoll(self):
        '''Return status byte of the instrument.'''
        gpib_addr = self._prologix_gpib_addr_formatted()
        reply = self._prologix_rm.query('++spoll {}'.format(gpib_addr))
        return int(reply.rstrip())

    def LLO(self):
        '''This command disables front panel operation of the currently addressed instrument.'''
        # NOTE(review): unlike clear(), no '++addr' is sent first, so this
        # acts on whatever address the controller currently holds.
        self._prologix_rm.send('++llo')

    def LOC(self):
        '''This command enables front panel operation of the currently addressed instrument.'''
        # NOTE(review): same addressing caveat as LLO().
        self._prologix_rm.send('++loc')

    @property
    def termination(self):
        r'''Termination GPIB character. Valid options: '\\r\\n', '\\r', '\\n', ''.

        Reading queries the controller's ``++eos`` setting; a reply that
        is not 0 to 3 raises RuntimeError. Setting an invalid value prints
        a message and leaves the controller untouched.
        '''
        eos = int(self._prologix_rm.query('++eos').rstrip())
        if eos not in self._EOS_TO_TERMINATION:
            raise RuntimeError('unknown termination.')
        return self._EOS_TO_TERMINATION[eos]

    @termination.setter
    def termination(self, value):
        eos = next((code for code, term in self._EOS_TO_TERMINATION.items()
                    if term == value), None)
        if eos is None:
            print("Invalid termination: {}".format(repr(value)))
            return
        self._prologix_rm.send('++eos {}'.format(eos))

    def open(self):
        ''' Open connection with instrument.
        If ``tempSess`` is set to False, please remember to close after use.
        '''
        if not self.tempSess:
            self._prologix_rm.connect()
        with self._prologix_rm.connected() as pconn:
            pconn.startup()

    def close(self):
        ''' Closes the connection with the instrument.
        Side effect: disconnects prologix socket controller'''
        self._prologix_rm.disconnect()

    def write(self, writeStr):
        '''Addresses the instrument and sends ``writeStr`` with the
        Prologix special characters escaped.'''
        with self._prologix_rm.connected() as pconn:
            pconn.send('++addr {}'.format(self._prologix_gpib_addr_formatted()))
            pconn.send(self._prologix_escape_characters(writeStr))

    def query(self, queryStr, withTimeout=None):
        '''Sends ``queryStr`` and returns the instrument's reply with
        trailing whitespace stripped.

        Args:
            queryStr (str): the query to send.
            withTimeout (float): seconds to wait for the reply; defaults
                to :attr:`timeout`.
        '''
        logger.debug('%s - Q - %s', self.address, queryStr)
        retStr = self.query_raw_binary(queryStr, withTimeout)
        logger.debug('Query Read - %s', repr(retStr))
        return retStr.rstrip()

    def wait(self, bigMsTimeout=10000):
        '''Blocks until the instrument answers ``*OPC?``.

        Args:
            bigMsTimeout (float): maximum wait in milliseconds.
        '''
        # query() measures its timeout in seconds, so convert from ms;
        # passing the raw value made the 10 s default behave as 10000 s.
        self.query('*OPC?', withTimeout=bigMsTimeout / 1000)

    def clear(self):
        '''This command sends the Selected Device Clear (SDC) message to the currently specified GPIB address.'''
        with self._prologix_rm.connected() as pconn:
            pconn.send('++addr {}'.format(self._prologix_gpib_addr_formatted()))
            pconn.send('++clr')

    def query_raw_binary(self, queryStr, withTimeout=None):
        '''Sends ``queryStr`` and reads the reply once the instrument
        reports that a message is available.

        The status byte is polled every 0.1 s; when its MAV bit is set the
        reply is fetched with ``++read eoi``. Note that, despite the name,
        trailing whitespace is stripped from the returned string.

        Args:
            queryStr (str): the query to send.
            withTimeout (float): seconds to wait for the reply; defaults
                to :attr:`timeout`.

        Raises:
            RuntimeError: if no message becomes available in time.
        '''
        if withTimeout is None:
            withTimeout = self.timeout

        # Keep one connection scope around the whole exchange; spoll() and
        # the read re-enter ``connected()`` the same way open() does.
        with self._prologix_rm.connected() as pconn:
            pconn.send('++addr {}'.format(self._prologix_gpib_addr_formatted()))
            pconn.send(self._prologix_escape_characters(queryStr))

            # Monotonic clock: the deadline is immune to wall-clock jumps.
            deadline = time.monotonic() + withTimeout
            while time.monotonic() < deadline:
                status_byte = self.spoll()
                # MAV (bit 4) indicates that the message is available
                if (status_byte >> 4) & 1:
                    retStr = self._prologix_rm.query('++read eoi')
                    return retStr.rstrip()
                # ask for the message in small increments
                time.sleep(0.1)
        raise RuntimeError('Query timed out')

    @property
    def timeout(self):
        ''' This timeout is between the user and the instrument.
            For example, if we did a sweep that should take ~10 seconds
            but ends up taking longer, you can set the timeout to 20 seconds.

            The value is in seconds; negative values raise ValueError.
        '''
        return self.__timeout

    @timeout.setter
    def timeout(self, newTimeout):
        if newTimeout < 0:
            raise ValueError("Timeouts cannot be negative")
        self.__timeout = newTimeout