Source code for lightlab.util.characterize

''' Timing is pretty important. These functions monitor behavior in various ways with timing considered.
    Included is strobeTest which sweeps the delay between actuate and sense, and monitorVariable for drift
'''

import matplotlib.pyplot as plt
from cycler import cycler
import numpy as np
import time
from IPython import display

from .data import FunctionBundle


def strobeTest(fActuate, fSense, fReset=None, nPts=10, maxDelay=1, visualize=True):
    ''' Looks at a sense variable at different delays after calling an actuate function.
        Good for determining the time needed to wait for settling.

        Calls each function once per delay point to construct a picture like the
        strobe experiment, or a sampling scope.

        fSense is additionally called once up front, before any actuation, only to
        find out how many values it returns.

        Args:
            fActuate (function): no arguments, no return. Called first.
            fSense (function): no arguments, returns a scalar or np.array. Called after a given delay
            fReset (function): no arguments, no return. Called before each actuation unless None,
                and followed by a sleep of maxDelay seconds. Usually of the same form as fActuate
            nPts (int): number of delay points, linearly spaced from 0 to maxDelay
            maxDelay (float): longest delay in seconds. Also the time slept after every fReset
            visualize (bool): if True, a figure is created and redrawn after every point.
                If False, nothing is plotted or displayed

        Returns:
            (FunctionBundle): fSense values vs. delay
    '''
    # Only build a figure when the caller actually wants live plotting
    if visualize:
        fi, ax = plt.subplots(figsize=(12, 7))
    else:
        fi = ax = None

    delays = np.linspace(0, maxDelay, nPts)

    # Figure out if sense is scalar or array; wrap accordingly
    testV = fSense()
    w = 1 if np.isscalar(testV) else len(testV)

    def fWrapped():
        # Force every reading into a single row so it can be assigned into v
        return np.reshape(np.array(fSense()), (1, w))

    t = np.zeros((nPts, 1))
    v = np.zeros((nPts, w))
    for it, d in enumerate(delays):
        t[it] = d
        if fReset is not None:
            fReset()
            # Soak for the longest delay so each trial starts from the same state
            time.sleep(maxDelay)
        fActuate()
        time.sleep(d)
        v[it, :] = fWrapped()
        if visualize:
            # Redraw from scratch with all of the points gathered so far
            display.clear_output(wait=True)
            ax.cla()
            ax.plot(t[:it + 1], v[:it + 1])
            display.display(fi)

    bund = FunctionBundle()
    bund.absc = t
    bund.ordiMat = v
    return bund
def sweptStrobe(varSwp, resetArg, nPts=10, maxDelay=1):
    ''' Takes in a NdSweeper and looks at the effect of delaying between actuation from measurement.
        Does the gathering of the start and end baselines, for ease of visualization.
        The returned strobe sweep itself is not gathered here.

        For every measurement/parser key ``k`` of varSwp, the returned sweep gets
        static data ``k-start`` and ``k-end`` and a parser ``k-normalized`` equal to
        ``(k - k-start) / (k-end - k-start)``.

        Args:
            varSwp (NdSweeper): the original, with 1-d actuation, any measurements, any parsers
            resetArg (scalar): argument passed to varSwp's actuate procedure to reset and equilibrate
            nPts (int): number of strobe points
            maxDelay (float): in seconds, delay of strobe. Also the time to soak on reset

        Returns:
            (NdSweeper): the strobe sweep, with accessible data. It can be regathered if needed.

        Raises:
            NotImplementedError: if varSwp has more than one actuation dimension

        Todo:
            It would be nice to provide timeconstant analysis,
            perhaps by looking at 50%, or by fitting an exponential
    '''
    if len(varSwp.actuate) > 1:
        raise NotImplementedError(
            'Since sweeper does not do well with >2 dimensions, you can only strobe a 1-D sweep')
    aKey, actu = list(varSwp.actuate.items())[0]
    varFun = actu.function
    varDom = actu.domain
    measParseKeys = list(varSwp.measure.keys()) + list(varSwp.parse.keys())

    # soakFun wraps the existing actuator with a long delay afterwards to ensure equilibration
    def soakFun(actuArg):
        varFun(actuArg)
        time.sleep(maxDelay)

    # Reset and soak, then apply the requested actuation without any soak
    def resetSoakAndActu(actuArg):
        soakFun(resetArg)
        varFun(actuArg)

    # Builds the normalizing parser for one key. A factory is used so each parser
    # keeps its own key; a lambda defined directly in the loop would look the loop
    # variable up at call time and every parser would use the last key.
    def makeNormalizer(key):
        startKey = key + '-start'
        endKey = key + '-end'

        def normalize(dat):
            return (dat[key] - dat[startKey]) / (dat[endKey] - dat[startKey])
        return normalize

    strobeSwp = varSwp.copy()
    strobeSwp.reinitActuation()

    # Another actuate/measure combo. Instead of sweeping actuation, it just resets, then soaks.
    # There is only one sweep point.
    # Store the results in data for later normalization
    startValSwp = varSwp.copy()
    startValSwp.setMonitorOptions(livePlot=False)
    startValSwp.reinitActuation()
    startValSwp.addActuation(aKey, soakFun, [resetArg])
    startValSwp.gather()
    startValData = {mpKey + '-start': startValSwp.data[mpKey][0]
                    for mpKey in measParseKeys}
    for sdKey, sdVal in startValData.items():
        strobeSwp.addStaticData(sdKey, sdVal)

    # NOTE(review): doOnEveryPoint=True presumably forces the reset before each strobe
    # point rather than only when this dimension changes; confirm against NdSweeper
    strobeSwp.addActuation(aKey, resetSoakAndActu, varDom, doOnEveryPoint=True)

    # Begin by doing a standard 1-d sweep, but with significant delay to get
    # equilibrium end values of every measurement/parser
    # Store the results in data for later normalization
    endValSwp = varSwp.copy()
    endValSwp.setMonitorOptions(livePlot=False)
    endValSwp.reinitActuation()
    endValSwp.addActuation(aKey, soakFun, varDom)
    endValSwp.gather()
    endValData = {mpKey + '-end': endValSwp.data[mpKey]
                  for mpKey in measParseKeys}
    for edKey, edVal in endValData.items():
        strobeSwp.addStaticData(edKey, edVal)

    # Adding actuation following reset: the default actuation, then a delay
    # associated with the strobe
    strobeSwp.addActuation('strobeDelay', time.sleep, np.linspace(0, maxDelay, nPts))

    # Provides normalized parsers for plotting
    strobeSwp.plotOptions['xKey'] = ('strobeDelay')
    strobeSwp.plotOptions['yKey'] = ()
    for mpKey in measParseKeys:
        strobeSwp.addParser(mpKey + '-normalized', makeNormalizer(mpKey))
        strobeSwp.plotOptions['yKey'] += (mpKey + '-normalized', )

    return strobeSwp
def monitorVariable(fValue, sleepSec=0, nReps=100, plotEvery=1):
    ''' Monitors some process over time. Good for observing drift.

        fValue is additionally called once up front, only to find out how many
        values it returns. That reading is not recorded.

        Args:
            fValue (function): called at each timestep with no arguments.
                Must return a scalar or a 1-D np.array
            sleepSec (scalar): time in seconds to sleep between calls
            nReps (int): number of timesteps to record
            plotEvery (int): the plot is redrawn once every this many timesteps
    '''
    curves = None

    # Figure out if the value is scalar or array; wrap accordingly
    testV = fValue()
    w = 1 if np.isscalar(testV) else len(testV)

    def fWrapped():
        # Force every reading into a single row so it can be assigned into v
        return np.reshape(np.array(fValue()), (1, w))

    t0 = time.time()

    # Keep the figure handle so that this exact figure is the one displayed,
    # even if something else changes pyplot's current figure during the run
    fig, ax = plt.subplots(figsize=(12, 7))

    # Limit the color cycle to w entries so that each redraw, which plots w curves,
    # starts back at the first color
    cycleDefault = plt.rcParams['axes.prop_cycle'].by_key()['color']
    cycleContrained = cycleDefault[:w]
    ax.set_prop_cycle(cycler('color', cycleContrained))

    t = np.zeros((nReps, 1))
    v = np.zeros((nReps, w))
    for it in range(nReps):
        t[it] = time.time() - t0
        v[it, :] = fWrapped()
        if (it + 1) % plotEvery == 0:
            if curves is not None:
                try:
                    for c in curves:
                        c.remove()
                except ValueError:
                    # it was probably an old one
                    pass
            curves = ax.plot(t[:it + 1], v[:it + 1])
            display.clear_output(wait=True)
            display.display(fig)
        time.sleep(sleepSec)