'''
Two dimensional measured objects where the second abscissa variable is either
* discrete (:class:`FunctionBundle`), or
* continuous (:class:`MeasuredSurface`)
'''
import matplotlib.pyplot as plt
import numpy as np
from scipy import interpolate
import matplotlib.cm as cm
from functools import wraps
from itertools import repeat
from .one_dim import MeasuredFunction, Waveform
from lightlab.laboratory import Hashable
[docs]class FunctionBundle(Hashable): # pylint: disable=eq-without-hash
''' A bundle of :class:`~lightlab.util.data.one_dim.MeasuredFunction`'s: "z" vs. "x", "i"
The key is that they have the same abscissa base.
This class will take care of resampling in a common abscissa base.
The bundle can be:
* iterated to get the individual :class`~lightlab.util.data.one_dim.MeasuredFunction`'s
* operated on with other ``FunctionBundles``
* plotted with :meth`simplePlot` and :meth:`multiAxisPlot`
Feeds through **callable** signal processing methods to its members (type MeasuredFunction),
If the method is not found in the FunctionBundle, and it is in it's member,
it will be mapped to every function in the bundle, returning a new bundle.
Distinct from a :class:`MeasuredSurface` because
the additional axis does not represent a continuous thing.
It is discrete and sometimes unordered.
Distinct from a :class:`FunctionalBasis` because
it does not support most linear algebra-like stuff
(e.g. decomposision, matrix multiplication, etc.).
This is not a strict rule.
'''
def __init__(self, measFunList=None):
''' Can be initialized fully, or initialized with None to be built interactively.
Args:
measFunList (list[MeasuredFunction] or None): list of MeasuredFunctions that must have the same abscissa.
'''
self.absc = None
self.ordiMat = None
self.memberType = None
self.nDims = 0
if measFunList is not None:
if type(measFunList) is list:
for m in measFunList:
self.addDim(m)
else:
self.addDim(measFunList)
[docs] def addDim(self, newMeasFun):
if self.absc is None:
self.absc = newMeasFun.absc
self.ordiMat = np.matrix(newMeasFun.ordi)
self.memberType = type(newMeasFun)
else:
y = self._putInTimebase(newMeasFun) # This does the checking for type and timebase
self.ordiMat = np.append(self.ordiMat, [y], axis=0)
self.nDims += 1
def __getitem__(self, index):
''' Gives out individual measured functions of the type used, or
If it gets a slice, it returns a function bundle,
just like a sliced numpy array returns a numpy array.
Args:
index (int or slice): which of the function(s) do you want to get
Returns:
(MeasuredFunction or FunctionBundle): depending on type of index
'''
if type(index) is int:
theOrdi = self.ordiMat[index, :].A1 # A1 is a special numpy thing that converts from matrix to 1-d array
return self.memberType(self.absc, theOrdi)
elif type(index) is slice:
newBundle = self.copy()
newOrdiMat = self.ordiMat[index, :]
newBundle.ordiMat = newOrdiMat
newBundle.nDims = np.shape(newOrdiMat)[0]
return newBundle
def __len__(self):
return self.ordiMat.shape[0]
def __eq__(self, other):
return (self.memberType is other.memberType and
self.absc == other.absc and
self.ordiMat == other.ordiMat)
def __add__(self, other):
''' This works with scalars, vectors, MeasuredFunctions, and FunctionBundles
'''
if np.isscalar(other) or isinstance(other, MeasuredFunction):
other = repeat(other, self.nDims)
newBundle = type(self)()
for selfItem, otherItem in zip(self, other):
newItem = selfItem + otherItem
newBundle.addDim(newItem)
return newBundle
def __radd__(self, other):
return self.__add__(other)
def __sub__(self, other):
return self.__add__(-1 * other)
def __mul__(self, other):
''' This works with scalars, vectors, MeasuredFunctions, and FunctionBundles
'''
if np.isscalar(other) or isinstance(other, MeasuredFunction):
other = repeat(other, self.nDims)
newBundle = type(self)()
for selfItem, otherItem in zip(self, other):
newItem = selfItem * otherItem
newBundle.addDim(newItem)
return newBundle
def __rmul__(self, other):
return self.__mul__(other)
def __truediv__(self, other):
return self * (1 / other)
def __getattr__(self, attrName):
''' Feeds through **callable** methods to its members (type MeasuredFunction),
if the attribute is not found in the FunctionBundle.
If it finds it there, then applies it to every function in the bundle.
With binary operator math, it only works with scalars.
For example, starting with::
.. code-block:: python
spct1 = Spectrum([0, 1, 2], [2, 5, 3])
spct2 = Spectrum([0, 1, 2], [4, 4, 1])
funBun = FunctionBundle([spct1, spct2])
You can do
.. code-block:: python
croppedFunBund = FunctionBundle([
spct1.crop([0, 1]),
spct2.crop([0, 1]))
])
funBun.crop([0, 1]) == croppedFunBund
Note:
Be careful about "overloading" a MeasuredFunction method in FunctionBundle.
It can have a different meaning than what would happen if not overloaded.
For example ``MeasuredFunction.__mul__(nonscalar)`` is function multiplication,
while ``FunctionBundle.__mul__(nonscalar)`` means vector dot product.
Todo:
This only works with :class:`~lightlab.util.data.one_dim.SignalProcessingMixin`
methods that are MF-in/MF-out. How should we handle characteristic-type methods,
such as ``centerOfMass``, or ``getRange``?
'''
if attrName == 'memberType':
raise RuntimeError('Missed "memberType"')
try:
memberClassFunc = getattr(self.memberType, attrName)
except AttributeError as err:
betterErrStr = err.args[0] + ', neither does \'{}\' object'.format(type(self).__name__)
err.args = tuple([betterErrStr, err.args[1:]])
raise
if not callable(memberClassFunc):
raise TypeError(f'{memberClassFunc} in'
' {} of {}'.format(type(self).__name__, self.memberType.__name__) + ' '
'is not callable')
@wraps(memberClassFunc)
def fakeFun(*args, **kwargs):
newBundle = type(self)()
for item in self:
newItem = memberClassFunc(item, *args, **kwargs)
newBundle.addDim(newItem)
return newBundle
return fakeFun
[docs] def copy(self):
newObj = type(self)()
newObj.__dict__ = self.__dict__.copy()
newObj.ordiMat = self.ordiMat.copy()
return newObj
[docs] def extend(self, otherFunctionBund):
for func in otherFunctionBund:
self.addDim(func)
[docs] def max(self):
''' Returns a single MeasuredFunction(subclass) that is the maximum of all in this bundle
'''
return self.memberType(self.absc, np.max(self.ordiMat, axis=0).A1)
[docs] def min(self):
''' Returns a single MeasuredFunction(subclass) that is the minimum of all in this bundle
'''
return self.memberType(self.absc, np.min(self.ordiMat, axis=0).A1)
[docs] def mean(self):
''' Returns a single MeasuredFunction(subclass) that is the mean of all in this bundle
'''
return self.memberType(self.absc, np.mean(self.ordiMat, axis=0).A1)
def _putInTimebase(self, testFun):
''' Makes sure signal type is correct and time basis is the same
Args:
testFun (MeasuredFunction): the new function, as encapsulated by an object
Returns:
array: the ordinate of the new function in the right timebase
Raises:
TypeError: if the type of ``testFun`` is different than the others in this FunctionalBasis
'''
if type(testFun).__name__ is not self.memberType.__name__:
raise TypeError('This FunctionalBasis expects ' + str(self.memberType) +
', but was given ' + str(type(testFun)) + '.')
# Check time base
if np.any(testFun.absc != self.absc):
# logger.warning('Warning: Basis signal time abscissa are different. Interpolating...')
return testFun(self.absc)
else:
return testFun.ordi
[docs] def simplePlot(self, *args, **kwargs):
'''
'''
for f in self:
f.simplePlot(*args, **kwargs)
[docs] def multiAxisPlot(self, *args, axList=None, titleRoot=None, **kwargs):
''' titleRoot must take one argument in its format method, which is given the index
Returns:
(list(axis)): The axes that were plotted upon
'''
if axList is None:
_, axList = plt.subplots(nrows=len(self), figsize=(14, 14))
if len(axList) != len(self):
raise ValueError('Wrong number of axes. Got {}, need {}.'.format(
len(axList), len(self)))
for i, ax in enumerate(axList):
plt.sca(ax)
self[i].simplePlot(*args, **kwargs)
if titleRoot is not None:
plt.title(titleRoot.format(i + 1))
# plt.xlabel('Time (s)')
if i < len(self) - 1:
ax.xaxis.set_ticklabels([])
ax.set_xlabel('')
# plt.ylabel('Intensity (a.u.)')
# plt.xlim(0,2e-8)
return axList
[docs] def histogram(self):
''' Gives a MeasuredFunction of counts vs. ordinate values (typically voltage)
Does not maintain any abscissa information
At this point, does not allow caller to set the arguments passed to np.histogram
This is mainly just for plotting
'''
hist, bins = np.histogram(self.ordiMat, bins='auto', density=False)
histFun = MeasuredFunction([], [])
for iBin, thisOrdi in enumerate(hist):
thisAbsc = np.mean(bins[iBin:iBin + 1])
histFun.addPoint((thisAbsc, thisOrdi))
return histFun
[docs] def weightedAddition(self, weiVec):
''' Calculates the weighted addition of the basis signals
Args:
weiVec (array): weights to be applied to the basis functions
Returns:
(MeasuredFunction): weighted addition of basis signals
'''
assert(len(weiVec) == self.nDims)
weiVec = np.reshape(weiVec, (1, self.nDims))
outOrdi = np.dot(weiVec, self.ordiMat)
return self.memberType(self.absc, outOrdi.A1)
[docs] def moment(self, order=2, allDims=True, relativeGauss=False):
''' The order'th moment of all the points in the bundle.
Args:
order (integer): the polynomial moment of inertia. Don't trust the normalization of > 2'th order.
order = 1: mean
order = 2: variance
order = 3: skew
order = 4: kurtosis
allDims (bool): if true, collapses all signals, returning a scalar
Returns:
(ndarray or float): the specified moment(s)
'''
byDim = np.zeros(len(self))
for iDim in range(len(self)):
byDim[iDim] = self[iDim].moment(order, relativeGauss=relativeGauss)
if allDims:
return np.mean(byDim)
else:
return byDim
[docs] def componentAnalysis(self, *args, pcaIca=True, lNorm=2, expectedComponents=None, **kwargs):
''' Gives the waveform representing the principal component of the order
Args:
pcaIca (bool): if True, does PCA; if False, does ICA
lNorm (int): how to normalize weight vectors. L1 norm uses the maximum abs weight, while L2 norm (default) is vector unit
expectedComponents (FunctionBundle or subclass): Used for flipping signs
args, kwargs: Feed through to sklearn.decomposition.[PCA(), FastICA()]
Returns:
(FunctionBundle or subclass): principal component waveforms
'''
from sklearn.decomposition import PCA, FastICA
compBuiltIn = PCA if pcaIca else FastICA
compAnal = compBuiltIn(*args, **kwargs)
compAnal.fit(self.ordiMat.T)
pcBundle = type(self)()
for c in compAnal.components_:
if lNorm == 1:
w = c / np.sqrt(np.max(c ** 2))
elif lNorm == 2:
w = c / np.sqrt(np.sum(c ** 2))
else:
raise Exception('Are you serious? Use a valid L-norm')
pcWfm = self.weightedAddition(w)
pcBundle.addDim(pcWfm)
if expectedComponents is not None:
pcBundle = pcBundle.correctSigns(expectedComponents, maintainOrder=pcaIca)
return pcBundle
[docs] def correctSigns(self, otherBundle, maintainOrder=True):
''' Goes through each component and flips the sign if correlation is negative
ICA also has a permutation indeterminism.
'''
if len(self) != len(otherBundle):
raise ValueError('Self and other bundle must be the same length')
newBundle = type(self)()
usedPermInds = []
for iDim, oSig in enumerate(otherBundle):
if maintainOrder:
sSig = self[iDim]
if (sSig * oSig).getMean() > 0:
newWfm = sSig
else:
newWfm = -1 * sSig
else:
permCorrs = np.zeros(len(self))
for jDim, sSig in enumerate(self):
if jDim not in usedPermInds:
permCorrs[jDim] = (sSig * oSig).getMean()
permInd = np.argmax(np.abs(permCorrs))
permSign = int(np.sign(permCorrs[permInd]))
newWfm = permSign * self[permInd]
usedPermInds.append(permInd)
newBundle.addDim(newWfm)
return newBundle
[docs]class FunctionalBasis(FunctionBundle):
''' A FunctionBundle that supports additional linear algebra methods
Created for weighted addition, decomposition, and component analysis
'''
[docs] @classmethod
def independentDefault(cls, nDims):
''' Gives a basis of non-overlapping pulses. Waveforms only
'''
newAbsc = np.linspace(0, 1, 1000)
pWid = .5 / nDims
newObj = cls()
for iDim in range(nDims):
on = (iDim + .25) / nDims
sig = Waveform.pulse(newAbsc, tOn=on, tOff=on + pWid)
newObj.addDim(sig)
return newObj
[docs] def innerProds(self, trial):
''' takes the inner products of the trial function onto this basis.
'''
tvec = self._putInTimebase(trial)
tmat = np.tile(tvec, (self.nDims, 1))
products = np.sum(np.multiply(tmat, self.ordiMat), axis=1).A1
return products
[docs] def magnitudes(self):
''' The inner product of the basis with itself
'''
return np.sum(np.multiply(self.ordiMat, self.ordiMat), axis=1).A1
[docs] def project(self, trial):
''' Projects onto normalized basis
If the basis is orthogonal, this is equivalent to weight decomposition
'''
return self.innerProds(trial) / self.magnitudes()
[docs] def decompose(self, trial, moment=1):
''' Uses the Moore-Penrose pseudoinverse to get weight decomposition without orthogonality
Args:
trial (MeasuredFunction): signal to be decomposed
moment (float): polynomial moment of the basis to use when decomposing
'''
tvec = self._putInTimebase(trial)
momBasis = np.power(self.ordiMat, moment)
basisInverse = np.linalg.pinv(momBasis)
return np.dot(tvec, basisInverse).A1
[docs] def matrixMultiply(self, weiMat):
assert(weiMat.shape[1] == self.nDims)
outBasis = type(self)()
for weiVec in weiMat:
weightedDim = self.weightedAddition(weiVec)
outBasis.addDim(weightedDim)
return outBasis
[docs] def getMoment(self, weiVecs=None, order=2, relativeGauss=False):
''' This is actually the projected moment. Named for compatibility with bss package
Make sure weiVecs is two dimensional
'''
moms = np.zeros(len(weiVecs))
for iv, v in enumerate(weiVecs):
weighted = self.weightedAddition(v)
calcMom = weighted.moment(order, relativeGauss=relativeGauss)
moms[iv] = calcMom
return moms
[docs] def remainder(self, trial):
''' Gives the remaining parts of the signal that are not explained by the minimum-squared-error decomposition
'''
explainedWeights = self.decompose(trial)
explainedSignal = self.weightedAddition(explainedWeights)
return trial - explainedSignal
[docs] def covariance(self):
''' Returns covariance matrix of the basis, which is nDims x nDims '''
return np.cov(self.ordiMat)
[docs]class MeasuredSurface(object):
''' Basically a two dimensional measured function: "z" vs. "x", "y"
Useful trick when gathering data: build incrementally using :meth:`FunctionBundle.addDim`,
then convert that to this class using :meth:`MeasuredSurface.fromFunctionBundle`.
'''
def __init__(self, absc, ordi):
'''
Args:
absc (ndarray): same meaning as measured function
ordi (ndarray): two-dimensional array or matrix
'''
if type(absc) == np.ndarray and absc.ndim != 1:
raise Exception(
'absc should be a 2-element list of arrays or an array of objects (which are arrays)')
if len(absc) != 2:
raise Exception('Wrong number of abscissas. Need two')
abscshape = np.zeros(2)
for iDim in range(2):
abscshape[iDim] = len(absc[iDim])
if not np.all(ordi.shape == abscshape):
raise Exception('Dimensions of ordinate and abscissa do not match')
self.absc = absc
self.ordi = ordi
[docs] @classmethod
def fromFunctionBundle(cls, otherBund, addedAbsc=None):
''' gives back a MeasuredSurface from a function Bundle
Args:
otherBund (FunctionBundle): The source. The ordering of functions matters
addedAbsc (np.ndarray): the second dimension abscissa array (default, integers)
Returns:
(:class:`MeasuredSurface`) new object
'''
existingAbsc = otherBund.absc
if addedAbsc is None:
addedAbsc = np.arange(otherBund.nDims)
return cls([addedAbsc, existingAbsc], otherBund.ordiMat)
def __call__(self, testAbscissaVec=None):
f = interpolate.interp2d(*self.absc, z=self.ordi, kind='cubic')
return f(*testAbscissaVec)
[docs] def item(self, index, dim=None):
if np.isscalar(index):
assert(dim is not None)
newAbsc = self.absc[dim]
if dim == 0:
newOrdi = self.ordi[index, :]
else:
newOrdi = self.ordi[:, index]
return MeasuredFunction(newAbsc, newOrdi)
else:
assert(len(index) == 2)
firstDimMf = self.item(index[0], dim=0)
return firstDimMf[index[1]]
[docs] def shape(self):
return self.ordi.shape
[docs] def simplePlot(self, *args, **kwargs):
if 'cmap' not in kwargs.keys():
kwargs['cmap'] = cm.inferno # pylint: disable=no-member
if 'shading' not in kwargs.keys():
kwargs['shading'] = 'flat'
YY, XX = np.meshgrid(self.absc[0], self.absc[1])
plt.pcolormesh(XX, YY, np.array(self.ordi.T), *args, **kwargs)
plt.autoscale(tight=True)
[docs]class Spectrogram(MeasuredSurface):
pass
[docs]class MeasuredErrorField(object):
''' A field that hold two abscissa arrays and two ordinate matrices
Error is the measuredGrid - nominalGrid, which is a vector field
'''
def __init__(self, nominalGrid, measuredGrid):
assert(nominalGrid.ndim == 3)
self.nomiGrid = nominalGrid
if measuredGrid.ndim == 4:
self.measGrid = np.mean(measuredGrid, axis=0)
elif measuredGrid.ndim == 3:
self.measGrid = measuredGrid
else:
raise Exception('measuredGrid must be dimension 3 (meaned) or 4 (trials)')
def __call__(self, testVec=None):
xVec = self.nomiGrid[:, :, 0]
yVec = self.nomiGrid[:, :, 1]
uVec = self.measGrid[:, :, 0]
vVec = self.measGrid[:, :, 1]
u = interpolate.interp2d(xVec, yVec, uVec, kind='linear')
v = interpolate.interp2d(xVec, yVec, vVec, kind='linear')
testU = u(*testVec)[0]
testV = v(*testVec)[0]
return np.array([testU, testV])
[docs] def errorAt(self, testVec=None):
return self(testVec) - testVec
[docs] def invert(self, desiredVec):
reflectedVec = desiredVec - self.errorAt(desiredVec)
avgErr = (self.errorAt(desiredVec) + self.errorAt(reflectedVec)) / 2
commandVec = desiredVec - avgErr
return commandVec
[docs] def zeroCenteredSquareSize(self):
''' Very stupid, just look at corner points
Returns:
(tuple(float)): square sides of nominal and measured grids
'''
def cornerInd(grid, minOrMax):
score = np.sum(grid, axis=2)
cornerFlatInd = np.argmin(score) if minOrMax else np.argmax(score)
return np.unravel_index(cornerFlatInd, grid.shape[:2])
def zcSz(grid, corner):
cornerVec = grid[(corner + (slice(None), ))]
sqSide = np.min(np.abs(cornerVec))
return sqSide
nomiCornerInds = [cornerInd(self.nomiGrid, m) for m in [True, False]]
nomiSq = np.min([zcSz(self.nomiGrid, nomiCornerInds[i]) for i in range(2)])
measSq = np.min([zcSz(self.measGrid, nomiCornerInds[i]) for i in range(2)])
return nomiSq, measSq