""" This is a BLP and EBLP implementation as described here: https://wiki.blinkenarea.org/index.php/BlinkenlightsProtocolEnglish """ import threading import socket import struct from .frame import Frame from .frame import FrameDimension BLP_MAGIC = b'\xde\xad\xbe\xef' EBLP_MAGIC = b'\xfe\xed\xbe\xef' E3BLP_MAGIC = b'\xca\xfe\xbe\xef' PROTO_BLP = 0 PROTO_EBLP = 1 PROTO_E3BLP = 2 PROTOCOLS = [PROTO_BLP, PROTO_EBLP, PROTO_E3BLP] def frame2blp(frame, protocol=PROTO_BLP, depth=16): if protocol not in PROTOCOLS: raise ValueError('Unknown protocol: %d' % (protocol)) if frame.depth != 2 and protocol == PROTO_BLP: raise ValueError('BLP does not support greyscale.') if frame.channels != 3 and protocol == PROTO_E3BLP: raise ValueError('E3BLP does only support 3-channel frames.') (w, h) = frame.size # magic value if protocol == PROTO_BLP: packet = BLP_MAGIC elif protocol == PROTO_EBLP: packet = EBLP_MAGIC elif protocol == PROTO_E3BLP: packet = E3BLP_MAGIC # frame number, not yet implemented packet += b'\x00\x00\x00\x00' # frame dimension packet += struct.pack('>HH', w, h) for y in range(h): for x in range(w): if protocol == PROTO_BLP: if frame.getPixel(x, y) == 1: packet += b'\x01' else: packet += b'\x00' elif protocol == PROTO_EBLP: packet += struct.pack('B', frame.getPixel(x, y)) elif protocol == PROTO_E3BLP: (r,g,b) = frame.getPixel(x, y) packet += struct.pack('BBB', r, g, b) return packet def blp2frame(packet, protocol=PROTO_BLP, depth=None): if protocol not in PROTOCOLS: raise ValueError('Unknown protocol: %d' % (protocol)) if len(packet) < 13: raise ValueError('packet is too short') if ( (packet[0:4] != BLP_MAGIC and protocol == PROTO_BLP) or (packet[0:4] != EBLP_MAGIC and protocol == PROTO_EBLP) or (packet[0:4] != E3BLP_MAGIC and protocol == PROTO_E3BLP) ): raise ValueError('MAGIC does not match') w = (packet[8]<<8) + packet[9] h = (packet[10]<<8) + packet[11] if ( (protocol == PROTO_E3BLP and len(packet) != (w*h*3 + 12)) or (protocol in [PROTO_BLP, PROTO_EBLP] and len(packet) != (w*h + 12)) ): print(len(packet)) raise ValueError('packet size does not match') if protocol == PROTO_BLP: frm = Frame(FrameDimension(w, h, 2, 1)) elif protocol == PROTO_EBLP: if depth is None: depth = 16 frm = Frame(FrameDimension(w, h, depth, 1)) elif protocol == PROTO_E3BLP: if depth is None: depth = 8 frm = Frame(FrameDimension(w, h, depth, 3)) pixels = [] for y in range(h): row = [] for x in range(w): if protocol == PROTO_BLP: if packet[12 + y*w + x] == '\x01': #frm.setPixel(x, y, 1) row.append(1) else: #frm.setPixel(x, y, 0) row.append(0) elif protocol == PROTO_EBLP: row.append(packet[12 + y*w + x]) elif protocol == PROTO_E3BLP: pos = 12 + (y*w + x) * 3 r = packet[pos] g = packet[pos + 1] b = packet[pos + 2] row.append((r,g,b)) pixels.append(row) frm.pixels = pixels return frm class BLPServer: def __init__(self, addr='127.0.0.1', port=4242, output=None, protocol=PROTO_BLP): if protocol not in PROTOCOLS: raise ValueError('Unknown protocol: %d' % (protocol)) self.__addr = addr self.__port = port self.__output = output self.__sock = None self.__protocol = protocol if not output: raise ValueError('serving without output isn\'t supported, yet') def bind(self): self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.__sock.bind((self.__addr, self.__port)) def handle(self): if self.__sock is None: return data, addr = self.__sock.recvfrom(1024) try: frame = blp2frame(data, protocol=self.__protocol) except ValueError as e: print('got an invalid frame from', addr) print(e) return else: self.__output.sendFrame(frame) def serve(self): self.bind() while True: self.handle() class BLPServerThread(threading.Thread, BLPServer): def __init__(self, addr, port, output, protocol=PROTO_BLP): if protocol not in PROTOCOLS: raise ValueError('Unknown protocol: %d' % (protocol)) threading.Thread.__init__(self) BLPServer.__init__(self, addr, port, output, protocol) self.__running = False def run(self): self.bind() self.__running = True while self.__running: self.handle() def serve(self): self.start() def terminate(self): self.__running = False