You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

167 lines
4.8 KiB

#!/usr/bin/env python3
import threading
import socket
import time
import getopt
import sys
import blup.BLP
import blup.output
class BLPStreamReceiverThread(threading.Thread):
    """Receive frames on a UDP socket and hand them to a hub.

    Each received datagram is decoded with ``blup.BLP.blp2frame`` and, if
    decoding succeeds, passed to ``hub.sendFrame(frame, self)``. The thread
    object itself is used as the stream identifier towards the hub.

    Args:
        addr: Local address to bind the UDP socket to.
        port: Local UDP port to bind to.
        hub: Object providing ``sendFrame(frame, stream)``.
        protocol: Protocol constant forwarded to ``blup.BLP.blp2frame``.
    """

    def __init__(self, addr, port, hub, protocol=blup.BLP.PROTO_BLP):
        threading.Thread.__init__(self)
        self.__addr = addr
        self.__port = port
        self.__hub = hub
        self.__protocol = protocol
        self.__running = False

    def run(self):
        """Bind the socket and forward frames until ``terminate()`` is called.

        May also be invoked directly (without ``start()``) to run the
        receive loop in the calling thread.
        """
        # The context manager closes the socket when the loop ends and also
        # when bind() or anything inside the loop raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.bind((self.__addr, self.__port))
            # A one second receive timeout lets the loop re-check the
            # running flag regularly instead of blocking forever.
            sock.settimeout(1)
            self.__running = True
            while self.__running:
                try:
                    data, addr = sock.recvfrom(2048)
                except socket.timeout:
                    continue
                try:
                    frame = blup.BLP.blp2frame(data, protocol=self.__protocol)
                except ValueError:
                    # blp2frame signalled that the datagram is not a valid
                    # frame; report the sender and keep listening.
                    print('got an invalid frame from', addr)
                else:
                    self.__hub.sendFrame(frame, self)

    def terminate(self):
        """Ask the receive loop to stop; it exits within about one second."""
        self.__running = False
class BLPHub():
    """Merge frames from several prioritised streams into a single output.

    A frame is forwarded when its stream's priority is at least the priority
    of the most recently forwarded frame, or when the hold-off period set by
    that most recently forwarded frame has expired.

    Args:
        output: Object providing ``sendFrame(frame)``; receives every
            accepted frame.
    """

    def __init__(self, output):
        self.__output = output
        # Maps a stream object to {'priority': ..., 'timeout': ...}.
        self.__streams = {}
        self.__lock = threading.Lock()
        # Both start at -1 so that the very first frame is always accepted
        # (for non-negative priorities, or because the deadline has passed).
        self.__lastFrameTimeout = -1
        self.__lastFramePriority = -1

    def sendFrame(self, frame, stream):
        """Forward ``frame`` to the output if ``stream`` currently may send.

        Args:
            frame: The frame to forward; passed to the output unchanged.
            stream: Identifier previously registered with ``addStream``.

        Returns:
            True if the frame was forwarded, False if the stream is unknown
            or a higher-priority stream still holds the output.
        """
        with self.__lock:
            if stream not in self.__streams:
                return False
            settings = self.__streams[stream]
            priority = settings['priority']
            # Milliseconds from a monotonic clock: system clock adjustments
            # must neither extend nor cut short the hold-off period.
            currentTime = int(time.monotonic() * 1000)

            outranked = priority < self.__lastFramePriority
            holdOffActive = currentTime <= self.__lastFrameTimeout
            if outranked and holdOffActive:
                return False

            self.__lastFramePriority = priority
            self.__lastFrameTimeout = currentTime + settings['timeout']
            # Sent while holding the lock so frames reach the output one at
            # a time, in the order they were accepted.
            self.__output.sendFrame(frame)
            return True

    def addStream(self, stream, priority, timeout):
        """Register ``stream``; a stream that is already known is left as is.

        Args:
            stream: Hashable identifier for the stream.
            priority: Higher values take precedence over lower ones.
            timeout: Hold-off in milliseconds during which lower-priority
                streams are rejected after a frame from this stream.
        """
        # Receiver threads may already be calling sendFrame() while further
        # streams are registered, so take the same lock for the update.
        with self.__lock:
            if stream not in self.__streams:
                self.__streams[stream] = {
                    'priority': priority,
                    'timeout': timeout,
                }
def printUsage(errMsg=None):
    """Print the command line help, preceded by an error message if given.

    Args:
        errMsg: Optional error text; when not None it is printed first,
            followed by an empty line.
    """
    if errMsg is not None:
        print('error: %s\n' % (errMsg))

    helpLines = (
        'usage: %s [OPTIONS] INPUTSTREAM ...' % (sys.argv[0]),
        'where INPUTSTREAM is given as HOST:PORT:PRIORITY:TIMEOUT',
        'supported options:',
        ' -o OUTPUT where to put the received frames',
        ' --eblp use EBLP instead of BLP',
        ' --e3blp use E3BLP instead of BLP',
        ' -h print this text',
        ' --help',
    )
    for helpLine in helpLines:
        print(helpLine)
def main():
    """Parse the command line, start one receiver per stream and run them.

    The first stream's receive loop runs in the calling thread; every
    further stream gets its own thread. On Ctrl-C, or when the foreground
    loop fails, the background threads are asked to stop and are joined.

    Exits with status 0 after printing help and with status 1 on any
    command line or output initialisation error.
    """
    try:
        # The full sys.argv is parsed, so args[0] is the program name.
        # NOTE(review): '-s' is accepted with an argument but never read.
        (opts, args) = getopt.gnu_getopt(sys.argv, 'hs:o:',
                                         ['help', 'eblp', 'e3blp'])
    except getopt.GetoptError as err:
        # Unknown option or missing option argument: show usage instead of
        # letting the exception end the program with a traceback.
        printUsage(str(err))
        sys.exit(1)
    opts = dict(opts)

    if '--help' in opts or '-h' in opts:
        printUsage()
        sys.exit(0)

    if '--eblp' in opts and '--e3blp' in opts:
        printUsage('please only specify one of --eblp or --e3blp')
        sys.exit(1)

    output = opts.get('-o', 'shell')

    if '--eblp' in opts:
        protocol = blup.BLP.PROTO_EBLP
    elif '--e3blp' in opts:
        protocol = blup.BLP.PROTO_E3BLP
    else:
        protocol = blup.BLP.PROTO_BLP

    streams = []
    for arg in args[1:]:  # skip the program name
        try:
            # A wrong number of ':'-separated fields raises ValueError too.
            (host, port, priority, timeout) = arg.split(':')
            port = int(port)
            priority = int(priority)
            timeout = int(timeout)
            # Reject ports outside the valid range here rather than failing
            # later when the socket is bound.
            if port <= 0 or port > 65535 or timeout < 0:
                raise ValueError
        except ValueError:
            printUsage('illegal stream specification')
            sys.exit(1)
        streams.append((host, port, priority, timeout))

    if not streams:
        printUsage('please specify at least one stream')
        sys.exit(1)

    try:
        out = blup.output.getOutput(output)
    except ValueError:
        printUsage('could not initialize output')
        sys.exit(1)

    hub = BLPHub(out)

    streamThreads = []
    firstThread = None
    for (host, port, priority, timeout) in streams:
        stream = BLPStreamReceiverThread(host, port, hub, protocol)
        hub.addStream(stream, priority, timeout)
        if firstThread is None:
            # The first receiver is run in the foreground further below.
            firstThread = stream
        else:
            streamThreads.append(stream)
            stream.start()

    try:
        firstThread.run()
    except KeyboardInterrupt:
        # Ctrl-C is the regular way to stop the program.
        pass
    finally:
        # Also reached when the foreground receiver fails (for example if
        # its socket cannot be bound); without this the non-daemon
        # background threads would keep the process alive.
        if streamThreads:
            print('waiting for listeners to terminate...')
        for stream in streamThreads:
            stream.terminate()
        for stream in streamThreads:
            stream.join()
# Run the hub only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()