forked from Blinkenbunt/blup
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
168 lines
4.8 KiB
168 lines
4.8 KiB
8 years ago
|
#!/usr/bin/env python3
|
||
|
|
||
|
import threading
|
||
|
import socket
|
||
|
import time
|
||
|
import getopt
|
||
|
import sys
|
||
|
|
||
|
import blup.BLP
|
||
|
import blup.output
|
||
|
|
||
|
|
||
|
class BLPStreamReceiverThread(threading.Thread):
|
||
|
def __init__(self, addr, port, hub, protocol=blup.BLP.PROTO_BLP):
|
||
|
threading.Thread.__init__(self)
|
||
|
self.__addr = addr
|
||
|
self.__port = port
|
||
|
self.__hub = hub
|
||
|
self.__protocol = protocol
|
||
|
self.__running = False
|
||
|
|
||
|
def run(self):
|
||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
|
sock.bind((self.__addr, self.__port))
|
||
|
sock.settimeout(1)
|
||
|
|
||
|
self.__running = True
|
||
|
while self.__running:
|
||
|
try:
|
||
|
data, addr = sock.recvfrom(2048)
|
||
|
except socket.timeout:
|
||
|
continue
|
||
|
|
||
|
try:
|
||
|
frame = blup.BLP.blp2frame(data, protocol=self.__protocol)
|
||
|
except ValueError:
|
||
|
print('got an invalid frame from', addr)
|
||
|
else:
|
||
|
self.__hub.sendFrame(frame, self)
|
||
|
|
||
|
def terminate(self):
|
||
|
self.__running = False
|
||
|
|
||
|
class BLPHub():
|
||
|
def __init__(self, output):
|
||
|
self.__output = output
|
||
|
self.__streams = {}
|
||
|
self.__lock = threading.Lock()
|
||
|
self.__lastFrameTimeout = -1
|
||
|
self.__lastFramePriority = -1
|
||
|
|
||
|
def sendFrame(self, frame, stream):
|
||
|
with self.__lock:
|
||
|
if stream not in self.__streams.keys():
|
||
|
return False
|
||
|
|
||
|
priority = self.__streams[stream]['priority']
|
||
|
currentTime = int(time.time() * 1000)
|
||
|
|
||
|
accept = False
|
||
|
if priority >= self.__lastFramePriority:
|
||
|
accept = True
|
||
|
else:
|
||
|
if currentTime > self.__lastFrameTimeout:
|
||
|
accept = True
|
||
|
|
||
|
if accept:
|
||
|
self.__lastFramePriority = priority
|
||
|
self.__lastFrameTimeout = (
|
||
|
currentTime + self.__streams[stream]['timeout']
|
||
|
)
|
||
|
self.__output.sendFrame(frame)
|
||
|
return True
|
||
|
|
||
|
return False
|
||
|
|
||
|
def addStream(self, stream, priority, timeout):
|
||
|
if stream not in self.__streams.keys():
|
||
|
self.__streams[stream] = {'priority': priority, 'timeout': timeout}
|
||
|
|
||
|
def printUsage(errMsg=None):
|
||
|
if errMsg is not None:
|
||
|
print('error: %s\n' % (errMsg))
|
||
|
print('usage: %s [OPTIONS] INPUTSTREAM ...' % (sys.argv[0]))
|
||
|
print('where INPUTSTREAM is given as HOST:PORT:PRIORITY:TIMEOUT')
|
||
|
print('supported options:')
|
||
|
print(' -o OUTPUT where to put the received frames')
|
||
|
print(' --eblp use EBLP instead of BLP')
|
||
|
print(' --e3blp use E3BLP instead of BLP')
|
||
|
print(' -h print this text')
|
||
|
print(' --help')
|
||
|
|
||
|
def main():
|
||
|
(opts, args) = getopt.gnu_getopt(sys.argv, 'hs:o:',
|
||
|
['help', 'eblp', 'e3blp'])
|
||
|
opts = dict(opts)
|
||
|
|
||
|
if '--help' in opts or '-h' in opts:
|
||
|
printUsage()
|
||
|
sys.exit(0)
|
||
|
|
||
|
if '-o' in opts:
|
||
|
output = opts['-o']
|
||
|
else:
|
||
|
output = 'shell'
|
||
|
|
||
|
if '--eblp' in opts:
|
||
|
protocol = blup.BLP.PROTO_EBLP
|
||
|
elif '--e3blp' in opts:
|
||
|
protocol = blup.BLP.PROTO_E3BLP
|
||
|
else:
|
||
|
protocol = blup.BLP.PROTO_BLP
|
||
|
|
||
|
if '--eblp' in opts and '--e3blp' in opts:
|
||
|
printUsage('please only specify one of --eblp or --e3blp')
|
||
|
sys.exit(1)
|
||
|
|
||
|
streams = []
|
||
|
for arg in args[1:]:
|
||
|
try:
|
||
|
(host, port, priority, timeout) = arg.split(':')
|
||
|
port = int(port)
|
||
|
priority = int(priority)
|
||
|
timeout = int(timeout)
|
||
|
if port <= 0 or timeout < 0:
|
||
|
raise ValueError
|
||
|
except ValueError as e:
|
||
|
printUsage('illegal stream specification')
|
||
|
sys.exit(1)
|
||
|
streams.append((host, port, priority, timeout))
|
||
|
|
||
|
if len(streams) == 0:
|
||
|
printUsage('please specify at least one stream')
|
||
|
sys.exit(1)
|
||
|
|
||
|
try:
|
||
|
out = blup.output.getOutput(output)
|
||
|
except ValueError:
|
||
|
printUsage('could not initialize output')
|
||
|
sys.exit(1)
|
||
|
|
||
|
hub = BLPHub(out)
|
||
|
|
||
|
streamThreads = []
|
||
|
firstThread = None
|
||
|
for (host, port, priority, timeout) in streams:
|
||
|
stream = BLPStreamReceiverThread(host, port, hub, protocol)
|
||
|
hub.addStream(stream, priority, timeout)
|
||
|
if firstThread is None:
|
||
|
firstThread = stream
|
||
|
else:
|
||
|
streamThreads.append(stream)
|
||
|
stream.start()
|
||
|
|
||
|
try:
|
||
|
firstThread.run()
|
||
|
except KeyboardInterrupt:
|
||
|
if len(streamThreads) > 0:
|
||
|
print('waiting for listeners to terminate...')
|
||
|
for stream in streamThreads:
|
||
|
stream.terminate()
|
||
|
for stream in streamThreads:
|
||
|
stream.join()
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
main()
|
||
|
|