You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
444 lines
13 KiB
444 lines
13 KiB
"""
This module is part of the 'blup' package and provides several output modules
to send frames as well as a mechanism to create them from a string
specification.

"""
|
|
|
import re |
|
import serial |
|
import socket |
|
|
|
from blup import BLP |
|
|
|
|
|
# |
|
# ---- exceptions ---- |
|
# |
|
|
|
class IllegalOutputSpecificationError(Exception):
    """
    Raised by getOutput() when the given specification string is not matched
    by the regular expression of any output module.

    """

    def __str__(self):
        """ Return the name of the concrete exception class. """
        # type(self) rather than the implicit __class__ cell: it does not
        # depend on Python 3 method closures and reports the name of a
        # subclass correctly.
        return type(self).__name__
|
|
|
|
|
# |
|
# ---- generic output class to be subclassed ---- |
|
# |
|
|
|
class Output(object):
    """
    Generic output module, meant to be subclassed.

    Subclasses override sendFrame() to deliver a frame to their device.

    """

    def sendFrame(self, frame):
        """
        Pass a frame to the module.

        The base implementation always raises NotImplementedError (a
        subclass of Exception, so existing `except Exception` handlers still
        catch it).

        """
        raise NotImplementedError('not implemented - just a dummy class')
|
|
|
|
|
# |
|
# ---- MCUFOutput (MicroController Unit Frame) ---- |
|
# |
|
|
|
class MCUFOutput(Output):
    """
    This output module allows to send frames to blinkendevices over a serial
    line using the MCUF protocol.

    """

    moduleRegexDesc = 'mcuf:DEVICE:BAUDRATE[:DEPTH]'
    # Raw string so that `\d` reaches the regex engine instead of being
    # treated as an (invalid) string escape sequence.
    moduleRegex = r'^mcuf:([^:]+):(\d+)(:(\d+))?$'

    def __init__(self, port, baudrate, depth=0):
        """
        Initialize the module with given port and baudrate. If depth is
        specified (non-zero), every frame will be transformed, if it not
        already has the correct depth.

        Opens the serial port immediately.

        """
        self.__port = port
        self.__baudrate = baudrate
        self.__ser = serial.Serial(port=port, baudrate=baudrate)
        self.__depth = depth

    @classmethod
    def fromRegexMatch(cls, regexMatch):
        """ Create an instance from a RegexMatch (matched with moduleRegex) """
        port = regexMatch.group(1)
        baudrate = int(regexMatch.group(2))
        # Group 4 is the optional DEPTH part; 0 means "do not transform".
        depth = int(regexMatch.group(4)) if regexMatch.group(4) else 0
        return cls(port, baudrate, depth)

    def sendFrame(self, frame):
        """
        Pass a frame to the module.

        The frame is serialized into one packet and written to the serial
        port: the four bytes 23 54 26 66, height and width as 16-bit
        big-endian values, the bytes 00 01 00 followed by the maximum pixel
        value (depth - 1), then one byte per pixel, row by row.

        Returns True. Raises ValueError if a pixel value or depth - 1 does
        not fit into a single byte.

        """
        (w, h) = frame.size

        if self.__depth != 0 and frame.depth != self.__depth:
            frame = frame.transform(self.__depth)
        # Computed for every frame, also when no target depth is configured,
        # so the header can always be built.
        maxval = frame.depth - 1

        # Built as bytes (not str): serial ports transport bytes, and a
        # bytearray avoids quadratic string concatenation.
        packet = bytearray(b'\x23\x54\x26\x66')
        packet += bytearray((h >> 8, h & 0xff, w >> 8, w & 0xff))
        # NOTE(review): presumably "channels = 1" and a 16-bit maxval field;
        # confirm against the MCUF specification.
        packet += bytearray((0x00, 0x01, 0x00, maxval))
        for y in range(h):
            for x in range(w):
                packet.append(frame.getPixel(x, y))

        self.__ser.write(bytes(packet))

        return True
|
|
|
|
|
# |
|
# ---- SerialBlupOutput ---- |
|
# |
|
|
|
class SerialBlupOutput(Output):
    """
    This output module allows sending frames to 18x8 blinkendevices over a
    serial line. Three frame formats are supported: one channel with depth 2
    (every column is sent as one byte), one channel with depth 16 (two
    pixels per byte) and three channels (three bytes per pixel).

    """

    moduleRegexDesc = 'serialblup:DEVICE:BAUDRATE'
    # Raw string so that `\d` reaches the regex engine unchanged.
    moduleRegex = r'^serialblup:([^:]+):(\d+)$'

    def __init__(self, port, baudrate):
        """ Initialize the output module with the given port. """
        self.__port = port
        self.__baudrate = baudrate
        self.__ser = serial.Serial(port=port, baudrate=baudrate)
        # Start/end markers framing the payload of each packet type. They
        # are byte strings because the packet is written to a serial port.
        self.__stx2 = b'a'
        self.__etx2 = b'b'
        self.__stx8 = b'q'
        self.__etx8 = b'b'
        self.__stxRGB = b'\xca'
        self.__etxRGB = b'\xfe'

    @classmethod
    def fromRegexMatch(cls, regexMatch):
        """ Create an instance from a RegexMatch (matched with moduleRegex) """
        port = regexMatch.group(1)
        baudrate = int(regexMatch.group(2))
        return cls(port, baudrate)

    def sendFrame(self, frame):
        """
        Pass a frame to the module.

        Returns True after the packet has been written. Raises ValueError
        for frames that are not 18x8, have a channel count other than 1 or
        3, or are single-channel with a depth other than 2 or 16.

        """
        if frame.depth not in [2, 16] and frame.channels == 1:
            raise ValueError('%s only supports frames of depth 2 or 16' %
                             (self.__class__.__name__))

        if frame.channels not in [1, 3]:
            raise ValueError('%s only supports frames with 1 or 3 channels' %
                             (self.__class__.__name__))

        if frame.size != (18, 8):
            raise ValueError('%s currently only supports 18x8 frames' %
                             (self.__class__.__name__))

        width, height = 18, 8

        if frame.channels == 1 and frame.depth == 2:
            # One byte per column; the top pixel (y == 0) is the MSB.
            payload = bytearray(width)
            for x in range(width):
                for y in range(height):
                    if frame.getPixel(x, y) == 1:
                        payload[x] |= 1 << (7 - y)
            stx, etx = self.__stx2, self.__etx2
        elif frame.channels == 1 and frame.depth == 16:
            # Two pixels per byte: even pixel numbers fill the low nibble,
            # odd ones the high nibble. Floor division keeps the index an
            # integer on Python 3.
            payload = bytearray(width * height // 2)
            for x in range(width):
                for y in range(height):
                    pixelval = frame.getPixel(x, y)
                    pixelnum = y * width + x
                    payload[pixelnum // 2] |= pixelval << ((pixelnum % 2) * 4)
            stx, etx = self.__stx8, self.__etx8
        else:
            # Only channels == 3 is left after the validation above: three
            # bytes (r, g, b) per pixel, row by row.
            payload = bytearray(width * height * 3)
            for y in range(height):
                for x in range(width):
                    (r, g, b) = frame.getPixel(x, y)
                    offset = 3 * (y * width + x)
                    payload[offset] = r
                    payload[offset + 1] = g
                    payload[offset + 2] = b
            stx, etx = self.__stxRGB, self.__etxRGB

        self.__ser.write(bytes(stx + payload + etx))

        return True
|
|
|
# |
|
# ---- ShellOutput ----
|
# |
|
|
|
class ShellOutput(Output):
    """ Terminal output module: draws depth-2 frames, one character per pixel. """

    moduleRegexDesc = 'shell[:ON_CHAR:OFF_CHAR]'
    moduleRegex = '^shell(:(.):(.))?$'

    def __init__(self, onChar='#', offChar=' '):
        """ Remember the characters used for lit (onChar) and dark (offChar) pixels. """
        self.__initialized = False
        self.__onChar = onChar
        self.__offChar = offChar

    @classmethod
    def fromRegexMatch(cls, regexMatch):
        """ Build an instance from a match object produced by moduleRegex. """
        onChar = regexMatch.group(2)
        if onChar is None:
            # No characters given in the specification: use the defaults.
            return cls()
        return cls(onChar, regexMatch.group(3))

    def sendFrame(self, frame):
        """
        Print the frame to stdout and return True.

        Raises ValueError if the frame's depth is not 2.

        """
        if frame.depth != 2:
            raise ValueError('%s only supports frames of depth 2' %
                             (self.__class__.__name__))

        (width, height) = frame.size

        # Before the very first frame, emit blank lines so there is room to
        # draw into.
        if not self.__initialized:
            self.__initialized = True
            print('\n' * height)

        # Move the cursor back up so this frame overwrites the previous one.
        print('\x1b[%dA' % (height + 2))
        for y in range(height):
            print(''.join(
                self.__onChar if frame.getPixel(x, y) == 1 else self.__offChar
                for x in range(width)
            ))
        print('')

        return True
|
|
|
# |
|
# ---- ColorfulShellOutput ----
|
# |
|
|
|
class ColorfulShellOutput(Output):
    """ This output module prints frames on the terminal, using colors. """

    moduleRegexDesc = 'colorfulshell'
    # Exactly 'colorfulshell'; no optional trailing character.
    moduleRegex = '^colorfulshell$'

    def __init__(self):
        """ Initialize the output. """
        self.__initialized = False

    @classmethod
    def fromRegexMatch(cls, regexMatch):
        """ Create an instance from a RegexMatch (matched with moduleRegex) """
        return cls()

    def sendFrame(self, frame):
        """
        Pass a frame to the module.

        Every pixel is printed as one space with a background color from
        the terminal's 256-color palette: three-channel frames use the
        color cube (index 16 + 36*r + 6*g + b), single-channel frames the
        grayscale ramp starting at index 232.

        Returns True. Raises ValueError if the frame has neither 1 nor 3
        channels.

        """
        if frame.channels not in [1, 3]:
            raise ValueError('%s only supports frames with 1 or 3 channels' %
                             (self.__class__.__name__))

        (width, height) = frame.size
        # Largest pixel value; clamped to 1 so that a depth-1 frame (whose
        # only pixel value is 0) does not cause a division by zero.
        maxval = max(frame.depth - 1, 1)

        if not self.__initialized:
            self.__initialized = True
            print('\n' * height)

        # Move the cursor back up so this frame overwrites the previous one.
        print('\x1b[%dA' % (height + 2))
        for y in range(height):
            cells = []
            for x in range(width):
                if frame.channels == 3:
                    (r, g, b) = frame.getPixel(x, y)
                    # NOTE(review): depth-5 frames are passed through
                    # unscaled although the cube has 6 levels per channel;
                    # confirm whether 6 was intended here.
                    if frame.depth != 5:
                        factor = 5.0 / maxval
                        r = round(r * factor)
                        g = round(g * factor)
                        b = round(b * factor)
                    colorval = 36*r + 6*g + b + 16
                else:
                    level = frame.getPixel(x, y)
                    if frame.depth != 24:
                        # Scale to the 24 steps of the grayscale ramp.
                        level = level * (23.0 / maxval)
                    # The offset is applied for every depth, so depth-24
                    # frames land on the grayscale ramp as well.
                    colorval = 232 + level

                cells.append('\033[48;05;%dm ' % (colorval))
            print(''.join(cells))
        print('')

        return True
|
# |
|
# ---- BLPOutput ---- |
|
# |
|
|
|
class BLPOutput(Output):
    """
    This output module sends frames over the network using the UDP-based BLP
    protocol. The specification prefix (blp, eblp or e3blp) selects the
    protocol variant.

    """

    # Lists all prefixes accepted by moduleRegex, not only 'blp'.
    moduleRegexDesc = '(blp|eblp|e3blp)[:HOST:PORT]'
    # Raw string so that `\d` reaches the regex engine unchanged.
    moduleRegex = r'^(blp|eblp|e3blp)(:(.+):(\d+))?$'

    def __init__(self, host='127.0.0.1', port=4242, protocol=BLP.PROTO_BLP):
        """ Initialize the output. Frames are sent to host:port. """
        self.__host = host
        self.__port = port
        self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.__protocol = protocol

    @classmethod
    def fromRegexMatch(cls, regexMatch):
        """ Create an instance from a RegexMatch (matched with moduleRegex) """
        # moduleRegex only admits these three prefixes, so the lookup cannot
        # fail for a match produced by it.
        protocol = {
            'blp': BLP.PROTO_BLP,
            'eblp': BLP.PROTO_EBLP,
            'e3blp': BLP.PROTO_E3BLP,
        }[regexMatch.group(1)]

        if regexMatch.group(3) and regexMatch.group(4):
            host = regexMatch.group(3)
            port = int(regexMatch.group(4))
            return cls(host, port, protocol)
        # No HOST:PORT given: fall back to the constructor defaults.
        return cls(protocol=protocol)

    def sendFrame(self, frame):
        """ Pass a frame to the module. Returns True once the datagram is sent. """
        packet = BLP.frame2blp(frame, protocol=self.__protocol)
        self.__sock.sendto(packet, (self.__host, self.__port))
        return True
|
|
|
# |
|
# ---- BlinkenbuntHDOutput ---- |
|
# |
|
|
|
class BlinkenbuntHDOutput(Output):
    """
    This output module drives a 22x16 LED matrix through the `neopixel`
    library (imported lazily when an instance is created).

    """

    moduleRegexDesc = 'bbunthd[:BRIGHTNESS]'
    # Raw string so that `\d` reaches the regex engine unchanged.
    moduleRegex = r'^bbunthd(:(?P<brightness>\d+))?$'

    def __init__(self, brightness):
        """ Set up and start the LED strip with the given brightness. """
        import neopixel
        self.w = 22
        self.h = 16
        LED_COUNT = self.w * self.h
        LED_PIN = 13
        LED_FREQ_HZ = 800000
        LED_DMA = 5
        LED_INVERT = False
        LED_PWM = 1

        self.strip = neopixel.Adafruit_NeoPixel(
            LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT,
            brightness, LED_PWM
        )
        self.strip.begin()
        self._color_cls = neopixel.Color

    @classmethod
    def fromRegexMatch(cls, regexMatch):
        """
        Create an instance from a RegexMatch (matched with moduleRegex).

        The brightness defaults to 50 when the specification has none.

        """
        # An unmatched named group is reported as None (it is not missing
        # from groupdict()), so the default has to be applied explicitly.
        brightness = regexMatch.group('brightness')
        if brightness is None:
            brightness = 50
        return cls(brightness=int(brightness))

    def sendFrame(self, frame):
        """
        Pass a frame to the module.

        The frame is read mirrored in both axes and mapped onto the strip
        column by column; odd columns run in the opposite direction. Returns
        True, like the other output modules.

        """
        for y in range(self.h):
            for x in range(self.w):
                if x % 2 == 0:
                    lednum = x*self.h + y
                else:
                    # Odd columns are counted from the other end.
                    lednum = x*self.h + (self.h - 1 - y)
                pix = frame.getPixel(self.w - x - 1, self.h - y - 1)
                r, g, b = pix
                # NOTE(review): green and red are swapped on purpose here,
                # presumably because the strip expects GRB order; confirm.
                self.strip.setPixelColor(lednum, self._color_cls(g, r, b))
        self.strip.show()

        return True
|
|
|
# |
|
# ---- PygameOutput ---- |
|
# |
|
|
|
class PygameOutput(Output):
    """
    This output module draws frames into a pygame window, one filled
    rectangle of scalex x scaley screen pixels per frame pixel.

    """

    moduleRegexDesc = 'pygame'
    moduleRegex = '^pygame$'

    def __init__(self, width=22, height=16, scalex=8, scaley=8):
        """
        Open a window of width*scalex by height*scaley screen pixels.

        pygame is imported lazily here, so it is only needed when this
        module is actually used.

        """
        self.width = width
        self.height = height
        self.scalex = scalex
        self.scaley = scaley
        import pygame
        self.__pygame = pygame
        self.screen = pygame.display.set_mode((width*scalex, height*scaley))
        pygame.display.update()

    @classmethod
    def fromRegexMatch(cls, regexMatch):
        """ Create an instance from a RegexMatch (matched with moduleRegex) """
        return cls()

    def sendFrame(self, frame):
        """
        Pass a frame to the module.

        The value returned by frame.getPixel() is handed to pygame as the
        fill color. Returns True, like the other output modules.

        """
        pygame = self.__pygame
        for y in range(self.height):
            for x in range(self.width):
                rect = (x*self.scalex, y*self.scaley, self.scalex, self.scaley)
                pygame.draw.rect(self.screen, frame.getPixel(x, y), rect, 0)
        pygame.display.update()

        return True
|
|
|
|
|
|
|
# |
|
# ---- output creator ;) ---- |
|
# |
|
|
|
# Registry of all available output modules, keyed by the regular expression
# that recognizes the module's specification string.
__outputModules = {
    module.moduleRegex: module
    for module in (
        SerialBlupOutput,
        MCUFOutput,
        ShellOutput,
        ColorfulShellOutput,
        BLPOutput,
        BlinkenbuntHDOutput,
        PygameOutput,
    )
}
|
|
|
def getOutput(outputSpec):
    """
    Build the output module instance that the specification string describes.

    The specification is tried against the regex of every registered module;
    the first module that matches creates the instance. Raises
    IllegalOutputSpecificationError if no module matches.

    """
    for pattern, module in __outputModules.items():
        match = re.match(pattern, outputSpec)
        if match is not None:
            return module.fromRegexMatch(regexMatch=match)

    raise IllegalOutputSpecificationError()
|
|
|
def getOutputDescriptions():
    """
    Return one string listing the moduleRegexDesc of every registered output
    module, one per line.

    """
    return "".join(
        " %s\n" % (module.moduleRegexDesc)
        for module in __outputModules.values()
    )
|
|
|
|