You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
174 lines
4.3 KiB
174 lines
4.3 KiB
""" |
|
This module implements the BLP and EBLP protocols as described here:

https://wiki.blinkenarea.org/index.php/BlinkenlightsProtocolEnglish

It also handles the E3BLP variant (three bytes per pixel).
|
|
|
""" |
|
|
|
import threading |
|
import socket |
|
import struct |
|
|
|
from .frame import Frame |
|
from .frame import FrameDimension |
|
|
|
# Protocol identifiers accepted by the `protocol` arguments in this module.
PROTO_BLP, PROTO_EBLP, PROTO_E3BLP = 0, 1, 2
PROTOCOLS = [PROTO_BLP, PROTO_EBLP, PROTO_E3BLP]

# 4-byte magic values that open every packet of the respective protocol.
BLP_MAGIC = b'\xde\xad\xbe\xef'
EBLP_MAGIC = b'\xfe\xed\xbe\xef'
E3BLP_MAGIC = b'\xca\xfe\xbe\xef'
|
|
|
def frame2blp(frame, protocol=PROTO_BLP, depth=16):
    """Serialize a frame into a BLP, EBLP or E3BLP packet.

    Packet layout: 4-byte magic, 4 zero bytes in place of the frame number
    (not implemented), width and height as big-endian unsigned 16-bit
    integers, then the pixel data row by row.

    Pixel encoding per protocol:
      * BLP:   one byte per pixel, 0x01 if ``getPixel`` returns 1, else 0x00.
      * EBLP:  one unsigned byte per pixel holding the pixel value.
      * E3BLP: three unsigned bytes per pixel (r, g, b).

    Args:
        frame: object providing ``depth``, ``channels``, ``size`` (w, h) and
            ``getPixel(x, y)``.
        protocol: one of PROTO_BLP, PROTO_EBLP, PROTO_E3BLP.
        depth: accepted for interface compatibility; this function never
            reads it.

    Returns:
        The packet as ``bytes``.

    Raises:
        ValueError: unknown protocol, a frame with depth != 2 for BLP, or a
            frame without exactly 3 channels for E3BLP.
        struct.error: a dimension or pixel value does not fit its field.
    """
    if protocol not in PROTOCOLS:
        # %s rather than %d: a non-numeric protocol must still produce the
        # intended ValueError instead of a TypeError from the formatting.
        raise ValueError('Unknown protocol: %s' % (protocol,))
    if frame.depth != 2 and protocol == PROTO_BLP:
        raise ValueError('BLP does not support greyscale.')
    if frame.channels != 3 and protocol == PROTO_E3BLP:
        raise ValueError('E3BLP does only support 3-channel frames.')

    (w, h) = frame.size

    magic_by_protocol = {
        PROTO_BLP: BLP_MAGIC,
        PROTO_EBLP: EBLP_MAGIC,
        PROTO_E3BLP: E3BLP_MAGIC,
    }

    # Accumulate in a bytearray: appending to immutable bytes inside the
    # pixel loop would copy the whole packet for every pixel.
    packet = bytearray(magic_by_protocol[protocol])

    # frame number, not yet implemented
    packet += b'\x00\x00\x00\x00'

    # frame dimension
    packet += struct.pack('>HH', w, h)

    for y in range(h):
        for x in range(w):
            pixel = frame.getPixel(x, y)
            if protocol == PROTO_BLP:
                packet.append(1 if pixel == 1 else 0)
            elif protocol == PROTO_EBLP:
                packet += struct.pack('B', pixel)
            else:  # PROTO_E3BLP
                (r, g, b) = pixel
                packet += struct.pack('BBB', r, g, b)

    return bytes(packet)
|
|
|
def blp2frame(packet, protocol=PROTO_BLP, depth=None):
    """Parse a BLP, EBLP or E3BLP packet into a Frame.

    The packet must start with the magic of the requested protocol, carry
    width and height as big-endian unsigned 16-bit integers at offsets 8
    and 10, and contain exactly ``w * h`` pixel bytes (BLP, EBLP) or
    ``w * h * 3`` pixel bytes (E3BLP) after the 12-byte header.

    Args:
        packet: the raw packet (bytes-like).
        protocol: one of PROTO_BLP, PROTO_EBLP, PROTO_E3BLP.
        depth: depth passed to FrameDimension. Defaults to 16 for EBLP and
            8 for E3BLP; ignored for BLP, which always uses 2.

    Returns:
        A Frame whose ``pixels`` attribute is a list of rows. Each row is a
        list of ints (BLP: 0/1, EBLP: raw byte value) or of (r, g, b)
        tuples (E3BLP).

    Raises:
        ValueError: unknown protocol, packet shorter than 13 bytes, wrong
            magic, or a length that does not match the announced size.
    """
    if protocol not in PROTOCOLS:
        # %s rather than %d: a non-numeric protocol must still produce the
        # intended ValueError instead of a TypeError from the formatting.
        raise ValueError('Unknown protocol: %s' % (protocol,))
    # 12 header bytes plus at least one pixel byte.
    if len(packet) < 13:
        raise ValueError('packet is too short')

    magic_by_protocol = {
        PROTO_BLP: BLP_MAGIC,
        PROTO_EBLP: EBLP_MAGIC,
        PROTO_E3BLP: E3BLP_MAGIC,
    }
    if packet[0:4] != magic_by_protocol[protocol]:
        raise ValueError('MAGIC does not match')

    # Bytes 4..7 hold the frame number, which is not evaluated.
    w, h = struct.unpack_from('>HH', packet, 8)

    bytes_per_pixel = 3 if protocol == PROTO_E3BLP else 1
    if len(packet) != w * h * bytes_per_pixel + 12:
        raise ValueError('packet size does not match')

    # FrameDimension arguments are (w, h, depth, channels) as used here;
    # their exact meaning is defined in the .frame module.
    if protocol == PROTO_BLP:
        frm = Frame(FrameDimension(w, h, 2, 1))
    elif protocol == PROTO_EBLP:
        if depth is None:
            depth = 16
        frm = Frame(FrameDimension(w, h, depth, 1))
    else:  # PROTO_E3BLP
        if depth is None:
            depth = 8
        frm = Frame(FrameDimension(w, h, depth, 3))

    row_size = w * bytes_per_pixel
    pixels = []
    for y in range(h):
        start = 12 + y * row_size
        raw = packet[start:start + row_size]
        if protocol == PROTO_BLP:
            # Iterating bytes yields ints in Python 3, so compare against
            # the integer 1 (a comparison with '\x01' is never true).
            row = [1 if value == 1 else 0 for value in raw]
        elif protocol == PROTO_EBLP:
            row = list(raw)
        else:  # PROTO_E3BLP
            row = [tuple(raw[i:i + 3]) for i in range(0, row_size, 3)]
        pixels.append(row)
    frm.pixels = pixels

    return frm
|
|
|
class BLPServer:
    """Blocking UDP server that decodes packets and forwards the frames.

    Every received datagram is parsed with ``blp2frame`` using the
    configured protocol; successfully parsed frames are handed to
    ``output.sendFrame``. Invalid packets are reported on stdout and
    dropped.
    """

    # Receive buffer size. A datagram larger than the buffer is truncated
    # by recvfrom and would then fail the packet size check, so use a size
    # that holds any UDP datagram (the UDP length field is 16 bits).
    _RECV_BUFSIZE = 65535

    def __init__(self, addr='127.0.0.1', port=4242, output=None,
                 protocol=PROTO_BLP):
        """Store the configuration; the socket is created by ``bind``.

        Args:
            addr: local address to bind to.
            port: local UDP port to bind to.
            output: object providing ``sendFrame(frame)``; required.
            protocol: one of PROTO_BLP, PROTO_EBLP, PROTO_E3BLP.

        Raises:
            ValueError: unknown protocol or missing output.
        """
        if protocol not in PROTOCOLS:
            # %s rather than %d: a non-numeric protocol must still produce
            # the intended ValueError instead of a TypeError.
            raise ValueError('Unknown protocol: %s' % (protocol,))

        self.__addr = addr
        self.__port = port
        self.__output = output
        self.__sock = None
        self.__protocol = protocol

        if not output:
            raise ValueError('serving without output isn\'t supported, yet')

    def bind(self):
        """Create the UDP socket and bind it to the configured address."""
        self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.__sock.bind((self.__addr, self.__port))

    def handle(self):
        """Receive one datagram and forward the decoded frame.

        Returns immediately if ``bind`` has not been called yet; otherwise
        blocks until a datagram arrives.
        """
        if self.__sock is None:
            return

        data, addr = self.__sock.recvfrom(self._RECV_BUFSIZE)
        try:
            frame = blp2frame(data, protocol=self.__protocol)
        except ValueError as e:
            print('got an invalid frame from', addr)
            print(e)
            return
        else:
            self.__output.sendFrame(frame)

    def serve(self):
        """Bind the socket and handle datagrams forever."""
        self.bind()
        while True:
            self.handle()
|
|
|
class BLPServerThread(threading.Thread, BLPServer):
    """BLPServer that runs its receive loop in a separate thread.

    ``serve`` starts the thread and returns; ``terminate`` asks the loop to
    stop. Because ``BLPServer.handle`` blocks until a datagram arrives, the
    loop only notices the stop request after the next datagram.
    """

    def __init__(self, addr, port, output, protocol=PROTO_BLP):
        """Initialise both base classes.

        Args:
            addr: local address to bind to.
            port: local UDP port to bind to.
            output: object providing ``sendFrame(frame)``.
            protocol: one of PROTO_BLP, PROTO_EBLP, PROTO_E3BLP.

        Raises:
            ValueError: unknown protocol, or as raised by BLPServer.
        """
        if protocol not in PROTOCOLS:
            # %s rather than %d: a non-numeric protocol must still produce
            # the intended ValueError instead of a TypeError.
            raise ValueError('Unknown protocol: %s' % (protocol,))

        threading.Thread.__init__(self)
        BLPServer.__init__(self, addr, port, output, protocol)
        # An Event that is only ever set, never cleared: a terminate()
        # issued before run() gets going is therefore not lost, which a
        # flag reset to True at the start of run() would be.
        self.__stop_requested = threading.Event()

    def run(self):
        """Thread body: bind, then handle datagrams until terminated."""
        self.bind()
        while not self.__stop_requested.is_set():
            self.handle()

    def serve(self):
        """Start the server thread; returns without blocking."""
        self.start()

    def terminate(self):
        """Request the receive loop to stop."""
        self.__stop_requested.set()
|
|
|
|
|
|
|
|