You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
167 lines
4.8 KiB
167 lines
4.8 KiB
#!/usr/bin/env python3 |
|
|
|
import threading |
|
import socket |
|
import time |
|
import getopt |
|
import sys |
|
|
|
import blup.BLP |
|
import blup.output |
|
|
|
|
|
class BLPStreamReceiverThread(threading.Thread): |
|
def __init__(self, addr, port, hub, protocol=blup.BLP.PROTO_BLP): |
|
threading.Thread.__init__(self) |
|
self.__addr = addr |
|
self.__port = port |
|
self.__hub = hub |
|
self.__protocol = protocol |
|
self.__running = False |
|
|
|
def run(self): |
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
|
sock.bind((self.__addr, self.__port)) |
|
sock.settimeout(1) |
|
|
|
self.__running = True |
|
while self.__running: |
|
try: |
|
data, addr = sock.recvfrom(2048) |
|
except socket.timeout: |
|
continue |
|
|
|
try: |
|
frame = blup.BLP.blp2frame(data, protocol=self.__protocol) |
|
except ValueError: |
|
print('got an invalid frame from', addr) |
|
else: |
|
self.__hub.sendFrame(frame, self) |
|
|
|
def terminate(self): |
|
self.__running = False |
|
|
|
class BLPHub(): |
|
def __init__(self, output): |
|
self.__output = output |
|
self.__streams = {} |
|
self.__lock = threading.Lock() |
|
self.__lastFrameTimeout = -1 |
|
self.__lastFramePriority = -1 |
|
|
|
def sendFrame(self, frame, stream): |
|
with self.__lock: |
|
if stream not in self.__streams.keys(): |
|
return False |
|
|
|
priority = self.__streams[stream]['priority'] |
|
currentTime = int(time.time() * 1000) |
|
|
|
accept = False |
|
if priority >= self.__lastFramePriority: |
|
accept = True |
|
else: |
|
if currentTime > self.__lastFrameTimeout: |
|
accept = True |
|
|
|
if accept: |
|
self.__lastFramePriority = priority |
|
self.__lastFrameTimeout = ( |
|
currentTime + self.__streams[stream]['timeout'] |
|
) |
|
self.__output.sendFrame(frame) |
|
return True |
|
|
|
return False |
|
|
|
def addStream(self, stream, priority, timeout): |
|
if stream not in self.__streams.keys(): |
|
self.__streams[stream] = {'priority': priority, 'timeout': timeout} |
|
|
|
def printUsage(errMsg=None): |
|
if errMsg is not None: |
|
print('error: %s\n' % (errMsg)) |
|
print('usage: %s [OPTIONS] INPUTSTREAM ...' % (sys.argv[0])) |
|
print('where INPUTSTREAM is given as HOST:PORT:PRIORITY:TIMEOUT') |
|
print('supported options:') |
|
print(' -o OUTPUT where to put the received frames') |
|
print(' --eblp use EBLP instead of BLP') |
|
print(' --e3blp use E3BLP instead of BLP') |
|
print(' -h print this text') |
|
print(' --help') |
|
|
|
def main(): |
|
(opts, args) = getopt.gnu_getopt(sys.argv, 'hs:o:', |
|
['help', 'eblp', 'e3blp']) |
|
opts = dict(opts) |
|
|
|
if '--help' in opts or '-h' in opts: |
|
printUsage() |
|
sys.exit(0) |
|
|
|
if '-o' in opts: |
|
output = opts['-o'] |
|
else: |
|
output = 'shell' |
|
|
|
if '--eblp' in opts: |
|
protocol = blup.BLP.PROTO_EBLP |
|
elif '--e3blp' in opts: |
|
protocol = blup.BLP.PROTO_E3BLP |
|
else: |
|
protocol = blup.BLP.PROTO_BLP |
|
|
|
if '--eblp' in opts and '--e3blp' in opts: |
|
printUsage('please only specify one of --eblp or --e3blp') |
|
sys.exit(1) |
|
|
|
streams = [] |
|
for arg in args[1:]: |
|
try: |
|
(host, port, priority, timeout) = arg.split(':') |
|
port = int(port) |
|
priority = int(priority) |
|
timeout = int(timeout) |
|
if port <= 0 or timeout < 0: |
|
raise ValueError |
|
except ValueError as e: |
|
printUsage('illegal stream specification') |
|
sys.exit(1) |
|
streams.append((host, port, priority, timeout)) |
|
|
|
if len(streams) == 0: |
|
printUsage('please specify at least one stream') |
|
sys.exit(1) |
|
|
|
try: |
|
out = blup.output.getOutput(output) |
|
except ValueError: |
|
printUsage('could not initialize output') |
|
sys.exit(1) |
|
|
|
hub = BLPHub(out) |
|
|
|
streamThreads = [] |
|
firstThread = None |
|
for (host, port, priority, timeout) in streams: |
|
stream = BLPStreamReceiverThread(host, port, hub, protocol) |
|
hub.addStream(stream, priority, timeout) |
|
if firstThread is None: |
|
firstThread = stream |
|
else: |
|
streamThreads.append(stream) |
|
stream.start() |
|
|
|
try: |
|
firstThread.run() |
|
except KeyboardInterrupt: |
|
if len(streamThreads) > 0: |
|
print('waiting for listeners to terminate...') |
|
for stream in streamThreads: |
|
stream.terminate() |
|
for stream in streamThreads: |
|
stream.join() |
|
|
|
if __name__ == '__main__': |
|
main() |
|
|
|
|