Source code for lightlab.util.io.progress

''' Some utility functions for printing to stdout used in the project

    Also contains web-based progress monitoring
'''
import sys
import numpy as np
import time
import socket
import tempfile
from pathlib import Path

from .paths import projectDir, monitorDir


[docs]def printWait(*args): r''' Prints your message followed by ``...`` This displays immediately, but * your next print will show up on the same line Args: \*args (Tuple(str)): Strings that will be written ''' msg = '' for a in args: msg += str(a) print(msg + "... ", end='') sys.stdout.flush()
[docs]def printProgress(*args): ''' Deletes current line and writes. This is used for updating iterating values so to not produce a ton of output Args: *args (str, Tuple(str)): Arguments that will be written ''' msg = '' for a in args: msg += str(a) sys.stdout.write('\b' * 1000) sys.stdout.flush() sys.stdout.write(msg) sys.stdout.write('\n')
[docs]class ProgressWriter(object): ''' Writes progress to an html file for long sweeps. Including timestamps. Has an init and an update method You can then open this file to the internet by running a HTTP server. To setup a continuously running server:: screen -S sweepProgressServer (Enter) cd /home/atait/Documents/calibration-instrumentation/sweepMonitorServer/ python3 -m http.server 8050 (Ctrl-a, d) To then access from a web browser:: http://lightwave-lab-olympias.princeton.edu:8050 Todo: Have this class launch its own process server upon init Make it so you can specify actuator names ''' progFileDefault = monitorDir / 'sweep.html' tFmt = '%a, %d %b %Y %H:%M:%S' __tagHead = None __tagFoot = None def __init__(self, name, swpSize, runServer=True, stdoutPrint=False, **kwargs): # pylint: disable=unused-argument ''' Args: name (str): name to be displayed swpSize (tuple): size of each dimension of the sweep ''' self.name = name if np.isscalar(swpSize): swpSize = [swpSize] self.size = np.array(swpSize) self.currentPnt = np.zeros_like(self.size) self.totalSize = np.prod(self.size) self.ofTotal = 0 self.completed = False self.serving = runServer self.printing = stdoutPrint self.startTime = time.time() # In seconds since the epoch def print_string(): prntStr = self.name + "\n" for iterDim, _ in enumerate(self.size): prntStr += 'Dim-' + str(iterDim) + '...' return prntStr if True: monitorDir.mkdir(exist_ok=True) # pylint: disable=no-member self.tempfile = tempfile.NamedTemporaryFile("w+", dir=str(monitorDir), prefix='swp_prog') print("Printing progress in tempfile {}".format(self.tempfile.name)) self.tempfile.write(print_string() + "\n") self.tempfile.write(self._get_std_string() + "\n") self.tempfile.flush() if self.serving: print('See sweep progress online at') print(self.getUrl()) monitorDir.mkdir(exist_ok=True) # pylint: disable=no-member fp = Path(ProgressWriter.progFileDefault) # pylint: disable=no-member fp.touch() self.filePath = fp.resolve() self.__writeHtml() if self.printing: print(print_string()) self.__writeStdio()
[docs] @staticmethod def getUrl(): ''' URL where the progress monitor will be hosted ''' prefix = 'http://' host = socket.getfqdn().lower() try: with open(projectDir / '.monitorhostport', 'r') as fx: port = int(fx.readline()) except FileNotFoundError: port = 'null' return prefix + host + ':' + str(port)
def __tag(self, bodytext, autorefresh=False): ''' Do the HTML tags ''' if not hasattr(self, '__tagHead') or self.__tagHead is None: t = '<!DOCTYPE html>\n' t += '<html>\n' t += '<head>\n' t += '<title>' t += 'Sweep Progress Monitor' t += '</title>\n' if autorefresh: t += '<meta http-equiv="refresh" content="5" />\n' # Autorefresh every 5 seconds t += '<body>\n' t += '<h1>' + self.name + '</h1>\n' t += r'<hr \>\\n' self.__tagHead = t if not hasattr(self, '__tagFoot') or self.__tagFoot is None: t = '</body>\n' t += '</html>\n' self.__tagFoot = t return self.__tagHead + bodytext + self.__tagFoot def __writeStdioEnd(self): # display.clear_output(wait=True) print('Sweep completed!') def __writeHtmlEnd(self): self.__tagHead = None self.__tagFoot = None body = '<h2>Sweep completed!</h2>\n' body += ptag('At ' + ProgressWriter.tims(time.time())) htmlText = self.__tag(body, autorefresh=False) with self.filePath.open('w') as fx: # pylint: disable=no-member fx.write(htmlText) def _get_std_string(self): prntStr = '' for iterDim, dimSize in enumerate(self.size): of = (self.currentPnt[iterDim] + 1, dimSize) prntStr += '/'.join((str(v) for v in of)) + '...' return prntStr def __writeStdio(self): print(self._get_std_string()) def __writeHtml(self): # Write lines for progress in each dimension body = '' for i, p in enumerate(self.currentPnt): dimStr = i * 'sub-' + 'dimension[' + str(i) + '] : ' dimStr += str(p + 1) + ' of ' + str(self.size[i]) body += ptag(dimStr) body += r'<hr \>\\n' # Calculating timing currentTime = time.time() tSinceStart = currentTime - self.startTime completeRatio = (self.ofTotal + 1) / self.totalSize endTime = self.startTime + tSinceStart / completeRatio body += ptag('(Start Time) ' + ProgressWriter.tims(self.startTime)) body += ptag('(Latest Update) ' + ProgressWriter.tims(currentTime)) body += ptag('(Expected Completion) ' + ProgressWriter.tims(endTime)) # Say where the files are hosted body += ptag('This monitor service is hosted in the directory:') body += ptag(str(monitorDir)) # Write to html file if self.serving: htmlText = self.__tag(body, autorefresh=True) with self.filePath.open('w') as fx: # pylint: disable=no-member fx.write(htmlText)
[docs] def update(self, steps=1): if self.completed: raise Exception( 'This sweep has supposedly completed. Make a new object to go again') for _ in range(steps): self.__updateOneInternal() if not self.completed: if self.serving: self.__writeHtml() if self.printing: self.__writeStdio() if True: self.tempfile.write(self._get_std_string() + "\n") self.tempfile.flush() else: if self.serving: self.__writeHtmlEnd() if self.printing: self.__writeStdioEnd() if True: self.tempfile.write("End." + "\n") self.tempfile.flush()
def __updateOneInternal(self): for i in range(len(self.size)): if self.currentPnt[-i - 1] < self.size[-i - 1] - 1: self.currentPnt[-i - 1] += 1 break else: self.currentPnt[-i - 1] = 0 self.ofTotal += 1 if self.ofTotal == self.totalSize: self.completed = True
[docs] @classmethod def tims(cls, epochTime): return time.strftime(cls.tFmt, time.localtime(epochTime)) + '\n'
[docs]def ptag(s): return '<p>' + s + '</p>\n'