from . import VISAInstrumentDriver
from lightlab.equipment.abstract_drivers import MultiModalSource, MultiChannelSource
from lightlab.equipment.visa_bases.driver_base import TCPSocketConnection
from lightlab.laboratory.instruments import CurrentSource
import numpy as np
import time
import socket
from lightlab.util.io import RangeError
from lightlab import visalogger as logger
[docs]class NI_PCI_6723(VISAInstrumentDriver, MultiModalSource, MultiChannelSource):
''' Primarily employs abstract classes. Follow the bases for more information
:class:`~lightlab.equipment.lab_instruments.VISAInstrumentDriver`
provides communication to the board
:class:`~lightlab.equipment.abstract_drivers.MultiModalSource`
provides unit support and range checking
:py:class:`~lightlab.equipment.abstract_drivers.MultiChannelSource`
provides **notion of state** (stateDict) and channel support
Usage: :ref:`/ipynbs/Hardware/CurrentSources-NI.ipynb`
'''
instrument_category = CurrentSource
# The natural unit is volts so don't confuse
supportedModes = MultiModalSource.supportedModes - {'baseunit'}
baseUnitBounds = [0, 10]
baseToVoltCoef = 1
v2maCoef = 4 # current (milliamps) = v2maCoef * voltage (volts)
exceptOnRangeError = True # If False, it will constrain it and print a warning
maxChannel = 32 # number of dimensions that the current sources are expecting
targetPort = 16022 # TCPIP server port; charge of an electron (Coulombs)
waitMsOnWrite = 500 # Time to settle after tuning
_tcpsocket = None
MAGIC_TIMEOUT = 30
def __init__(self, name='The current source', address=None, useChans=None, **kwargs):
kwargs['tempSess'] = kwargs.get('tempSess', True)
if 'elChans' in kwargs.keys():
useChans = kwargs.pop('elChans')
self.reinstantiate_session(address, kwargs['tempSess'])
MultiChannelSource.__init__(self, useChans=useChans)
[docs] def reinstantiate_session(self, address, tempSess):
if address is not None:
# should be something like ['TCPIP0', 'xxx.xxx.xxx.xxx', '6501', 'SOCKET']
address_array = address.split("::")
self._tcpsocket = TCPSocketConnection(ip_address=address_array[1],
port=int(address_array[2]),
timeout=self.MAGIC_TIMEOUT,
termination='\r\n')
[docs] def startup(self):
self.off()
[docs] def open(self):
if self.address is None:
raise RuntimeError("Attempting to open connection to unknown address.")
try:
self._tcpsocket.connect()
except socket.error:
self._tcpsocket.disconnect()
raise
[docs] def close(self):
self._tcpsocket.disconnect()
def _query(self, queryStr):
with self._tcpsocket.connected() as s:
s.send(queryStr)
i = 0
old_timeout = s.timeout
s.timeout = self.MAGIC_TIMEOUT
received_msg = ''
while i < 1024: # avoid infinite loop
recv_str = s.recv(1024)
received_msg += recv_str
if recv_str.endswith('\n'):
break
s.timeout = 1
i += 1
s.timeout = old_timeout
return received_msg.rstrip()
[docs] def query(self, queryStr, expected_talker=None):
ret = self._query(queryStr)
if expected_talker is not None:
if ret != expected_talker:
log_function = logger.warning
else:
log_function = logger.debug
log_function("'%s' returned '%s', expected '%s'", queryStr, ret, str(expected_talker))
else:
logger.debug("'%s' returned '%s'", queryStr, ret)
return ret
[docs] def write(self, writeStr, expected_talker=None):
''' The APEX does not deal with write; you have to query to clear the buffer '''
self.query(writeStr, expected_talker)
time.sleep(0.2)
[docs] def instrID(self):
r''' There is no "\*IDN?" command. Instead, test if it is alive,
and then return a reasonable string
'''
self.tcpTest()
return 'Current Source'
[docs] def tcpTest(self, num=2):
print('x = ' + str(num))
# self.open()
ret = self.query('Test: ' + str(num) + ' ' + str(num + .5))
# self.close()
retNum = [0] * len(ret.split())
for i, s in enumerate(ret.split(' ')):
retNum[i] = float(s) + .01
print('[x+1, x+1.5] = ' + str(retNum))
[docs] def setChannelTuning(self, chanValDict, mode, waitTime=None): # pylint: disable=arguments-differ
oldState = self.getChannelTuning(mode)
# Check range and convert to base units
chanBaseDict = dict()
for ch, val in chanValDict.items():
try:
enforced = self.enforceRange(val, mode)
except RangeError as err:
self.off()
raise err
chanBaseDict[ch] = self.val2baseUnit(enforced, mode)
# Change the state
super().setChannelTuning(chanBaseDict)
# Was there a change
if not oldState == self.getChannelTuning(mode):
self.sendToHardware(waitTime)
else:
self.wake()
[docs] def getChannelTuning(self, mode): # pylint: disable=arguments-differ
baseDict = super().getChannelTuning()
return self.baseUnit2val(baseDict, mode)
[docs] def off(self): # pylint: disable=arguments-differ
self.setChannelTuning(dict([ch, 0] for ch in self.stateDict.keys()), 'volt')
[docs] def wake(self):
''' Don't change the value but make sure it doesn't go to sleep after inactivity.
Good for long sweeps
'''
self.sendToHardware(waitTime=0)
[docs] def sendToHardware(self, waitTime=None):
"""Updates current drivers with the present value of tuneState
Converts it to a raw voltage, depending on the mode of the driver
Args:
"""
# First get a complete array in terms of voltage needed by the NI-PCI card
fullVoltageState = np.zeros(self.maxChannel)
for ch, baseVal in self.stateDict.items():
fullVoltageState[ch] = self.baseUnit2val(baseVal, 'volt')
# Then write a TCPIP packet
writeStr = 'Set:'
for v in fullVoltageState:
writeStr += ' ' + str(v)
# self.open()
retStr = self.query(writeStr)
# self.close()
if retStr != 'ACK':
raise RuntimeError('Current driver is angry. Message: \"' + retStr + '\"')
if waitTime is None:
waitTime = self.waitMsOnWrite
logger.debug('Current settling for %s ms', waitTime)
time.sleep(waitTime / 1000)