Source code for lightlab.equipment.lab_instruments.Apex_AP2440A_OSA

from . import VISAInstrumentDriver
from lightlab.equipment.visa_bases.driver_base import TCPSocketConnection
from lightlab.laboratory.instruments import OpticalSpectrumAnalyzer

import numpy as np
from lightlab.util.data import Spectrum
import pyvisa
import time
from lightlab import visalogger as logger
import socket
from contextlib import closing
WIDEST_WLRANGE = [1505.765, 1572.418]


[docs]def check_socket(host, port): # learned from https://stackoverflow.com/questions/19196105/python-how-to-check-if-a-network-port-is-open-on-linux with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: if sock.connect_ex((host, port)) == 0: # pylint: disable=no-member logger.debug("%s:%s socket is open", host, port) port_open = True # Port is open else: port_open = False # Port is closed return port_open
[docs]class Apex_AP2440A_OSA(VISAInstrumentDriver): """Class for the OSA Basic functionality includes setting/getting wavelength range and sweeping Other functionality is for controlling TLS: on/off, wavelength (not implemented) The primary function is spectrum, which returns a Spectrum object Usage: :ref:`/ipynbs/Hardware/OpticalSpectrumAnalyzer.ipynb` """ instrument_category = OpticalSpectrumAnalyzer _tcpsocket = None __wlRange = None MAGIC_TIMEOUT = 30 def __init__(self, name='The OSA', address=None, **kwargs): """Initializes a fake VISA connection to the OSA. """ kwargs['tempSess'] = kwargs.pop('tempSess', True) super().__init__(name=name, address=address, **kwargs) self.reinstantiate_session(address, kwargs['tempSess'])
[docs] def reinstantiate_session(self, address, tempSess): if address is not None: # should be something like ['TCPIP0', 'xxx.xxx.xxx.xxx', '6501', 'SOCKET'] address_array = address.split("::") self._tcpsocket = TCPSocketConnection(ip_address=address_array[1], port=int(address_array[2]), timeout=self.MAGIC_TIMEOUT)
[docs] def startup(self): ''' Checks if it is alive, sets up standard OSA parameters ''' with self._tcpsocket.connected(): self.instrID() self.write('SPTRACESWP0', 'SP_SWEEP_TRACE_0') # select trace 0 self.write('SPXUNT1', 'SP_WAVELENGTH') # x-axis mode to wavelength self.write('SPLINSC1', 'SP_LOG') # y-axis to log scale self.write('SPSWPRES0', 'SP_RESOLUTION_100MHz') # resolution to 0.80 pm
# others?
[docs] def open(self): if self.address is None: raise RuntimeError("Attempting to open connection to unknown address.") try: self._tcpsocket.connect() except socket.error: self._tcpsocket.disconnect() raise
[docs] def close(self): self._tcpsocket.disconnect()
def _query(self, queryStr): with self._tcpsocket.connected() as s: s.send(queryStr) i = 0 old_timeout = s.timeout s.timeout = self.MAGIC_TIMEOUT received_msg = '' while i < 1024: # avoid infinite loop recv_str = s.recv(1024) received_msg += recv_str if recv_str.endswith('\n'): break s.timeout = 1 i += 1 s.timeout = old_timeout return received_msg.rstrip()
[docs] def query(self, queryStr, expected_talker=None): ret = self._query(queryStr) if expected_talker is not None: if ret != expected_talker: log_function = logger.warning else: log_function = logger.debug log_function("'%s' returned '%s', expected '%s'", queryStr, ret, str(expected_talker)) else: logger.debug("'%s' returned '%s'", queryStr, ret) return ret
[docs] def write(self, writeStr, expected_talker=None): ''' The APEX does not deal with write; you have to query to clear the buffer ''' self.query(writeStr, expected_talker) time.sleep(0.2)
[docs] def instrID(self): """Overloads the super function because the OSA does not respond to *IDN? Instead sends a simple command and waits for a confirmed return """ try: self.write('SPSWPMSK?') except socket.error: logger.error('OSA communication test failed. Sent: \'SPSWPMSK?\'') raise else: return 'Apex AP2440A'
[docs] def getWLrangeFromHardware(self): theRange = [0] * 2 retStr = self.query('SPSTOPWL?') parsed = retStr.split('_')[-1] # this removes the 'STRT_WL_' parsed = parsed[:-2] # this removes the 'nm' theRange[0] = float(parsed) retStr = self.query('SPSTRTWL?') parsed = retStr.split('_')[-1] # this removes the 'STRT_WL_' parsed = parsed[:-2] # this removes the 'nm' theRange[1] = float(parsed) return theRange
@property def wlRange(self): if self.__wlRange is None: self.__wlRange = self.getWLrangeFromHardware() return self.__wlRange @wlRange.setter def wlRange(self, newRange): """Assigns a new range to the wlRange property AND tells the OSA to update its range :param wlRange: a 2-element array of the form [start WL, end WL] :type wlRange: array of double This can be assigned using the typical notation, such as >>> osaInst.wlRange = [1550, 1551.3] It can also be accessed as normal, such as >>> x = osaInst.wlRange """ newRangeClipped = np.clip(newRange, a_min=1505.765, a_max=1572.418) if np.any(newRange != newRangeClipped): print('Warning: Requested OSA wlRange out of range. Got', newRange) self.write('SPSTRTWL' + str(np.max(newRangeClipped))) self.write('SPSTOPWL' + str(np.min(newRangeClipped))) self.__wlRange = newRangeClipped
[docs] def triggerAcquire(self): """Performs a sweep and reads the data Returns an array of dBm values as doubles :rtype: array """ logger.debug('The OSA is sweeping') self.write('SPTRACESWP0', 'SP_SWEEP_TRACE_0') # activate trace 0 self.write('SPSWP1', 'SP_SINGLE_SWEEP') # Initiates a sweep # self.write('*WAI') # Bus and entire program stall until sweep completes. logger.debug('Done')
[docs] def transferData(self): """ Performs a sweep and reads the data Gets the data of the sweep from the spectrum analyzer Returns: (ndarray, ndarray): wavelength in nm, power in dBm """ retStr = self._query('SPDATAD0') powerData = pyvisa.util.from_ascii_block(retStr, converter='f', separator=' ', container=list) dataLen = int(powerData[0]) powerData = np.array(powerData[1:]) retStr = self._query('SPDATAWL0') wavelengthData = pyvisa.util.from_ascii_block(retStr, converter='f', separator=' ', container=list) assert dataLen == wavelengthData[0] wavelengthData = np.array(wavelengthData[1:]) # wavelengthData = np.linspace(self.wlRange[1], self.wlRange[0], dataLen) return wavelengthData[::-1], powerData[::-1]
[docs] def spectrum(self, average_count=1): """Take a new sweep and return the new data. This is the primary user function of this class """ if not (type(average_count) == int and average_count > 0): raise RuntimeError('average_count must be positive integer, used {}'.format(average_count)) for i in range(average_count): self.triggerAcquire() nm, dbm = self.transferData() if i == 0: dbmAvg = dbm / average_count else: dbmAvg = dbmAvg + dbm / average_count return Spectrum(nm, dbmAvg, inDbm=True)
# TLS access methods currently not implemented @property def tlsEnable(self): retStr = self.query('TLSON?') # do some parsing return int(retStr) == 1 @tlsEnable.setter def tlsEnable(self, newState=None): """newState can be 0/1 or true/false if newState is -1 or None: do nothing Returns the current on/off state as boolean, read from the OSA """ if newState and newState != -1: if isinstance(newState, bool): newState = (newState != 0) writeVal = '1' if newState else '0' self.write('TLSON ' + writeVal) @property def tlsWl(self): retStr = self.query('TLSwl?') # do some parsing return float(retStr) @tlsWl.setter def tlsWl(self, newState=None): """newState is a float in units of nm """ if newState: self.write('TLSwl ' + str(newState))