''' This module provides an interface for instruments, hosts and benches in the lab.
'''
import os
import platform
from uuid import getnode as get_mac # https://stackoverflow.com/questions/159137/getting-mac-address
from contextlib import contextmanager
from lightlab.laboratory import Node, typed_property, TypedList
from lightlab.equipment.visa_bases import VISAObject, DefaultDriver
from lightlab.util.data import mangle
from lightlab import logger
import pyvisa
[docs]class Host(Node):
""" Computer host, from which GPIB/VISA commands are issued.
"""
_name = None
hostname = None
mac_address = None
os = "linux-ubuntu" # linux-ubuntu, linux-centos, windows, mac etc.
__cached_list_resources_info = None
__cached_gpib_instrument_list = None
def __init__(self, name='Unnamed Host', hostname=None, **kwargs):
if hostname is None:
logger.warning("Hostname not set. isLive and list_resources not functional.")
self.hostname = hostname
self._name = name
super().__init__(**kwargs)
@property
def name(self):
return self._name
@property
def instruments(self):
from lightlab.laboratory.state import lab
return TypedList(Instrument, *list(filter(lambda x: x.host == self, lab.instruments)), read_only=True)
def __contains__(self, item):
instrument_search = item in self.instruments
if not instrument_search:
logger.info("%s not found in %s's instruments.", item, self)
return instrument_search
[docs] def isLive(self):
''' Pings the system and returns if it is alive.
'''
if self.hostname is not None:
logger.debug("Pinging %s...", self.hostname)
response = os.system("ping -c 1 {}".format(self.hostname))
if response != 0:
logger.warning("%s is not reachable via ping.", self)
return response == 0
else:
logger.warning("Hostname not set. Unable to ping.")
return False
def _visa_prefix(self):
''' The prefix necessary for connecting to remote visa servers.
Ex. 'visa://remote-server.university.edu/'
Returns:
(str)
'''
return 'visa://{}/'.format(self.hostname)
[docs] def gpib_port_to_address(self, port, board=0):
'''
Args:
port (int): The port on the GPIB bus of this host
board (int): For hosts with multiple GPIB busses
Returns:
(str): the address that can be used in an initializer
'''
localSerialStr = 'GPIB{}::{}::INSTR'.format(board, port)
return self._visa_prefix() + localSerialStr
[docs] def list_resources_info(self, use_cached=True):
""" Executes a query to the NI Visa Resource manager and
returns a list of instruments connected to it.
Args:
use_cached (bool): query only if not cached, default True
Returns:
list: list of `pyvisa.highlevel.ResourceInfo` named tuples.
"""
if self.__cached_list_resources_info is None:
use_cached = False
if use_cached:
return self.__cached_list_resources_info
else:
list_query = self._visa_prefix() + "?*::INSTR"
rm = pyvisa.ResourceManager()
logger.debug("Caching resource list in %s", self)
self.__cached_list_resources_info = rm.list_resources_info(
query=list_query)
return self.__cached_list_resources_info
[docs] def list_gpib_resources_info(self, use_cached=True):
""" Like :meth:`list_resources_info`, but only returns gpib
resources.
Args:
use_cached (bool): query only if not cached, default True.
Returns:
(list): list of ``pyvisa.highlevel.ResourceInfo`` named tuples.
"""
return {resource_name: resource
for resource_name, resource in self.list_resources_info(use_cached=use_cached).items()
if resource.interface_type == pyvisa.constants.InterfaceType.gpib}
[docs] def get_all_gpib_id(self, use_cached=True):
""" Queries the host for all connected GPIB instruments, and
queries their identities with ``instrID()``.
Warning: This might cause your instrument to lock into remote mode.
Args:
use_cached (bool): query only if not cached, default True
Returns:
dict: dictionary with gpib addresses as keys and \
identity strings as values.
"""
gpib_resources = self.list_gpib_resources_info(use_cached=use_cached)
if self.__cached_gpib_instrument_list is None:
use_cached = False
if use_cached:
return self.__cached_gpib_instrument_list
else:
gpib_instrument_list = dict()
logger.debug("Caching GPIB instrument list in %s", self)
for gpib_address in gpib_resources.keys():
visa_object = VISAObject(gpib_address, tempSess=True)
try:
instr_id = visa_object.instrID()
gpib_instrument_list[gpib_address] = instr_id
except pyvisa.VisaIOError as err:
logger.error(err)
self.__cached_gpib_instrument_list = gpib_instrument_list
return gpib_instrument_list
[docs] def findGpibAddressById(self, id_string_search, use_cached=True):
""" Finds a gpib address using :meth:`get_all_gpib_id`, given
an identity string.
Args:
id_string_search (str): identity string
use_cached (bool): query only if not cached, default True
Returns:
str: address if found.
Raises:
NotFoundError: If the instrument is not found.
"""
gpib_ids = self.get_all_gpib_id(use_cached=use_cached)
for gpib_address, id_string in gpib_ids.items():
if id_string_search == id_string:
logger.info("Found %s in %s.", id_string_search, gpib_address)
return gpib_address
logger.warning("%s not found in %s", id_string_search, self)
raise NotFoundError(
"{} not found in {}".format(id_string_search, self))
[docs] def addInstrument(self, *instruments):
r""" Adds an instrument to lab.instruments if it is not already present.
Args:
\*instruments (Instrument): instruments
"""
from lightlab.laboratory.state import lab
for instrument in instruments:
if instrument not in lab.instruments:
lab.instruments.append(instrument)
instrument.host = self
[docs] def removeInstrument(self, *instruments):
r""" Disconnects the instrument from the host
Args:
\*instruments (Instrument): instruments
Todo:
Remove all connections
"""
for instrument in instruments:
if type(instrument) is str:
logger.warning('Cannot remove by name string. Use the object')
instrument.host = None
[docs] def checkInstrumentsLive(self):
""" Checks whether all instruments are "live".
Instrument status is checked with the :meth:`Instrument.isLive` method
Returns:
(bool): True if all instruments are live, False otherwise
"""
all_live = True
for instrument in self.instruments:
if instrument.isLive():
logger.info("%s is live.", instrument)
else:
all_live = False
return all_live
def __str__(self):
return "Host {}".format(self.name)
[docs] def display(self):
""" Displays the host's instrument table in a nice format."""
lines = ["{}".format(self)]
lines.append("===========")
lines.append("Instruments")
lines.append("===========")
if len(self.instruments) > 0:
lines.extend([" {} ({})".format(str(instrument), str(instrument.host))
for instrument in self.instruments])
else:
lines.append(" No instruments.")
lines.append("***")
print("\n".join(lines))
[docs]class LocalHost(Host):
def __init__(self, name=None):
if name is None:
name = 'localhost'
super().__init__(name=name, hostname=platform.node())
mac = get_mac()
# converts 90520734586583 to 52:54:00:3A:D6:D7
self.mac_address = ':'.join(("%012X" % mac)[i:i + 2] for i in range(0, 12, 2))
self.os = platform.system()
def _visa_prefix(self):
''' How the visa server is specified. If this is a local host,
then there is no visa:// prefix
Returns:
(str)
'''
return ''
[docs] def isLive(self):
return True
[docs]class Bench(Node):
""" Represents an experiment bench for the purpose of facilitating
its location in lab.
"""
name = None
def __init__(self, name, *args, **kwargs):
self.name = name
super().__init__(*args, **kwargs)
def __contains__(self, item):
if isinstance(item, Instrument):
instrument_search = item in self.instruments
if not instrument_search:
logger.info("%s not found in %s's instruments.", item, self)
return instrument_search
elif isinstance(item, Device):
device_search = item in self.devices
if not device_search:
logger.info("%s not found in %s's devices.", item, self)
return device_search
else:
logger.debug("%s is neither an Instrument nor a Device", item)
return False
@property
def instruments(self):
from lightlab.laboratory.state import lab
return TypedList(Instrument, *list(filter(lambda x: x.bench == self, lab.instruments)), read_only=True)
@property
def devices(self):
from lightlab.laboratory.state import lab
return TypedList(Device, *list(filter(lambda x: x.bench == self, lab.devices)))
[docs] def addInstrument(self, *instruments):
r""" Adds an instrument to lab.instruments if it is not already
present and connects to the host.
Args:
\*instruments (Instrument): instruments
"""
from lightlab.laboratory.state import lab
for instrument in instruments:
if instrument not in lab.instruments:
lab.instruments.append(instrument)
instrument.bench = self
[docs] def removeInstrument(self, *instruments):
r""" Detaches the instrument from the bench.
Args:
\*instruments (Instrument): instruments
Todo:
Remove all connections
"""
for instrument in instruments:
if type(instrument) is str:
raise TypeError('Cannot remove by name string. Use the object')
instrument.bench = None
[docs] def addDevice(self, *devices):
r""" Adds a device to lab.devices if it is not already present
and places it in the bench.
Args:
\*devices (Device): devices
"""
from lightlab.laboratory.state import lab
for device in devices:
if not isinstance(device, Device):
raise TypeError(f"{device} is not an instance of Device.")
if device not in lab.devices:
lab.devices.append(device)
device.bench = self
[docs] def removeDevice(self, *devices):
r""" Detaches the device from the bench.
Args:
\*devices (Device): devices
Todo:
Remove all connections
"""
# TODO Remove all connections
for device in devices:
if type(device) is str:
raise TypeError('Cannot remove by name string. Use the object')
device.bench = None
[docs] def display(self):
""" Displays the bench's table in a nice format."""
lines = ["{}".format(self)]
lines.append("===========")
lines.append("Instruments")
lines.append("===========")
if len(self.instruments) > 0:
lines.extend([" {} ({})".format(str(instrument), str(instrument.host))
for instrument in self.instruments])
else:
lines.append(" No instruments.")
lines.append("=======")
lines.append("Devices")
lines.append("=======")
if len(self.devices) > 0:
lines.extend([" {}".format(str(device)) for device in self.devices])
else:
lines.append(" No devices.")
lines.append("***")
print("\n".join(lines))
def __str__(self):
return "Bench {}".format(self.name)
[docs]class Instrument(Node):
""" Represents an instrument in lab.
This class stores information about instruments, for the purpose of
facilitating verifying whether it is connected to the correct devices.
Driver feedthrough
Methods, properties, and even regular attributes
that are in :py:data:`essential_attributes` of the class
will get/set/call through to the driver object.
Do not instantiate directly
Calling a **VISAInstrumentDriver** class will return an **Instrument** object
Short example::
osa = Apex_AP2440A_OSA(name='foo', address='NULL')
osa.spectrum()
Long example
:ref:`/ipynbs/Others/labSetup.ipynb`
Detailed testing
:py:func:`~tests.test_abstractDrivers.test_driver_init`
"""
_driver_class = None
__driver_object = None
#: Complete Visa address of the instrument (e.g. :literal:`visa\://hostname/GPIB0::1::INSTR`)
address = None
_id_string = None
_name = None
_bench = None
_host = None
ports = None #: list(str) Port names of instruments. To be used with labstate connections.
essentialMethods = ['startup'] #: list of methods to be fed through the driver
essentialProperties = [] #: list of properties to be fed through the driver
optionalAttributes = [] #: list of optional attributes to be fed through the driver
def __init__(self, name="Unnamed Instrument", id_string=None, address=None, **kwargs):
self.bench = kwargs.pop("bench", None)
self.host = kwargs.pop("host", None)
self.ports = kwargs.pop("ports", dict())
self.address = address
self.__driver_object = kwargs.pop("driver_object", None)
if self.__driver_object is not None:
self._driver_class = self.__driver_object.__class__
self._name = name
self._id_string = id_string
super().__init__(**kwargs)
def __dir__(self):
''' For autocompletion in ipython '''
return super().__dir__() + self.essentialProperties \
+ self.essentialMethods + self.implementedOptionals
@property
def implementedOptionals(self):
implementedOptionals = list()
for opAttr in self.optionalAttributes:
if hasattr(self._driver_class, opAttr):
implementedOptionals.append(opAttr)
return implementedOptionals
# These control feedthroughs to the driver
def __getattr__(self, attrName):
errorText = f"'{str(self)}' has no attribute '{attrName}'"
if attrName in self.essentialProperties \
+ self.essentialMethods \
+ self.implementedOptionals:
return getattr(self.driver, attrName)
# Time to fail
if attrName in self.optionalAttributes:
errorText += '\nThis is an optional attribute of {} '.format(type(self).__name__)
errorText += 'not implemented by this particular driver'
elif hasattr(self._driver_class, attrName) or hasattr(self.__driver_object, attrName):
errorText += '\nIt looks like you are trying to access a low-level attribute'
errorText += '\nUse ".driver.{}" to get it'.format(attrName)
# This was put here to match normal behavior while trying to
# set obj.__mangled_variable = 'something'
try:
return self.__dict__[mangle(attrName, self.__class__.__name__)]
except KeyError:
raise AttributeError(errorText)
def __setattr__(self, attrName, newVal):
if attrName in self.essentialProperties \
+ self.essentialMethods \
+ self.implementedOptionals:
setattr(self.driver, attrName, newVal)
else:
if attrName == 'address': # Reinitialize the driver
if self.__driver_object is not None:
self.__driver_object.close()
self.__driver_object.address = newVal
super().__setattr__(mangle(attrName, self.__class__.__name__), newVal)
def __delattr__(self, attrName):
if attrName in self.essentialProperties + self.essentialMethods: # or methods
self.driver.__delattr__(attrName)
else:
try:
del self.__dict__[mangle(attrName, self.__class__.__name__)]
except KeyError:
super().__delattr__(attrName)
# These control contextual behavior. They are used by DualInstrument
[docs] def hardware_warmup(self):
""" Called before the beginning of an experiment.
Typical warmup procedures include RESET gpib commands.
"""
pass
[docs] def hardware_cooldown(self):
""" Called after the end of an experiment.
Typical cooldown procedures include laser turn-off, or orderly
wind-down of current etc.
"""
pass
[docs] @contextmanager
def warmedUp(self):
''' A context manager that warms up and cools down in a "with" block
Usage:
.. code-block:: python
with instr.warmedUp() as instr: # warms up instrument
instr.doStuff()
raise Exception("Interrupting experiment")
# cools down instrument, even in the event of exception
'''
try:
self.hardware_warmup()
yield self
finally:
self.hardware_cooldown()
# These control properties
@property
def driver_class(self):
""" Class of the actual equipment driver
(from :mod:`lightlab.equipment.lab_instruments`)
This way the object knows how to instantiate a driver instance
from the labstate.
"""
if self._driver_class is None:
logger.warning("Using default driver for %s.", self)
return DefaultDriver
else:
return self._driver_class
@property
def driver_object(self):
""" Instance of the equipment driver."""
if self.__driver_object is None:
try:
kwargs = self.driver_kwargs
except AttributeError: # Fall back to the jank version where we try to guess what is important
kwargs = dict()
for kwarg in ["useChans", "elChans", "dfbChans", "stateDict", "sourceMode"]:
try:
kwargs[kwarg] = getattr(self, kwarg)
except AttributeError:
pass
kwargs['directInit'] = True
self.__driver_object = self.driver_class( # pylint: disable=not-callable
name=self.name, address=self.address, **kwargs)
return self.__driver_object
@property
def driver(self):
""" Alias of :meth:`driver_object`."""
return self.driver_object
bench = typed_property(Bench, "_bench")
host = typed_property(Host, "_host")
@property
def name(self):
""" (property) Instrument name (can only set during initialization) """
return self._name
@property
def id_string(self):
"""
The id_string should match the value returned by
``self.driver.instrID()``, and is checked by the command
``self.isLive()`` in order to authenticate that the intrument
in that address is the intended one.
"""
return self._id_string
def __str__(self):
return "{}".format(self.name)
def __repr__(self):
return "<{} name={}, address={}, id={}>".format(self.__class__.__name__,
self.name, self.address, id(self))
[docs] def display(self):
""" Displays the instrument's info table in a nice format."""
lines = ["{}".format(self)]
lines.append("Bench: {}".format(self.bench))
lines.append("Host: {}".format(self.host))
lines.append("address: {}".format(self.address))
lines.append("id_string: {}".format(self.id_string))
if not self.id_string:
lines.append("The id_string should match the value returned by"
" self.driver.instrID(), and is checked by the command"
" self.isLive() in order to authenticate that the intrument"
" in that address is the intended one.")
lines.append("driver_class: {}".format(self.driver_class.__name__))
lines.append("=====")
lines.append("Ports")
lines.append("=====")
if len(self.ports) > 0:
lines.extend([" {}".format(str(port)) for port in self.ports])
else:
lines.append(" No ports.")
if hasattr(self, 'driver_kwargs'):
lines.append("=====")
lines.append("Driver kwargs")
lines.append("=====")
for k, v in self.driver_kwargs.items():
lines.append(" {} = {}".format(str(k), str(v)))
lines.append("***")
print("\n".join(lines))
[docs] def isLive(self):
""" Attempts VISA connection to instrument, and checks whether
:meth:`~lightlab.equipment.visa_bases.visa_object.instrID`
matches :data:`id_string`.
Produces a warning if it is live but the id_string is wrong.
Returns:
(bool): True if "live", False otherwise.
"""
try:
driver = self.driver_object
query_id = driver.instrID()
logger.info("Found %s in %s.", self.name, self.address)
if self.id_string is not None:
if self.id_string == query_id:
logger.info("id_string of %s is accurate", self.name)
return True
else:
logger.warning("%s: %s, expected %s", self.address,
query_id, self.id_string)
return False
else:
logger.debug("Cannot authenticate %s in %s.",
self.name, self.address)
return True
except pyvisa.VisaIOError as err:
logger.warning(err)
return False
[docs] def connectHost(self, new_host):
""" Sets/changes instrument's host.
Equivalent to ``self.host = new_host``
"""
self.host = new_host
[docs] def placeBench(self, new_bench):
""" Sets/changes instrument's bench.
Equivalent to ``self.bench = new_bench``
"""
self.bench = new_bench
# @classmethod
# def fromGpibAddress(cls, gpib_address):
# visa_object = VISAObject(gpib_address, tempSess=True)
# # TODO untreated error when there is no device with that address!
# id_string = visa_object.instrID()
# return cls(id_string, gpib_address=gpib_address)
[docs]class MockInstrument(Instrument):
def __getattr__(self, attrName):
def noop(*args, **kwargs):
raise AttributeError("Attempted to call method ('{}') of a mock Instrument.".format(attrName))
return noop
[docs]class NotFoundError(RuntimeError):
""" Error thrown when instrument is not found
"""
pass
[docs]class Device(Node):
""" Represents a device in lab.
Only useful for documenting the experiment.
Todo:
Add equality function
"""
name = None #: device name
ports = None #: list(str) port names
_bench = None
def __init__(self, name, **kwargs):
self.name = name
self.ports = kwargs.pop("ports", list())
self.bench = kwargs.pop("bench", None)
super().__init__(**kwargs)
bench = typed_property(Bench, '_bench')
def __str__(self):
return "Device {}".format(self.name)
[docs] def display(self):
""" Displays the device's info table in a nice format."""
lines = ["{}".format(self)]
lines.append("Bench: {}".format(self.bench))
lines.append("=====")
lines.append("Ports")
lines.append("=====")
if len(self.ports) > 0:
lines.extend([" {}".format(str(port)) for port in self.ports])
else:
lines.append(" No ports.")
lines.append("***")
print("\n".join(lines))