#!/usr/bin/env python3 import argparse import sys import socket import struct import time import enum import math import collections import blup.balance_util import blup.frame import blup.output import random import threading class InvalidMoveError(Exception): pass class Point(collections.namedtuple('Point', ['x', 'y'])): __slots__ = () def __add__(self, other): return Point(self.x + other.x, self.y + other.y) def floored(self): return Point(math.floor(self.x), math.floor(self.y)) Block = collections.namedtuple('Block', ['pos', 'color']) class Tetrimino(): def __init__(self, shape, playground, pos): self.shape = shape self.playground = playground self.pos = pos @property def width(self): x = list(map(lambda s: s.pos.x, self.shape)) return max(x) - min(x) @property def height(self): y = list(map(lambda s: s.pos.y, self.shape)) return max(y) - min(y) @property def blocks(self): ret = { Block((self.pos + b.pos).floored(), b.color) for b in self.shape } return ret def __calc_points(self, pos=None, shape=None): if pos is None: pos = self.pos if shape is None: shape = self.shape return { (pos + b.pos).floored() for b in shape } @property def points(self): return self.__calc_points() def __check_collision(self, newpoints): #if not self.playground.contains_points(newpoints): # raise InvalidMoveError('out of playground bounds') #print(self.playground.block_points) #print(self.playground.blocks) #print('new', newpoints) for newp in newpoints: if ( newp.x >= self.playground.width or newp.x < 0 or newp.y >= self.playground.height): raise InvalidMoveError('out of playground bounds') if not self.playground.block_points.isdisjoint(newpoints): raise InvalidMoveError('new position already occupied') other_mino_points = self.playground.mino_points - self.points if not other_mino_points.isdisjoint(newpoints): raise InvalidMoveError('other Tetrimino at new position') def rotate(self, ccw=False): if ccw: transform = lambda s: Block(Point(s.pos.y, -s.pos.x), s.color) else: transform = lambda s: Block(Point(-s.pos.y, s.pos.x), s.color) newshape = set(map(transform, self.shape)) newpoints = self.__calc_points(shape=newshape) self.__check_collision(newpoints) self.shape = newshape def move(self, m): newpos = self.pos + m newpoints = self.__calc_points(pos=newpos) self.__check_collision(newpoints) self.pos = newpos class TetriL(Tetrimino): def __init__(self, playground, pos): color = (255,165,0) points = [(-1, 1), (-1, 0), (0, 0), (1, 0)] shape = { Block(Point(x, y), color) for (x, y) in points } Tetrimino.__init__(self, shape, playground, pos) class TetriJ(Tetrimino): def __init__(self, playground, pos): color = (0,0,255) points = [(-1, 0), (0, 0), (1, 0), (1, 1)] shape = { Block(Point(x, y), color) for (x, y) in points } Tetrimino.__init__(self, shape, playground, pos) class TetriI(Tetrimino): def __init__(self, playground, pos): color = (0,255,255) points = {(1.5, -0.5), (0.5, -0.5), (-0.5, -0.5), (-1.5, -0.5)} shape = { Block(Point(x, y), color) for (x, y) in points } Tetrimino.__init__(self, shape, playground, pos) class TetriO(Tetrimino): def __init__(self, playground, pos): color = (255,255,0) points = {(-0.5, -0.5), (-0.5, 0.5), (0.5, -0.5), (0.5, 0.5)} shape = { Block(Point(x, y), color) for (x, y) in points } Tetrimino.__init__(self, shape, playground, pos) class TetriS(Tetrimino): def __init__(self, playground, pos): color = (128,255,0) points = {(-1, 0), (0, 0), (0, -1), (1, -1)} shape = { Block(Point(x, y), color) for (x, y) in points } Tetrimino.__init__(self, shape, playground, pos) class TetriZ(Tetrimino): def __init__(self, playground, pos): color = (255,0,0) points = {(-1, 0), (0, 0), (0, 1), (1, 1)} shape = { Block(Point(x, y), color) for (x, y) in points } Tetrimino.__init__(self, shape, playground, pos) class TetriT(Tetrimino): def __init__(self, playground, pos): color = (128,0,128) points = {(-1, 0), (0, 0), (1, 0), (0, 1)} shape = { Block(Point(x, y), color) for (x, y) in points } Tetrimino.__init__(self, shape, playground, pos) class Playground(): def __init__(self, width=10, height=22): self.width = width self.height = height self.blocks = set() self.minos = set() @property def block_points(self): return set([ b.pos for b in self.blocks ]) @property def mino_points(self): return set.union(set(), *[ m.points for m in self.minos ]) def contains_points(self, points): for p in points: if p.x >= self.width or p.y >= self.height or p.x < 0 or p.y < 0: return False return True def paint(self, frame, xpos, ypos): for x in range(self.width): for y in range(self.height): frame.setPixel(xpos + x, ypos + y, (0, 0, 0)) for b in set.union(self.blocks, *[ m.blocks for m in self.minos ]): if not self.contains_points([b.pos]): # don't draw blocks outside the playground area continue frame.setPixel(xpos + int(b.pos.x), ypos + int(b.pos.y), b.color) PlayerEvent = enum.Enum('PlayerEvent', ['ROTATE', 'DROP', 'MOVE_LEFT', 'MOVE_RIGHT', 'QUIT']) class TtrsPlayer(): def __init__(self, playground): pass def get_event(self): return None class TestTtrsPlayer(TtrsPlayer): def __init__(self, playground): self.playground = playground self.__evt = None import pygame self.__pygame = pygame self.screen = pygame.display.set_mode((100, 100)) pygame.display.update() self.controls = { pygame.K_a: PlayerEvent.MOVE_LEFT, pygame.K_d: PlayerEvent.MOVE_RIGHT, pygame.K_w: PlayerEvent.ROTATE, pygame.K_s: PlayerEvent.DROP, pygame.K_ESCAPE: PlayerEvent.QUIT, } #self.controls = { # pygame.K_LEFT: PlayerEvent.MOVE_LEFT, # pygame.K_RIGHT: PlayerEvent.MOVE_RIGHT, # pygame.K_UP: PlayerEvent.ROTATE, # pygame.K_DOWN: PlayerEvent.DROP, # pygame.K_ESCAPE: PlayerEvent.QUIT, #} def get_event(self): pygame = self.__pygame for event in pygame.event.get(): if event.type == pygame.KEYDOWN: return self.controls.get(event.key, None) return None class BalanceTtrsPlayer(TtrsPlayer, threading.Thread): def __init__(self, playground, balance_util): self.playground = playground self.balance_util = balance_util self.evt = None self.lastevt = None threading.Thread.__init__(self, daemon=True) self.start() def run(self): self.running = True evt = None oldevt = None lastchange = 0 while self.running: THRESHOLD = 40 direction = self.balance_util.get_4dir(THRESHOLD) if direction == blup.balance_util.Direction.QUIT: self.evt = PlayerEvent.QUIT elif direction == blup.balance_util.Direction.LEFT: evt = PlayerEvent.MOVE_LEFT elif direction == blup.balance_util.Direction.RIGHT: evt = PlayerEvent.MOVE_RIGHT elif direction == blup.balance_util.Direction.UP: evt = PlayerEvent.ROTATE elif direction == blup.balance_util.Direction.DOWN: evt = PlayerEvent.DROP else: evt = None #print('player_id=%d xbal=%d ybal=%d' % (self.player_id, xbal, ybal)) MIN_TIMES = { PlayerEvent.MOVE_LEFT: 0.1, PlayerEvent.MOVE_RIGHT: 0.1, PlayerEvent.ROTATE: 0.1, PlayerEvent.DROP: 0.3, } if evt != oldevt: lastchange = time.time() oldevt = evt if time.time() - lastchange < MIN_TIMES.get(evt, 0): #print('player_id=%d debounce %s' % (self.player_id, evt)) continue if evt == PlayerEvent.ROTATE: if self.lastevt != PlayerEvent.ROTATE: self.evt = evt else: self.evt = evt self.lastevt = evt #print('player_id=%d event=%s' % (self.player_id, self.evt)) def get_event(self): print('player_id=%d event=%s' % (self.balance_util.player_id, self.evt)) evt = self.evt self.evt = None return evt class TtrsGame(threading.Thread): def __init__(self, playground, player, rnd=None): self.playground = playground self.player = player if rnd is None: self.rnd = random.Random() else: self.rnd = rnd self.running = False self.tick_callbacks = [] threading.Thread.__init__(self, daemon=True) def add_tick_callback(self, cb): self.tick_callbacks.append(cb) def __call_callbacks(self): for cb in self.tick_callbacks: cb(self) def run(self): self.running = True spawnpos = Point(self.playground.width // 2, -1) mino = None TICK_TIME = 0.1 FALL_INTERVAL = 5 MOVE_INTERVAL = 1 DROP_BARRIER = 4 ticks = 0 lastfall = 0 lastmove = 0 dropping = False top_row = { Point(x, -1) for x in range(self.playground.height) } while self.running: self.__call_callbacks() time.sleep(TICK_TIME) ticks += 1 evt = self.player.get_event() if evt == PlayerEvent.QUIT: time.sleep(3) if self.player.get_event() == PlayerEvent.QUIT: self.running = False break if not self.playground.block_points.isdisjoint(top_row): self.running = False break if mino is None: newminocls = self.rnd.choice(Tetrimino.__subclasses__()) mino = newminocls(self.playground, spawnpos) self.playground.minos = {mino} #print(mino.pos) to_remove = set() for y in range(self.playground.height): row = { Point(x, y) for x in range(self.playground.width) } if row.issubset(self.playground.block_points): to_remove.add(y) if len(to_remove) > 0: to_delete = set() for b in list(self.playground.blocks): if b.pos.y in to_remove: to_delete.add(Block(b.pos, (255, 255, 255))) self.playground.blocks.remove(b) for i in range(2): for b in to_delete: self.playground.blocks.add(b) self.__call_callbacks() time.sleep(0.2) for b in to_delete: self.playground.blocks.remove(b) self.__call_callbacks() time.sleep(0.2) to_add = set() for b in list(self.playground.blocks): o = len(list(filter(lambda y: y > b.pos.y, to_remove))) if o > 0: self.playground.blocks.remove(b) newb = Block(b.pos + Point(0, o), b.color) to_add.add(newb) self.playground.blocks.update(to_add) drop = evt == PlayerEvent.DROP and mino.pos.y > DROP_BARRIER if ticks - lastfall >= FALL_INTERVAL or drop: lastfall = ticks try: mino.move(Point(0, 1)) except InvalidMoveError: self.playground.blocks.update(mino.blocks) mino = None self.playground.minos = set() continue if ticks - lastmove >= MOVE_INTERVAL: try: if evt == PlayerEvent.MOVE_LEFT: mino.move(Point(-1, 0)) lastmove = ticks elif evt == PlayerEvent.MOVE_RIGHT: mino.move(Point(1, 0)) lastmove = ticks except InvalidMoveError: pass if evt == PlayerEvent.ROTATE: try: mino.rotate() except InvalidMoveError: pass def repaint(pg): pg.paint(frame, 0, 0) out.sendFrame(frame) class GamePlaygroundPainter(): def __init__(self, output, dimension): self.output = output self.games = {} self.frame = blup.frame.Frame(dimension) bgcolor = (100, 100, 100) for x in range(dimension.width): for y in range(dimension.height): self.frame.setPixel(x, y, bgcolor) def add_game(self, game, xpos, ypos): game.add_tick_callback(self.repaint) self.games[game] = (xpos, ypos) def repaint(self, game): xpos, ypos = self.games[game] game.playground.paint(self.frame, xpos, ypos) self.output.sendFrame(self.frame) if __name__ == '__main__': parser = argparse.ArgumentParser(description='Blinkenbunt Tetris!') parser.add_argument('--players', dest='players', type=int, default=1, help='number of players') parser.add_argument('--pygame', dest='pygame', action='store_true', help='use pygame as input') parser.add_argument('--balance', dest='balance', type=str, metavar='HOST:PORT', help='use balance input') parser.add_argument('--out', dest='out', type=str, metavar='OUTPUT', default='e3blp', help='blup output specification') args = parser.parse_args() if args.balance is None and not args.pygame: print('please specify an input method', file=sys.stderr) sys.exit(1) if args.pygame and args.players == 2: print('pygame input does only support one player', file=sys.stderr) sys.exit(1) w = 22 h = 16 out = blup.output.getOutput(args.out) dim = blup.frame.FrameDimension(w, h, 256, 3) painter = GamePlaygroundPainter(out, dim) seed = random.random() games = [] sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) host, port = args.balance.split(':') try: sock.connect((host, int(port))) except ConnectionRefusedError: print('could not connect to balance server', file=sys.stderr) sys.exit(1) def start_game(player_id, xpos): pg = Playground(10, 16) balance_util = blup.balance_util.BalanceUtil(sock, player_id) if args.pygame: player = TestTtrsPlayer(pg) elif args.balance is not None: player = BalanceTtrsPlayer(pg, balance_util) rnd = random.Random(seed) game = TtrsGame(pg, player, rnd) painter.add_game(game, xpos, 0) game.start() games.append(game) if args.players == 1: start_game(0, 6) elif args.players == 2: start_game(0, 0) start_game(1, 12) for game in games: game.join()