#!/usr/bin/env python3
"""Merge several prioritised UDP frame streams into a single output.

Each input stream is given on the command line as
``HOST:PORT:PRIORITY:TIMEOUT``.  One receiver per stream listens on its UDP
socket, decodes incoming packets with ``blup.BLP.blp2frame`` and hands the
frames to a :class:`BLPHub`, which decides whether a frame is forwarded to
the configured output based on the stream's priority and timeout.
"""

import getopt
import socket
import sys
import threading
import time

import blup.BLP
import blup.output


class BLPStreamReceiverThread(threading.Thread):
    """Receive packets on one UDP socket and pass decoded frames to a hub."""

    def __init__(self, addr, port, hub, protocol=blup.BLP.PROTO_BLP):
        """Store the listening endpoint, the target hub and the protocol.

        Args:
            addr: Local address to bind the UDP socket to.
            port: Local UDP port to bind to.
            hub: Object whose ``sendFrame(frame, stream)`` receives every
                successfully decoded frame; this thread is passed as
                ``stream``.
            protocol: Protocol constant forwarded to ``blup.BLP.blp2frame``.
        """
        super().__init__()
        self.__addr = addr
        self.__port = port
        self.__hub = hub
        self.__protocol = protocol
        # An Event (rather than a flag that run() sets to True) guarantees
        # that a terminate() issued before run() starts looping is not lost.
        self.__stopRequested = threading.Event()

    def run(self):
        """Receive and forward frames until :meth:`terminate` is called."""
        # The context manager closes the socket on every exit path,
        # including exceptions such as KeyboardInterrupt.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.bind((self.__addr, self.__port))
            # Wake up once per second so a stop request is noticed even
            # when no packets arrive.
            sock.settimeout(1)

            while not self.__stopRequested.is_set():
                try:
                    data, addr = sock.recvfrom(2048)
                except socket.timeout:
                    continue

                try:
                    frame = blup.BLP.blp2frame(data, protocol=self.__protocol)
                except ValueError:
                    # blp2frame is expected to signal undecodable packets
                    # with ValueError; report the sender and keep listening.
                    print('got an invalid frame from', addr)
                else:
                    self.__hub.sendFrame(frame, self)

    def terminate(self):
        """Ask the receive loop to stop; it exits within about a second."""
        self.__stopRequested.set()


class BLPHub():
    """Arbitrate between several registered streams feeding one output.

    A frame is forwarded when its stream's priority is at least the priority
    of the last forwarded frame, or when the timeout that belonged to the
    last forwarded frame has expired.
    """

    def __init__(self, output):
        """Create a hub that forwards accepted frames to ``output``.

        Args:
            output: Object providing ``sendFrame(frame)``.
        """
        self.__output = output
        self.__streams = {}
        self.__lock = threading.Lock()
        # -inf makes the very first frame pass the timeout test no matter
        # where the monotonic clock's (unspecified) reference point lies.
        self.__lastFrameTimeout = float('-inf')
        self.__lastFramePriority = -1

    def sendFrame(self, frame, stream):
        """Forward ``frame`` to the output if ``stream`` may currently send.

        Args:
            frame: The frame to forward.
            stream: The key the stream was registered with via
                :meth:`addStream`.

        Returns:
            True if the frame was forwarded, False if the stream is not
            registered or the frame was rejected by the priority rule.
        """
        with self.__lock:
            try:
                settings = self.__streams[stream]
            except KeyError:
                return False

            priority = settings['priority']
            # Monotonic milliseconds: unaffected by wall-clock adjustments,
            # which would otherwise distort the blocking window.
            now = int(time.monotonic() * 1000)

            outranked = priority < self.__lastFramePriority
            stillBlocked = now <= self.__lastFrameTimeout
            if outranked and stillBlocked:
                return False

            self.__lastFramePriority = priority
            self.__lastFrameTimeout = now + settings['timeout']
            self.__output.sendFrame(frame)
            return True

    def addStream(self, stream, priority, timeout):
        """Register ``stream``; an already registered stream is left as is.

        Args:
            stream: Hashable key identifying the stream.
            priority: Priority compared against the last forwarded frame.
            timeout: Milliseconds for which a frame of this stream keeps
                lower-priority streams from being forwarded.
        """
        # Receiver threads may already be calling sendFrame() while further
        # streams are registered, so mutate the table under the same lock.
        with self.__lock:
            if stream not in self.__streams:
                self.__streams[stream] = {'priority': priority,
                                          'timeout': timeout}


def printUsage(errMsg=None):
    """Print the usage text to stdout, preceded by ``errMsg`` if given."""
    if errMsg is not None:
        print('error: %s\n' % (errMsg))
    print('usage: %s [OPTIONS] INPUTSTREAM ...' % (sys.argv[0]))
    print('where INPUTSTREAM is given as HOST:PORT:PRIORITY:TIMEOUT')
    print('supported options:')
    print(' -o OUTPUT where to put the received frames')
    print(' --eblp use EBLP instead of BLP')
    print(' --e3blp use E3BLP instead of BLP')
    print(' -h print this text')
    print(' --help')


def main():
    """Parse the command line, start the receivers and run until Ctrl-C."""
    try:
        # Parse only the real arguments: with the program name included,
        # getopt would stop at it when POSIXLY_CORRECT is set and never see
        # any option.  '-s ARG' is accepted but unused; it is kept so that
        # existing invocations do not start failing.
        (opts, args) = getopt.gnu_getopt(sys.argv[1:], 'hs:o:',
                                         ['help', 'eblp', 'e3blp'])
    except getopt.GetoptError as err:
        printUsage(str(err))
        sys.exit(1)
    opts = dict(opts)

    if '--help' in opts or '-h' in opts:
        printUsage()
        sys.exit(0)

    output = opts.get('-o', 'shell')

    if '--eblp' in opts and '--e3blp' in opts:
        printUsage('please only specify one of --eblp or --e3blp')
        sys.exit(1)

    if '--eblp' in opts:
        protocol = blup.BLP.PROTO_EBLP
    elif '--e3blp' in opts:
        protocol = blup.BLP.PROTO_E3BLP
    else:
        protocol = blup.BLP.PROTO_BLP

    streams = []
    for arg in args:
        try:
            # Unpacking raises ValueError when the field count is wrong.
            (host, port, priority, timeout) = arg.split(':')
            port = int(port)
            priority = int(priority)
            timeout = int(timeout)
            if not 0 < port <= 65535 or timeout < 0:
                raise ValueError
        except ValueError:
            printUsage('illegal stream specification')
            sys.exit(1)
        streams.append((host, port, priority, timeout))

    if not streams:
        printUsage('please specify at least one stream')
        sys.exit(1)

    try:
        out = blup.output.getOutput(output)
    except ValueError:
        printUsage('could not initialize output')
        sys.exit(1)

    hub = BLPHub(out)

    # The first receiver runs in the main thread so Ctrl-C interrupts it
    # directly; all others run as background threads.
    streamThreads = []
    firstThread = None
    for (host, port, priority, timeout) in streams:
        stream = BLPStreamReceiverThread(host, port, hub, protocol)
        hub.addStream(stream, priority, timeout)
        if firstThread is None:
            firstThread = stream
        else:
            streamThreads.append(stream)
            stream.start()

    try:
        firstThread.run()
    except KeyboardInterrupt:
        if streamThreads:
            print('waiting for listeners to terminate...')
    finally:
        # Also reached when the foreground receiver fails (e.g. bind error):
        # without this the non-daemon threads would keep the process alive.
        for stream in streamThreads:
            stream.terminate()
        for stream in streamThreads:
            stream.join()


if __name__ == '__main__':
    main()