You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
505 lines
16 KiB
505 lines
16 KiB
#!/usr/bin/env python3 |
|
|
|
import argparse |
|
import sys |
|
import socket |
|
import struct |
|
import time |
|
import enum |
|
import math |
|
import collections |
|
import blup.frame |
|
import blup.output |
|
import random |
|
import threading |
|
|
|
|
|
class InvalidMoveError(Exception): |
|
pass |
|
|
|
class Point(collections.namedtuple('Point', ['x', 'y'])): |
|
__slots__ = () |
|
|
|
def __add__(self, other): |
|
return Point(self.x + other.x, self.y + other.y) |
|
|
|
def floored(self): |
|
return Point(math.floor(self.x), math.floor(self.y)) |
|
|
|
Block = collections.namedtuple('Block', ['pos', 'color']) |
|
|
|
|
|
class Tetrimino(): |
|
def __init__(self, shape, playground, pos): |
|
self.shape = shape |
|
self.playground = playground |
|
self.pos = pos |
|
|
|
@property |
|
def width(self): |
|
x = list(map(lambda s: s.pos.x, self.shape)) |
|
return max(x) - min(x) |
|
|
|
@property |
|
def height(self): |
|
y = list(map(lambda s: s.pos.y, self.shape)) |
|
return max(y) - min(y) |
|
|
|
@property |
|
def blocks(self): |
|
ret = { Block((self.pos + b.pos).floored(), b.color) for b in |
|
self.shape } |
|
return ret |
|
|
|
def __calc_points(self, pos=None, shape=None): |
|
if pos is None: |
|
pos = self.pos |
|
if shape is None: |
|
shape = self.shape |
|
return { (pos + b.pos).floored() for b in shape } |
|
|
|
@property |
|
def points(self): |
|
return self.__calc_points() |
|
|
|
def __check_collision(self, newpoints): |
|
#if not self.playground.contains_points(newpoints): |
|
# raise InvalidMoveError('out of playground bounds') |
|
#print(self.playground.block_points) |
|
#print(self.playground.blocks) |
|
#print('new', newpoints) |
|
for newp in newpoints: |
|
if ( newp.x >= self.playground.width or newp.x < 0 or |
|
newp.y >= self.playground.height): |
|
raise InvalidMoveError('out of playground bounds') |
|
if not self.playground.block_points.isdisjoint(newpoints): |
|
raise InvalidMoveError('new position already occupied') |
|
other_mino_points = self.playground.mino_points - self.points |
|
if not other_mino_points.isdisjoint(newpoints): |
|
raise InvalidMoveError('other Tetrimino at new position') |
|
|
|
def rotate(self, ccw=False): |
|
if ccw: |
|
transform = lambda s: Block(Point(s.pos.y, -s.pos.x), s.color) |
|
else: |
|
transform = lambda s: Block(Point(-s.pos.y, s.pos.x), s.color) |
|
newshape = set(map(transform, self.shape)) |
|
newpoints = self.__calc_points(shape=newshape) |
|
self.__check_collision(newpoints) |
|
self.shape = newshape |
|
|
|
def move(self, m): |
|
newpos = self.pos + m |
|
newpoints = self.__calc_points(pos=newpos) |
|
self.__check_collision(newpoints) |
|
self.pos = newpos |
|
|
|
|
|
|
|
|
|
|
|
|
|
class TetriL(Tetrimino): |
|
def __init__(self, playground, pos): |
|
color = (255,165,0) |
|
points = [(-1, 1), (-1, 0), (0, 0), (1, 0)] |
|
shape = { Block(Point(x, y), color) for (x, y) in points } |
|
Tetrimino.__init__(self, shape, playground, pos) |
|
|
|
|
|
class TetriJ(Tetrimino): |
|
def __init__(self, playground, pos): |
|
color = (0,0,255) |
|
points = [(-1, 0), (0, 0), (1, 0), (1, 1)] |
|
shape = { Block(Point(x, y), color) for (x, y) in points } |
|
Tetrimino.__init__(self, shape, playground, pos) |
|
|
|
|
|
class TetriI(Tetrimino): |
|
def __init__(self, playground, pos): |
|
color = (0,255,255) |
|
points = {(1.5, -0.5), (0.5, -0.5), (-0.5, -0.5), (-1.5, -0.5)} |
|
shape = { Block(Point(x, y), color) for (x, y) in points } |
|
Tetrimino.__init__(self, shape, playground, pos) |
|
|
|
|
|
class TetriO(Tetrimino): |
|
def __init__(self, playground, pos): |
|
color = (255,255,0) |
|
points = {(-0.5, -0.5), (-0.5, 0.5), (0.5, -0.5), (0.5, 0.5)} |
|
shape = { Block(Point(x, y), color) for (x, y) in points } |
|
Tetrimino.__init__(self, shape, playground, pos) |
|
|
|
|
|
class TetriS(Tetrimino): |
|
def __init__(self, playground, pos): |
|
color = (128,255,0) |
|
points = {(-1, 0), (0, 0), (0, -1), (1, -1)} |
|
shape = { Block(Point(x, y), color) for (x, y) in points } |
|
Tetrimino.__init__(self, shape, playground, pos) |
|
|
|
|
|
class TetriZ(Tetrimino): |
|
def __init__(self, playground, pos): |
|
color = (255,0,0) |
|
points = {(-1, 0), (0, 0), (0, 1), (1, 1)} |
|
shape = { Block(Point(x, y), color) for (x, y) in points } |
|
Tetrimino.__init__(self, shape, playground, pos) |
|
|
|
|
|
class TetriT(Tetrimino): |
|
def __init__(self, playground, pos): |
|
color = (128,0,128) |
|
points = {(-1, 0), (0, 0), (1, 0), (0, 1)} |
|
shape = { Block(Point(x, y), color) for (x, y) in points } |
|
Tetrimino.__init__(self, shape, playground, pos) |
|
|
|
|
|
class Playground(): |
|
def __init__(self, width=10, height=22): |
|
self.width = width |
|
self.height = height |
|
self.blocks = set() |
|
self.minos = set() |
|
|
|
@property |
|
def block_points(self): |
|
return set([ b.pos for b in self.blocks ]) |
|
|
|
@property |
|
def mino_points(self): |
|
return set.union(set(), *[ m.points for m in self.minos ]) |
|
|
|
def contains_points(self, points): |
|
for p in points: |
|
if p.x >= self.width or p.y >= self.height or p.x < 0 or p.y < 0: |
|
return False |
|
return True |
|
|
|
def paint(self, frame, xpos, ypos): |
|
for x in range(self.width): |
|
for y in range(self.height): |
|
frame.setPixel(xpos + x, ypos + y, (0, 0, 0)) |
|
for b in set.union(self.blocks, *[ m.blocks for m in self.minos ]): |
|
if not self.contains_points([b.pos]): |
|
# don't draw blocks outside the playground area |
|
continue |
|
frame.setPixel(xpos + int(b.pos.x), ypos + int(b.pos.y), b.color) |
|
|
|
|
|
PlayerEvent = enum.Enum('PlayerEvent', ['ROTATE', 'DROP', 'MOVE_LEFT', |
|
'MOVE_RIGHT', 'QUIT']) |
|
|
|
|
|
class TtrsPlayer(): |
|
def __init__(self, playground): |
|
pass |
|
|
|
def get_event(self): |
|
return None |
|
|
|
|
|
class TestTtrsPlayer(TtrsPlayer): |
|
def __init__(self, playground): |
|
self.playground = playground |
|
self.__evt = None |
|
|
|
import pygame |
|
self.__pygame = pygame |
|
self.screen = pygame.display.set_mode((100, 100)) |
|
pygame.display.update() |
|
|
|
self.controls = { |
|
pygame.K_a: PlayerEvent.MOVE_LEFT, |
|
pygame.K_d: PlayerEvent.MOVE_RIGHT, |
|
pygame.K_w: PlayerEvent.ROTATE, |
|
pygame.K_s: PlayerEvent.DROP, |
|
pygame.K_ESCAPE: PlayerEvent.QUIT, |
|
} |
|
#self.controls = { |
|
# pygame.K_LEFT: PlayerEvent.MOVE_LEFT, |
|
# pygame.K_RIGHT: PlayerEvent.MOVE_RIGHT, |
|
# pygame.K_UP: PlayerEvent.ROTATE, |
|
# pygame.K_DOWN: PlayerEvent.DROP, |
|
# pygame.K_ESCAPE: PlayerEvent.QUIT, |
|
#} |
|
|
|
def get_event(self): |
|
pygame = self.__pygame |
|
for event in pygame.event.get(): |
|
if event.type == pygame.KEYDOWN: |
|
return self.controls.get(event.key, None) |
|
return None |
|
|
|
|
|
class BalanceTtrsPlayer(TtrsPlayer, threading.Thread): |
|
def __init__(self, playground, addr, player_id): |
|
self.playground = playground |
|
self.addr = addr |
|
self.player_id = player_id |
|
self.evt = None |
|
self.lastevt = None |
|
threading.Thread.__init__(self, daemon=True) |
|
self.start() |
|
|
|
def run(self): |
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
try: |
|
sock.connect(self.addr) |
|
except ConnectionRefusedError: |
|
print('could not connect to balance server', file=sys.stderr) |
|
self.evt = PlayerEvent.QUIT |
|
return |
|
self.running = True |
|
evt = None |
|
oldevt = None |
|
lastchange = 0 |
|
while self.running: |
|
sock.send(b'a') |
|
data = sock.recv(4) |
|
p0x, p0y, p1x, p1y = struct.unpack('bbbb', data) |
|
if self.player_id == 0: |
|
xbal, ybal = p0x, p0y |
|
elif self.player_id == 1: |
|
xbal, ybal = p1x, p1y |
|
|
|
print('player_id=%d xbal=%d ybal=%d' % (self.player_id, xbal, ybal)) |
|
if xbal == -128 or ybal == -128: |
|
self.evt = PlayerEvent.QUIT |
|
self.running = False |
|
break |
|
|
|
THRESHOLD = 40 |
|
if abs(xbal) < THRESHOLD and abs(ybal) < THRESHOLD: |
|
evt = None |
|
else: |
|
if abs(xbal) < abs(ybal): |
|
if ybal > 0: |
|
evt = PlayerEvent.ROTATE |
|
else: |
|
evt = PlayerEvent.DROP |
|
else: |
|
if xbal > 0: |
|
evt = PlayerEvent.MOVE_RIGHT |
|
else: |
|
evt = PlayerEvent.MOVE_LEFT |
|
|
|
MIN_TIMES = { |
|
PlayerEvent.MOVE_LEFT: 0.1, |
|
PlayerEvent.MOVE_RIGHT: 0.1, |
|
PlayerEvent.ROTATE: 0.1, |
|
PlayerEvent.DROP: 0.3, |
|
} |
|
if evt != oldevt: |
|
lastchange = time.time() |
|
oldevt = evt |
|
|
|
if time.time() - lastchange < MIN_TIMES.get(evt, 0): |
|
print('player_id=%d debounce %s' % (self.player_id, evt)) |
|
continue |
|
|
|
if evt == PlayerEvent.ROTATE: |
|
if self.lastevt != PlayerEvent.ROTATE: |
|
self.evt = evt |
|
else: |
|
self.evt = evt |
|
self.lastevt = evt |
|
print('player_id=%d event=%s' % (self.player_id, self.evt)) |
|
|
|
|
|
|
|
def get_event(self): |
|
evt = self.evt |
|
self.evt = None |
|
return evt |
|
|
|
|
|
class TtrsGame(threading.Thread): |
|
def __init__(self, playground, player, rnd=None): |
|
self.playground = playground |
|
self.player = player |
|
if rnd is None: |
|
self.rnd = random.Random() |
|
else: |
|
self.rnd = rnd |
|
self.running = False |
|
self.tick_callbacks = [] |
|
threading.Thread.__init__(self, daemon=True) |
|
|
|
def add_tick_callback(self, cb): |
|
self.tick_callbacks.append(cb) |
|
|
|
def __call_callbacks(self): |
|
for cb in self.tick_callbacks: |
|
cb(self) |
|
|
|
def run(self): |
|
self.running = True |
|
spawnpos = Point(self.playground.width // 2, -1) |
|
mino = None |
|
TICK_TIME = 0.1 |
|
FALL_INTERVAL = 5 |
|
MOVE_INTERVAL = 1 |
|
ticks = 0 |
|
lastfall = 0 |
|
lastmove = 0 |
|
dropping = False |
|
top_row = { Point(x, -1) for x in range(self.playground.height) } |
|
while self.running: |
|
self.__call_callbacks() |
|
time.sleep(TICK_TIME) |
|
ticks += 1 |
|
|
|
evt = self.player.get_event() |
|
if evt == PlayerEvent.QUIT: |
|
time.sleep(3) |
|
if self.player.get_event() == PlayerEvent.QUIT: |
|
self.running = False |
|
break |
|
|
|
if not self.playground.block_points.isdisjoint(top_row): |
|
self.running = False |
|
break |
|
|
|
if mino is None: |
|
newminocls = self.rnd.choice(Tetrimino.__subclasses__()) |
|
mino = newminocls(self.playground, spawnpos) |
|
self.playground.minos = {mino} |
|
|
|
#print(mino.pos) |
|
|
|
to_remove = set() |
|
for y in range(self.playground.height): |
|
row = { Point(x, y) for x in range(self.playground.width) } |
|
if row.issubset(self.playground.block_points): |
|
to_remove.add(y) |
|
|
|
if len(to_remove) > 0: |
|
to_delete = set() |
|
for b in list(self.playground.blocks): |
|
if b.pos.y in to_remove: |
|
to_delete.add(Block(b.pos, (255, 255, 255))) |
|
self.playground.blocks.remove(b) |
|
|
|
for i in range(2): |
|
for b in to_delete: |
|
self.playground.blocks.add(b) |
|
self.__call_callbacks() |
|
time.sleep(0.2) |
|
for b in to_delete: |
|
self.playground.blocks.remove(b) |
|
self.__call_callbacks() |
|
time.sleep(0.2) |
|
|
|
to_add = set() |
|
for b in list(self.playground.blocks): |
|
o = len(list(filter(lambda y: y > b.pos.y, to_remove))) |
|
if o > 0: |
|
self.playground.blocks.remove(b) |
|
newb = Block(b.pos + Point(0, o), b.color) |
|
to_add.add(newb) |
|
self.playground.blocks.update(to_add) |
|
|
|
if ticks - lastfall >= FALL_INTERVAL or evt == PlayerEvent.DROP: |
|
lastfall = ticks |
|
try: |
|
mino.move(Point(0, 1)) |
|
except InvalidMoveError: |
|
self.playground.blocks.update(mino.blocks) |
|
mino = None |
|
self.playground.minos = set() |
|
continue |
|
|
|
if ticks - lastmove >= MOVE_INTERVAL: |
|
try: |
|
if evt == PlayerEvent.MOVE_LEFT: |
|
mino.move(Point(-1, 0)) |
|
lastmove = ticks |
|
elif evt == PlayerEvent.MOVE_RIGHT: |
|
mino.move(Point(1, 0)) |
|
lastmove = ticks |
|
except InvalidMoveError: |
|
pass |
|
|
|
if evt == PlayerEvent.ROTATE: |
|
try: |
|
mino.rotate() |
|
except InvalidMoveError: |
|
pass |
|
|
|
|
|
def repaint(pg): |
|
pg.paint(frame, 0, 0) |
|
out.sendFrame(frame) |
|
|
|
class GamePlaygroundPainter(): |
|
def __init__(self, output, dimension): |
|
self.output = output |
|
self.games = {} |
|
self.frame = blup.frame.Frame(dimension) |
|
bgcolor = (100, 100, 100) |
|
for x in range(dimension.width): |
|
for y in range(dimension.height): |
|
self.frame.setPixel(x, y, bgcolor) |
|
|
|
def add_game(self, game, xpos, ypos): |
|
game.add_tick_callback(self.repaint) |
|
self.games[game] = (xpos, ypos) |
|
|
|
def repaint(self, game): |
|
xpos, ypos = self.games[game] |
|
game.playground.paint(self.frame, xpos, ypos) |
|
self.output.sendFrame(self.frame) |
|
|
|
if __name__ == '__main__': |
|
parser = argparse.ArgumentParser(description='Blinkenbunt Tetris!') |
|
parser.add_argument('--players', dest='players', type=int, default=1, |
|
help='number of players') |
|
parser.add_argument('--pygame', dest='pygame', action='store_true', |
|
help='use pygame as input') |
|
parser.add_argument('--balance', dest='balance', type=str, |
|
metavar='HOST:PORT', help='use balance input') |
|
parser.add_argument('--out', dest='out', type=str, metavar='OUTPUT', |
|
default='e3blp', help='blup output specification') |
|
args = parser.parse_args() |
|
|
|
if args.balance is None and not args.pygame: |
|
print('please specify an input method', file=sys.stderr) |
|
sys.exit(1) |
|
|
|
if args.pygame and args.players == 2: |
|
print('pygame input does only support one player', file=sys.stderr) |
|
sys.exit(1) |
|
|
|
w = 22 |
|
h = 16 |
|
|
|
out = blup.output.getOutput(args.out) |
|
dim = blup.frame.FrameDimension(w, h, 256, 3) |
|
painter = GamePlaygroundPainter(out, dim) |
|
|
|
seed = random.random() |
|
games = [] |
|
def start_game(player_id, xpos): |
|
pg = Playground(10, 16) |
|
if args.pygame: |
|
player = TestTtrsPlayer(pg) |
|
elif args.balance is not None: |
|
host, port = args.balance.split(':') |
|
player = BalanceTtrsPlayer(pg, (host, int(port)), player_id) |
|
rnd = random.Random(seed) |
|
game = TtrsGame(pg, player, rnd) |
|
painter.add_game(game, xpos, 0) |
|
game.start() |
|
games.append(game) |
|
|
|
if args.players == 1: |
|
start_game(0, 6) |
|
elif args.players == 2: |
|
start_game(0, 0) |
|
start_game(1, 12) |
|
|
|
for game in games: |
|
game.join() |
|
|
|
|
|
|