import numpy as np
from lightlab import logger
from lightlab.util.data import Waveform, FunctionBundle
from .configurable import Configurable
from . import AbstractDriver
# pylint: disable=no-member
class TekScopeAbstract(Configurable, AbstractDriver):
    ''' General class for several Tektronix scopes, including

            * `DPO 4034 <http://websrv.mece.ualberta.ca/electrowiki/images/8/8b/MSO4054_Programmer_Manual.pdf>`_
            * `DPO 4032 <http://websrv.mece.ualberta.ca/electrowiki/images/8/8b/MSO4054_Programmer_Manual.pdf>`_
            * `DSA 8300 <http://download.tek.com/manual/DSA8300-Programmer-Manual-077057006.pdf>`_
            * `TDS 6154C <http://www.tek.com/sites/tek.com/files/media/media/resources/55W_14873_9.pdf>`_

        The main method is :meth:`acquire`, which configures the timebase and
        returns a list of :class:`~Waveform`.

        Todo:

            These behave differently. Be more explicit about sample mode::

                timebaseConfig(avgCnt=1)
                acquire([1])
                acquire([1], avgCnt=1)

            Does DPO support sample mode at all?
    '''
    # These should be overloaded by the particular driver
    totalChans = None  # highest valid channel number; checked by acquire()
    _recLenParam = None  # config token holding the record length (number of points)
    _clearBeforeAcquire = None  # if truthy, 'ACQUIRE:DATA:CLEAR' is written before acquiring
    _measurementSourceParam = None  # token appended to 'MEASUREMENT:MEAS<n>:' to pick the source
    _runModeParam = None  # config token written by run()
    _runModeSingleShot = None  # value of _runModeParam used when not running continuously
    _yScaleParam = None  # 'WFMOUTPRE:' sub-token used as the vertical conversion factor

    def startup(self):
        ''' Make sure sampling and data transferring are in a consistent state.

            Reads the present record length and re-applies it through
            :meth:`acquire` (without channels, so nothing is triggered).
        '''
        initNpts = self.getConfigParam(self._recLenParam)
        self.acquire(nPts=initNpts)

    def timebaseConfig(self, avgCnt=None, duration=None, position=None, nPts=None):
        ''' Timebase and acquisition configure

            Only the arguments that are not None are written to the scope.

            Args:
                avgCnt (int): averaging done by the scope. Only written when greater than 1
                duration (float): time, in seconds, for data to be acquired
                position (float): trigger delay
                nPts (int): number of samples taken

            Returns:
                (dict) The present values of all settings above, read from hardware

            Note:
                The returned ``'duration'`` entry is the raw
                ``'HORIZONTAL:MAIN:SCALE'`` reading, i.e. the quantity that is
                written as ``duration / 10``, not the full window width.
        '''
        if avgCnt is not None and avgCnt > 1:
            self.setConfigParam('ACQUIRE:NUMAVG', avgCnt, forceHardware=True)
        if duration is not None:
            # The scale parameter is written as one tenth of the requested window
            self.setConfigParam('HORIZONTAL:MAIN:SCALE', duration / 10)
        if position is not None:
            self.setConfigParam('HORIZONTAL:MAIN:POSITION', position)
        if nPts is not None:
            # Keep the transferred data range in step with the record length
            self.setConfigParam(self._recLenParam, nPts)
            self.setConfigParam('DATA:START', 1)
            self.setConfigParam('DATA:STOP', nPts)

        presentSettings = dict()
        presentSettings['avgCnt'] = self.getConfigParam('ACQUIRE:NUMAVG', forceHardware=True)
        presentSettings['duration'] = self.getConfigParam('HORIZONTAL:MAIN:SCALE', forceHardware=True)
        presentSettings['position'] = self.getConfigParam('HORIZONTAL:MAIN:POSITION', forceHardware=True)
        presentSettings['nPts'] = self.getConfigParam(self._recLenParam, forceHardware=True)
        return presentSettings

    def acquire(self, chans=None, timeout=None, **kwargs):
        ''' Get waveforms from the scope.

            If chans is None, it won't actually trigger, but it will configure.

            If unspecified, the kwargs will be derived from the previous state of the scope.
            This is useful if you want to play with it in lab while working with this code too.

            Args:
                chans (list): which channels to record at the same time and return
                avgCnt (int): number of averages. special behavior when it is 1
                    (the scope is put in sample mode instead of average mode)
                duration (float): window width in seconds
                position (float): trigger delay
                nPts (int): number of sample points
                timeout (float): time to wait for averaging to complete in seconds
                    If it is more than a minute, it will do a test first

            Returns:
                list[Waveform]: recorded signals, in the order of ``chans``.
                None if ``chans`` is None.

            Raises:
                Exception: if a requested channel is outside 1..totalChans
        '''
        self.timebaseConfig(**kwargs)
        if chans is None:
            return

        for c in chans:
            # Channels are 1-indexed, so anything below 1 is as invalid as one above the maximum
            if c < 1 or c > self.totalChans:
                raise Exception('Received channel: ' + str(c) +
                                '. Max channels of this scope is ' + str(self.totalChans))

        # Channel select: turn on the requested channels and turn off all others
        for ich in range(1, 1 + self.totalChans):
            thisState = 1 if ich in chans else 0
            self.setConfigParam('SELECT:CH' + str(ich), thisState)

        # Sample mode is used only when avgCnt=1 was explicitly passed to this call
        isSampling = kwargs.get('avgCnt', 0) == 1
        self._setupSingleShot(isSampling)
        self._triggerAcquire(timeout=timeout)

        wfms = [None] * len(chans)
        for i, c in enumerate(chans):
            vRaw = self.__transferData(c)
            t, v = self.__scaleData(vRaw)
            # Optical modules might produce 'W' instead of 'V'
            unit = self.__getUnit()
            wfms[i] = Waveform(t, v, unit=unit)
        return wfms

    def _setupSingleShot(self, isSampling, forcing=False):
        ''' Set up a single shot acquisition.

                Not running continuous, and
                acquire mode set SAMPLE/AVERAGE

            Subclasses usually have additional settings to set here.

            Args:
                isSampling (bool): is it in sampling (True) or averaging (False) mode
                forcing (bool): if False, trusts that no manual changes were made, except to run continuous/RUNSTOP

            Todo:
                Missing DPO trigger source setting.
                Should we force it when averaging?
                Probably not because it could be CH1, CH2, AUX.
        '''
        self.run(False)
        self.setConfigParam('ACQUIRE:MODE',
                            'SAMPLE' if isSampling else 'AVERAGE',
                            forceHardware=forcing)

    def _triggerAcquire(self, timeout=None):
        ''' Sends a signal to the scope to wait for a trigger event.
            Waits until acquisition completes or timeout (in seconds).

            If timeout is longer than 60 seconds, a short trial acquisition
            (avgCnt=2, 10 second timeout) is run first, then the previous
            averaging count is written back.

            Args:
                timeout (float): seconds to wait. If None, ``self.timeout / 1e3``
                    is used (presumably ``self.timeout`` is in milliseconds -- confirm)
        '''
        if timeout is None:
            timeout = self.timeout / 1e3
        if timeout > 60:
            logger.warning('Long timeout %s specified, testing', timeout)
            old_avgCnt = self.timebaseConfig()['avgCnt']
            self.timebaseConfig(avgCnt=2)
            self._triggerAcquire(timeout=10)
            logger.warning('Test succeeded. Doing long average now')
            self.timebaseConfig(avgCnt=old_avgCnt)
        if self._clearBeforeAcquire:
            self.write('ACQUIRE:DATA:CLEAR')  # clear out average history
        self.write('ACQUIRE:STATE 1')  # activate the trigger listener
        # Bus and entire program stall until acquisition completes or the timeout elapses
        self.wait(int(timeout * 1e3))

    def __transferData(self, chan):
        ''' Pulls the raw, unscaled curve data of one channel from the scope.

            Args:
                chan (int): one channel at a time

            Returns:
                The values returned by the ASCII ``'CURV?'`` query.
                They still need to go through ``__scaleData``.

            Todo:
                Make this binary transfer to go even faster
        '''
        chStr = 'CH' + str(chan)
        self.setConfigParam('DATA:ENCDG', 'ASCII')
        self.setConfigParam('DATA:SOURCE', chStr)
        voltRaw = self.query_ascii_values('CURV?')
        return voltRaw

    def __scaleData(self, voltRaw):
        ''' Scale to second and voltage units.

            DSA and DPO are very annoying about treating ymult and yscale differently.
            TDS uses ymult not yscale

            Args:
                voltRaw (ndarray): what is returned from ``__transferData``

            Returns:
                (ndarray): time in seconds, centered at t=0 regardless of timebase position
                (ndarray): voltage in volts

            Notes:
                The formula for real voltage should be (Y - YOFF) * YSCALE + YZERO.
                The Y represents the position of the sampled point on-screen,
                YZERO, the reference voltage, YOFF, the offset position, and
                YSCALE, the conversion factor between position and voltage.
        '''
        def get(param):
            ''' Reads one waveform preamble value from hardware as a float '''
            return float(self.getConfigParam('WFMOUTPRE:' + param, forceHardware=True))

        voltage = (np.array(voltRaw) - get('YOFF')) \
            * get(self._yScaleParam) \
            + get('YZERO')

        timeDivision = float(self.getConfigParam('HORIZONTAL:MAIN:SCALE', forceHardware=True))
        # Symmetric axis spanning ten horizontal scale units in total
        time = np.linspace(-1, 1, len(voltage)) / 2 * timeDivision * 10
        return time, voltage

    def __getUnit(self):
        ''' Gets the unit of the waveform as a string.

            Normally, this will be '"V"', which can be converted to 'V'

            Returns:
                (str): the queried unit with double quotes removed
        '''
        yunit_query = self.getConfigParam('WFMOUTPRE:YUNIT', forceHardware=True)
        return yunit_query.replace('"', '')

    def wfmDb(self, chan, nWfms, untriggered=False):
        ''' Transfers a bundle of waveforms representing a signal database. Sample mode only.

            Configuration such as position, duration are unchanged, so use an acquire(None, ...) call to set them up

            Args:
                chan (int): currently this only works with one channel at a time
                nWfms (int): how many waveforms to acquire through sampling
                untriggered (bool): if True, the trigger source is temporarily set to
                    'FREERUN'; if False, it is temporarily set to 'EXTDIRECT'

            Returns:
                (FunctionBundle(Waveform)): all waveforms acquired
        '''
        bundle = FunctionBundle()
        with self.tempConfig('TRIGGER:SOURCE',
                             'FREERUN' if untriggered else 'EXTDIRECT'):
            for _ in range(nWfms):
                bundle.addDim(self.acquire([chan], avgCnt=1)[0])  # avgCnt=1 sets it to sample mode
        return bundle

    def run(self, continuousRun=True):
        ''' Sets the scope to continuous run mode, so you can look at it in lab,
            or to single-shot mode, so that data can be acquired

            Args:
                continuousRun (bool): True for 'RUNSTOP', False for the
                    driver-specific single-shot value
        '''
        self.setConfigParam(self._runModeParam,
                            'RUNSTOP' if continuousRun else self._runModeSingleShot,
                            forceHardware=True)
        if continuousRun:
            # Continuous mode also needs the acquisition to be started
            self.setConfigParam('ACQUIRE:STATE', 1, forceHardware=True)

    def setMeasurement(self, measIndex, chan, measType):
        ''' Configures and enables one of the scope's measurement slots.

            Args:
                measIndex (int): used to refer to this measurement itself. 1-indexed
                chan (int): the channel source of the measurement.
                measType (str): can be 'PK2PK', 'MEAN', etc. It is upper-cased before sending

            Raises:
                ValueError: if measIndex is 0
        '''
        if measIndex == 0:
            raise ValueError('measIndex is 1-indexed')
        measSubmenu = 'MEASUREMENT:MEAS' + str(measIndex) + ':'
        self.setConfigParam(measSubmenu + self._measurementSourceParam, 'CH' + str(chan))
        self.setConfigParam(measSubmenu + 'TYPE', measType.upper())
        self.setConfigParam(measSubmenu + 'STATE', 1)

    def measure(self, measIndex):
        ''' Reads the present value of a measurement slot from hardware.

            Args:
                measIndex (int): used to refer to this measurement itself. 1-indexed

            Returns:
                (float)
        '''
        measSubmenu = 'MEASUREMENT:MEAS' + str(measIndex) + ':'
        return float(self.getConfigParam(measSubmenu + 'VALUE', forceHardware=True))

    def autoAdjust(self, chans):
        ''' Adjusts offsets and scaling so that waveforms are not clipped

            For each channel, up to 100 sample-mode acquisitions are taken.
            After each one, the peak-to-peak and mean measurements decide
            whether the vertical scale and/or offset need correcting.
            The loop for that channel ends as soon as neither does.

            Measurement slots 1 and 2 are used temporarily; the measurement
            configuration is saved beforehand and restored afterwards,
            including when an exception is raised.

            Args:
                chans (list): channel numbers to adjust, one after the other

            Raises:
                Exception: if the required full span falls outside 0.1 to 100
        '''
        # Save the current measurement status. They will be restored at the end.
        self.saveConfig(dest='+autoAdjTemp', subgroup='MEASUREMENT')
        try:
            for ch in chans:
                chStr = 'CH' + str(ch)

                # Set up measurements
                self.setMeasurement(1, ch, 'pk2pk')
                self.setMeasurement(2, ch, 'mean')

                for _ in range(100):
                    # Acquire new data
                    self.acquire(chans=[ch], avgCnt=1)

                    # Put measurements into measResult
                    pk2pk = self.measure(1)
                    mean = self.measure(2)

                    # SCALE is written below as span / 10, so the span is read back as 10 * SCALE
                    span = 10 * float(self.getConfigParam(chStr + ':SCALE'))
                    offs = float(self.getConfigParam(chStr + ':OFFSET'))

                    # Check if scale is correct within the tolerance
                    newSpan = None
                    newOffs = None
                    if pk2pk < 0.7 * span:
                        newSpan = pk2pk / 0.75
                    elif pk2pk > 0.8 * span:
                        newSpan = 2 * span
                    # Only range-check a span that was actually proposed
                    if newSpan is not None and (newSpan < 0.1 or newSpan > 100):
                        raise Exception('Scope channel ' + chStr + ' could not be adjusted.')

                    # Check if offset is correct within the tolerance
                    if abs(mean) > 0.05 * span:
                        newOffs = offs - mean

                    # If we didn't set the new variables, then we're good to go
                    if newSpan is None and newOffs is None:
                        break

                    # Adjust only the settings that need it
                    if newSpan is not None:
                        self.setConfigParam(chStr + ':SCALE', newSpan / 10)
                    if newOffs is not None:
                        self.setConfigParam(chStr + ':OFFSET', newOffs)
        finally:
            # Recover the measurement setup from before adjustment
            self.loadConfig(source='+autoAdjTemp', subgroup='MEASUREMENT')
            self.config.pop('autoAdjTemp')
# pylint: enable=no-member