from abc import ABC, abstractmethod
from contextlib import contextmanager
import socket
import time
from lightlab import visalogger as logger
from pyvisa.util import from_ascii_block
class InstrumentSessionBase(ABC):
    ''' Base class for Instrument sessions, to be inherited and specialized
        by VISAObject and PrologixGPIBObject.

        Concrete sessions must implement every abstract method below as well
        as the ``timeout`` property (getter and setter). Only
        ``query_ascii_values`` and ``instrID`` are implemented here, and both
        are built on top of ``query``.

        NOTE(review): the abstract signatures take no arguments besides
        ``self``, yet the helpers in this class call ``self.query(message)``.
        Implementations are therefore expected to accept at least a message
        string in ``query``; confirm the exact signatures against the
        subclasses.
    '''

    @abstractmethod
    def spoll(self):
        ''' Abstract. Presumably performs a serial poll of the instrument;
            confirm against the concrete session classes.
        '''

    @abstractmethod
    def LLO(self):
        ''' Abstract. Presumably the GPIB "local lockout" command; confirm
            against the concrete session classes.
        '''

    @abstractmethod
    def LOC(self):
        ''' Abstract. Presumably the GPIB "go to local" command; confirm
            against the concrete session classes.
        '''

    @abstractmethod
    def open(self):
        ''' Abstract. Opens the session to the instrument.'''

    @abstractmethod
    def close(self):
        ''' Abstract. Closes the session to the instrument.'''

    @abstractmethod
    def write(self):
        ''' Abstract. Sends a message to the instrument.'''

    @abstractmethod
    def query(self):
        ''' Abstract. Sends a message and returns the instrument's reply.

            The helpers of this class call it as ``self.query(message)``.
        '''

    @abstractmethod
    def wait(self):
        ''' Abstract. Presumably blocks until the instrument has finished
            its pending operations; confirm against the concrete classes.
        '''

    @abstractmethod
    def clear(self):
        ''' Abstract. Presumably clears the instrument/interface state;
            confirm against the concrete session classes.
        '''

    @abstractmethod
    def query_raw_binary(self):
        ''' Abstract. Presumably a query whose reply is returned as raw
            binary data; confirm against the concrete session classes.
        '''

    def query_ascii_values(self, message, converter='f', separator=',',
                           container=list):
        ''' Queries the instrument and parses the reply as ASCII values.

            Taken from pyvisa.

            Args:
                message (str): query sent through ``self.query``
                converter: forwarded unchanged to pyvisa's ``from_ascii_block``
                separator: forwarded unchanged to pyvisa's ``from_ascii_block``
                container: forwarded unchanged to pyvisa's ``from_ascii_block``

            Returns:
                whatever ``from_ascii_block`` builds from the reply.
        '''
        block = self.query(message)
        return from_ascii_block(block, converter, separator, container)

    def instrID(self):
        r"""Returns the \*IDN? string"""
        return self.query('*IDN?')

    @property
    @abstractmethod
    def timeout(self):
        ''' Abstract property: the session timeout (units are defined by the
            implementing class).
        '''

    @timeout.setter
    @abstractmethod
    def timeout(self, newTimeout):
        pass
CR = '\r'  # carriage return
LF = '\n'  # line feed; default message termination of TCPSocketConnection


class TCPSocketConnection(object):
    ''' Opens a TCP socket connection, much like netcat.

        Usage:
            s = TCPSocketConnection('socket-server.school.edu', 1111)
            s.connect()  # connects to socket and leaves it open
            s.send('command')  # sends the command through the socket
            r = s.recv(1000)  # receives a message of up to 1000 bytes
            s.disconnect()  # shuts down connection
    '''

    port = None  #: socket server's port number
    _socket = None  # the open socket object, or None while disconnected
    _termination = None  # string appended to every outgoing message

    def __init__(self, ip_address, port, timeout=2, termination=LF):
        """
            Args:
                ip_address (str): hostname or ip address of the socket server
                port (int): socket server's port number
                timeout (float): timeout in seconds for establishing socket
                    connection to socket server, default 2.
                termination (str): string appended to every message sent,
                    default line feed.
        """
        self.timeout = timeout
        self.port = port
        self.ip_address = ip_address
        self._termination = termination

    def _send(self, socket, value):
        ''' Formats ``value`` as a string, appends the termination, encodes
            it as ASCII and sends all of it through ``socket``.

            Note that the ``socket`` parameter is a socket object and shadows
            the ``socket`` module inside this method.

            Returns:
                the result of ``sendall``, which is always None.
        '''
        encoded_value = (('%s' % value) + self._termination).encode('ascii')
        sent = socket.sendall(encoded_value)
        return sent

    def _recv(self, socket, msg_length=2048):
        ''' Performs a single ``recv`` of at most ``msg_length`` bytes on
            ``socket`` (a socket object) and decodes the result as ASCII.
        '''
        received_value = socket.recv(msg_length)
        return received_value.decode('ascii')

    def connect(self):
        ''' Connects to the socket and leaves the connection open.
            If already connected, does nothing.

            ``self.timeout`` is applied with ``settimeout`` before connecting
            and is left set on the socket afterwards.

            Returns:
                socket object.

            Raises:
                socket.error: if the connection cannot be established. The
                    half-built socket is closed before re-raising.
        '''
        if self._socket is not None:
            return self._socket

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
        try:
            logger.debug("Attempting new connection (timeout = %s)", str(self.timeout))
            init_time = time.time()
            s.settimeout(self.timeout)
            s.connect((self.ip_address, self.port))
        except socket.error:
            # avoiding shutdown to prevent sending any data to remote socket
            # https://stackoverflow.com/questions/13109899/does-socket-become-unusable-after-connect-fails
            s.close()
            logger.error('Cannot connect to resource.')
            raise

        elapsed_time_ms = 1e3 * (time.time() - init_time)
        logger.debug("Connected. Time elapsed: %s msec", '{:.2f}'.format(elapsed_time_ms))
        self._socket = s
        return self._socket

    def disconnect(self):
        ''' If connected, disconnects and kills the socket.

            The socket is always closed and forgotten, even when the
            ``shutdown`` call fails (e.g. because the peer already dropped the
            connection); in that case the error is still propagated.
        '''
        sock = self._socket
        if sock is None:
            return
        # Drop the reference first so that a failing teardown cannot leave
        # this object holding on to a dead socket.
        self._socket = None
        try:
            sock.shutdown(socket.SHUT_WR)
        finally:
            sock.close()

    @contextmanager
    def connected(self):
        ''' Context manager for ensuring that the socket is connected while
            sending and receiving commands to remote socket.
            This is safe to use everywhere, even if the socket is previously connected.
            It can also be nested.
            This is useful to bundle multiple commands that you desire to be
            executed together in a single socket connection, for example:

            .. code-block:: python

                def query(self, query_msg, msg_length=2048):
                    with self.connected():
                        self._send(self._socket, query_msg)
                        recv = self._recv(self._socket, msg_length)
                    return recv
        '''
        previously_connected = (self._socket is not None)
        self.connect()
        try:
            yield self
        finally:
            # Only tear down a connection that this context opened itself.
            if not previously_connected:
                self.disconnect()

    def startup(self):
        ''' Not implemented here; always raises NotImplementedError.'''
        raise NotImplementedError

    def send(self, value):
        ''' Sends an ASCII string to the socket server. Auto-connects if necessary.

            Args:
                value (str): value to be sent

            Returns:
                the result of ``_send`` (None, since it relies on ``sendall``).
        '''
        with self.connected():
            sent = self._send(self._socket, value)
        return sent

    def recv(self, msg_length=2048):
        ''' Receives an ASCII string from the socket server. Auto-connects if necessary.

            Args:
                msg_length (int): maximum message length.

            Returns:
                str: the decoded data received.
        '''
        with self.connected():
            recv = self._recv(self._socket, msg_length)
        return recv

    def query(self, query_msg, msg_length=2048):
        ''' Not implemented here; always raises NotImplementedError.'''
        raise NotImplementedError