You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

444 lines
13 KiB

"""
This module is part of the 'blup' package and provides several output modules
to send frames as well as a mechanism to create them from a string
specification.
"""
import re
import serial
import socket
from blup import BLP
#
# ---- exceptions ----
#
class IllegalOutputSpecificationError(Exception):
"""
Will be thrown when a string is given to getOutput (see below) that is not
matched by any output modules regex.
"""
def __str__(self):
return __class__.__name__
#
# ---- generic output class to be subclassed ----
#
class Output(object):
def sendFrame(self, frame):
raise Exception('not implemented - just a dummy class')
#
# ---- MCUFOutput (MicroController Unit Frame) ----
#
class MCUFOutput(Output):
"""
This output module allows to send frames to blinkendevices over a serial
line using the MCUF protocol.
"""
moduleRegexDesc = 'mcuf:DEVICE:BAUDRATE[:DEPTH]'
moduleRegex = '^mcuf:([^:]+):(\d+)(:(\d+))?$'
def __init__(self, port, baudrate, depth=0):
"""
Initialize the module with given port and baudrate. If depth is
specified, every frame will be transformed, if it not already has the
correct depth.
"""
self.__port = port
self.__baudrate = baudrate
self.__ser = serial.Serial(port=port, baudrate=baudrate)
self.__depth = depth
@classmethod
def fromRegexMatch(cls, regexMatch):
""" Create an instance from a RegexMatch (matched with moduleRegex) """
port = regexMatch.group(1)
baudrate = int(regexMatch.group(2))
if regexMatch.group(4):
depth = int(regexMatch.group(4))
else:
depth = 0
return cls(port, baudrate, depth)
def sendFrame(self, frame):
""" Pass a frame to the module. """
(w, h) = frame.size
if self.__depth != 0:
if frame.depth != self.__depth:
frame = frame.transform(self.__depth)
maxval = frame.depth - 1
framedata = ''
for y in range(h):
for x in range(w):
framedata += chr(frame.getPixel(x, y))
packet = '\x23\x54\x26\x66'
packet += '%s%s%s%s' % (chr(h>>8), chr(h & 0xff),
chr(w>>8), chr(w & 0xff))
packet += '\x00\x01\x00%s' % (chr(maxval))
packet += framedata
self.__ser.write(packet)
return True
#
# ---- SerialBlupOutput ----
#
class SerialBlupOutput(Output):
"""
This output module allows sending frames to 18x8 blinkendevices with
depth 2 over a serial line, by sending every column as a byte.
"""
moduleRegexDesc = 'serialblup:DEVICE:BAUDRATE'
moduleRegex = '^serialblup:([^:]+):(\d+)$'
def __init__(self, port, baudrate):
""" Initialize the output module with the given port. """
self.__port = port
self.__baudrate = baudrate
self.__ser = serial.Serial(port=port, baudrate=baudrate)
self.__stx2 = 'a'
self.__etx2 = 'b'
self.__stx8 = 'q'
self.__etx8 = 'b'
self.__stxRGB = chr(0xca)
self.__etxRGB = chr(0xfe)
@classmethod
def fromRegexMatch(cls, regexMatch):
""" Create an instance from a RegexMatch (matched with moduleRegex) """
port = regexMatch.group(1)
baudrate = int(regexMatch.group(2))
return cls(port, baudrate)
def sendFrame(self, frame):
""" Pass a frame to the module. """
if frame.depth not in [2, 16] and frame.channels == 1:
raise ValueError('%s only supports frames of depth 2 or 16' %
(self.__class__.__name__))
if frame.channels not in [1, 3]:
raise ValueError('%s only supports frames with 1 or 3 channels' %
(self.__class__.__name__))
if frame.size != (18, 8):
raise ValueError('%s currently only supports 18x8 frames' %
(self.__class__.__name__))
if frame.channels == 1 and frame.depth == 2:
bytes = [ 0 ] * 18
for x in range(18):
for y in range(8):
if frame.getPixel(x, y) == 1:
bytes[x] += 2**(7 - y)
packet = self.__stx2 + ''.join(map(lambda b: chr(b), bytes)) + self.__etx2
elif frame.channels == 1 and frame.depth == 16:
bytes = [ 0 ] * (18*4)
for x in range(18):
for y in range(8):
pixelval = frame.getPixel(x, y)
pixelnum = y*18 + x
bytes[pixelnum/2] += (pixelval << ((pixelnum % 2) * 4))
packet = self.__stx8 + ''.join(map(lambda b: chr(b), bytes)) + self.__etx8
elif frame.channels == 3:
bytes = [ 0 ] * (18*8*3)
for y in range(8):
for x in range(18):
(r,g,b) = frame.getPixel(x, y)
pixelnum = y*18 + x
bytes[3*pixelnum] = r
bytes[3*pixelnum + 1] = g
bytes[3*pixelnum + 2] = b
packet = self.__stxRGB + ''.join(map(lambda b: chr(b), bytes)) + self.__etxRGB
self.__ser.write(packet)
return True
#
# ---- ShellOuput ----
#
class ShellOutput(Output):
""" This output module prints frames with depth 2 on the terminal. """
moduleRegexDesc = 'shell[:ON_CHAR:OFF_CHAR]'
moduleRegex = '^shell(:(.):(.))?$'
def __init__(self, onChar='#', offChar=' '):
""" Initialize the output. """
self.__onChar = onChar
self.__offChar = offChar
self.__initialized = False
@classmethod
def fromRegexMatch(cls, regexMatch):
""" Create an instance from a RegexMatch (matched with moduleRegex) """
if regexMatch.group(2) is not None:
return cls(regexMatch.group(2), regexMatch.group(3))
else:
return cls()
def sendFrame(self, frame):
""" Pass a frame to the module. """
if frame.depth != 2:
raise ValueError('%s only supports frames of depth 2' %
(self.__class__.__name__))
(width, height) = frame.size
if not self.__initialized:
self.__initialized = True
print('\n' * height)
print('\x1b[%dA' % (height + 2))
for y in range(height):
line = ''
for x in range(width):
if frame.getPixel(x, y) == 1:
line += self.__onChar
else:
line += self.__offChar
print(line)
print('')
return True
#
# ---- ColorfulShellOuput ----
#
class ColorfulShellOutput(Output):
""" This output module prints frames on the terminal, using colors. """
moduleRegexDesc = 'colorfulshell'
moduleRegex = '^colorfulshell?$'
def __init__(self):
""" Initialize the output. """
self.__initialized = False
@classmethod
def fromRegexMatch(cls, regexMatch):
""" Create an instance from a RegexMatch (matched with moduleRegex) """
return cls()
def sendFrame(self, frame):
""" Pass a frame to the module. """
if frame.channels not in [1, 3]:
raise ValueError('%s only supports frames with 1 or 3 channels' %
(self.__class__.__name__))
(width, height) = frame.size
if not self.__initialized:
self.__initialized = True
print('\n' * height)
print('\x1b[%dA' % (height + 2))
for y in range(height):
line = ''
for x in range(width):
if frame.channels == 3:
(r,g,b) = frame.getPixel(x, y)
if frame.depth != 5:
factor = 5 / ((frame.depth - 1) * 1.0)
r = round(r*factor)
g = round(g*factor)
b = round(b*factor)
colorval = 36*r + 6*g + b + 16
line += '\033[48;05;%dm ' % (colorval)
elif frame.channels == 1:
colorval = frame.getPixel(x, y)
if frame.depth != 24:
factor = (23 / ((frame.depth - 1) * 1.0))
colorval = 232 + colorval * factor
line += '\033[48;05;%dm ' % (colorval)
print(line)
print('')
return True
#
# ---- BLPOutput ----
#
class BLPOutput(Output):
"""
This output module sends frames over the network using the UDP-based BLP
protocol.
"""
moduleRegexDesc = 'blp[:HOST:PORT]'
moduleRegex = '^(blp|eblp|e3blp)(:(.+):(\d+))?$'
def __init__(self, host='127.0.0.1', port=4242, protocol=BLP.PROTO_BLP):
""" Initialize the output. Frames are sent to host:port. """
self.__host = host
self.__port = port
self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.__protocol = protocol
@classmethod
def fromRegexMatch(cls, regexMatch):
""" Create an instance from a RegexMatch (matched with moduleRegex) """
if regexMatch.group(1) == 'blp':
protocol = BLP.PROTO_BLP
elif regexMatch.group(1) == 'eblp':
protocol = BLP.PROTO_EBLP
elif regexMatch.group(1) == 'e3blp':
protocol = BLP.PROTO_E3BLP
if regexMatch.group(3) and regexMatch.group(4):
host = regexMatch.group(3)
port = int(regexMatch.group(4))
return cls(host, port, protocol)
else:
return cls(protocol=protocol)
def sendFrame(self, frame):
""" Pass a frame to the module. """
packet = BLP.frame2blp(frame, protocol=self.__protocol)
self.__sock.sendto(packet, (self.__host, self.__port))
return True
#
# ---- BlinkenbuntHDOutput ----
#
class BlinkenbuntHDOutput(Output):
moduleRegexDesc = 'bbunthd'
moduleRegex = '^bbunthd$'
def __init__(self):
import neopixel
self.w = 22
self.h = 16
LED_COUNT = self.w * self.h
LED_PIN = 13
LED_FREQ_HZ = 800000
LED_DMA = 5
LED_BRIGHTNESS = 50
LED_INVERT = False
LED_PWM = 1
self.strip = neopixel.Adafruit_NeoPixel(
LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT,
LED_BRIGHTNESS, LED_PWM
)
self.strip.begin()
self._color_cls = neopixel.Color
@classmethod
def fromRegexMatch(cls, regexMatch):
return cls()
def sendFrame(self, frame):
for y in range(self.h):
for x in range(self.w):
if x%2 == 0:
lednum = x*self.h + y
else:
lednum = x*self.h - y + 15
pix = frame.getPixel(self.w - x - 1, self.h - y - 1)
r, g, b = pix
self.strip.setPixelColor(lednum, self._color_cls(g, r, b))
self.strip.show()
#
# ---- PygameOutput ----
#
class PygameOutput(Output):
moduleRegexDesc = 'pygame'
moduleRegex = '^pygame$'
def __init__(self, width=22, height=16, scalex=8, scaley=8):
self.width = width
self.height = height
self.scalex = scalex
self.scaley = scaley
import pygame
self.__pygame = pygame
self.screen = pygame.display.set_mode((width*scalex, height*scaley))
pygame.display.update()
@classmethod
def fromRegexMatch(cls, regexMatch):
return cls()
def sendFrame(self, frame):
pygame = self.__pygame
for y in range(self.height):
for x in range(self.width):
rect = (x*self.scalex, y*self.scaley, self.scalex, self.scaley)
pygame.draw.rect(self.screen, frame.getPixel(x, y), rect, 0)
pygame.display.update()
#import time
#time.sleep(0.1)
#
# ---- output creator ;) ----
#
__outputModules = {
SerialBlupOutput.moduleRegex: SerialBlupOutput,
MCUFOutput.moduleRegex: MCUFOutput,
ShellOutput.moduleRegex: ShellOutput,
ColorfulShellOutput.moduleRegex: ColorfulShellOutput,
BLPOutput.moduleRegex: BLPOutput,
BlinkenbuntHDOutput.moduleRegex: BlinkenbuntHDOutput,
PygameOutput.moduleRegex: PygameOutput,
}
def getOutput(outputSpec):
"""
Return an output module instance described by the given specification.
"""
for regex in __outputModules.keys():
m = re.match(regex, outputSpec)
if m:
return __outputModules[regex].fromRegexMatch(regexMatch=m)
raise IllegalOutputSpecificationError()
def getOutputDescriptions():
"""
Return a string containing the moduleRegexDesc of every available output
module.
"""
s = ""
for outputModule in __outputModules.values():
s += " %s\n" % (outputModule.moduleRegexDesc)
return s