Browse Source

Added python3 version of wiiboard library and boad reading daemon

feature/balanceutils
klonfish 7 years ago committed by klonfish
parent
commit
bc3eca1e3b
  1. 201
      wii-pair/daemon.py
  2. 241
      wii-pair/wiiboard.py

201
wii-pair/daemon.py

@ -0,0 +1,201 @@ @@ -0,0 +1,201 @@
#!/usr/bin/env python3
import sys
import os, math, random
import time
import socket
import struct
from threading import Thread
import bluetooth
import wiiboard
known_boards = {'niklas': '00:26:59:34:C8:69',
'kaka': '00:26:59:37:04:15',
'michael': '00:24:44:65:5B:F8',
'fed': '00:23:CC:23:5E:1D',
}
class StatusThread(Thread):
def __init__(self, t1, t2):
Thread.__init__(self)
self.t1 = t1
self.t2 = t2
self.stop = False
def run(self):
t1 = self.t1
t2 = self.t2
colw = 50
batw = colw - len('Bat: 000% || ')
while not self.stop:
print('')
print(('Left player - '+t1.addr).ljust(colw) + 'Right player - '+t2.addr)
print('-'*2*colw)
if t1.addr != 'dummy':
if t1.connected:
s1 = 'Connected'
bat = t1.board.battery
else:
s1 = 'Connecting...'
bat = -1
else:
s1 = 'Dummy'
bat = 0.42
px = 'Low ' if t1.pos_x is None else '%.3f'%t1.pos_x
py = 'Low ' if t1.pos_y is None else '%.3f'%t1.pos_y
p1 = 'Position: X: %s - Y: %s' % (px, py)
if bat is not None:
b1 = 'Bat: |%s| %03d%%' % (('='*int(batw*bat)).ljust(batw), int(bat*100))
else:
b1 = 'Bat: |%s| ---%%' % ('?'*batw).ljust(batw)
w1 = 'Weight: %.1f' % t1.weight
if t2.addr != 'dummy':
if t2.connected:
s2 = 'Connected'
bat = t2.board.battery
else:
s2 = 'Connecting...'
bat = -1
else:
s2 = 'Dummy'
bat = 0.42
px = 'Low ' if t2.pos_x is None else '%.3f'%t2.pos_x
py = 'Low ' if t2.pos_y is None else '%.3f'%t2.pos_y
p2 = 'Position: X: %s - Y: %s' % (px, py)
if bat is not None:
b2 = 'Bat: |%s| %03d%%' % (('='*int(batw*bat)).ljust(batw), int(bat*100))
else:
b2 = 'Bat: |%s| ---%%' % ('?'*batw).ljust(batw)
w2 = 'Weight: %.1f' % t2.weight
print(s1.ljust(colw) + s2)
print(b1.ljust(colw) + b2)
print(w1.ljust(colw) + w2)
print(p1.ljust(colw) + p2)
time.sleep(1)
class WiiThread(Thread):
def __init__(self, player, addr):
Thread.__init__(self)
self.addr = addr
self.player = player
self.stop = False
self.connected = False
self.board = None
self.pos_x = None
self.pos_y = None
self.weight = 0
def run(self):
if self.addr != 'dummy':
while not self.stop:
self.connected = False
self.pos = 'C'
self.board = wiiboard.Wiiboard()
print('%s: Connecting to %s' % (self.player, self.addr))
try:
self.board.connect(self.addr)
except bluetooth.btcommon.BluetoothError:
continue
time.sleep(1)
print('%s: Connected' % self.player)
self.connected = True
self.board.setLight(True)
while not self.stop and self.board.status == 'Connected':
self.weight = self.board.mass.totalWeight
if self.board.mass.totalWeight > 15:
m = self.board.mass
x_balance = -(m.topLeft+m.bottomLeft) + (m.topRight+m.bottomRight)
x_balance = x_balance/float(m.totalWeight)
y_balance = -(m.bottomLeft+m.bottomRight) + (m.topLeft+m.topRight)
y_balance = y_balance/float(m.totalWeight)
self.pos_x = x_balance
self.pos_y = y_balance
else:
self.pos_x = None
self.pos_y = None
time.sleep(0.1)
self.board.disconnect()
else:
print('%s: Connected as DUMMY' % self.player)
self.connected = True
self.pos = 'D'
wiis = []
if len(sys.argv) > 1:
if sys.argv[1] in list(known_boards.keys()):
addr = known_boards[sys.argv[1]]
else:
addr = sys.argv[1]
t1 = WiiThread('L', addr)
else:
t1 = WiiThread('')
t1.daemon = True
t1.start()
if len(sys.argv) > 2:
if sys.argv[2] in list(known_boards.keys()):
addr = known_boards[sys.argv[2]]
else:
addr = sys.argv[2]
t2 = WiiThread('R', addr)
else:
t2 = WiiThread('')
while not t1.connected:
time.sleep(0.1)
t2.daemon = True
t2.start()
while not t2.connected:
time.sleep(0.1)
tStatus = StatusThread(t1, t2)
tStatus.daemon = True
tStatus.start()
wiis = [t1, t2]
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', 4711))
s.listen(1)
try:
while True:
conn, addr = s.accept()
while True:
m = conn.recv(1)
if len(m) == 0:
conn.close()
break
r = b''
for w in wiis:
if w.pos_x is None:
v = -128
else:
v = int(round(w.pos_x*100))
r += struct.pack('b', v)
if w.pos_y is None:
v = -128
else:
v = int(round(w.pos_y*100))
r += struct.pack('b', v)
conn.send(r)
except (KeyboardInterrupt, SystemExit):
t1.stop = True
t2.stop = True
tStatus.stop = True
sys.exit()

241
wii-pair/wiiboard.py

@ -0,0 +1,241 @@ @@ -0,0 +1,241 @@
'''Wiiboard driver
Nedim Jackman December 2008
No liability held for any use of this software.
More information at http://code.google.com/p/wiiboard-simple/
'''
import bluetooth
import sys
import _thread
import time
import binascii
CONTINUOUS_REPORTING = 0x04
COMMAND_LIGHT = 0x11
COMMAND_REPORTING = 0x12
COMMAND_REQUEST_STATUS = 0x15
COMMAND_REGISTER = 0x16
COMMAND_READ_REGISTER = 0x17
#input is Wii device to host
INPUT_STATUS = 0x20
INPUT_READ_DATA = 0x21
EXTENSION_8BYTES = 0x32
BUTTON_DOWN_MASK = 8
TOP_RIGHT = 0
BOTTOM_RIGHT = 1
TOP_LEFT = 2
BOTTOM_LEFT = 3
BLUETOOTH_NAME = "Nintendo RVL-WBC-01"
class BoardEvent:
def __init__(self, topLeft,topRight,bottomLeft,bottomRight, buttonPressed, buttonReleased):
self.topLeft = topLeft
self.topRight = topRight
self.bottomLeft = bottomLeft
self.bottomRight = bottomRight
self.buttonPressed = buttonPressed
self.buttonReleased = buttonReleased
#convenience value
self.totalWeight = topLeft + topRight + bottomLeft + bottomRight
class Wiiboard:
def __init__(self):
self.datasocket = None
self.controlsocket = None
self.calibration = []
self.calibrationRequested = False
self.LED = False
self.address = None
self.buttonDown = False
for i in range(3):
self.calibration.append([])
for j in range(4):
self.calibration[i].append(10000) #high dummy value so events with it don't register
self.status = "Disconnected"
self.lastEvent = BoardEvent(0,0,0,0,False,False)
self.mass = self.lastEvent
try:
self.datasocket = bluetooth.BluetoothSocket(bluetooth.L2CAP)
self.datasocket.setttimeout(2)
self.controlsocket = bluetooth.BluetoothSocket(bluetooth.L2CAP)
self.controlsocket.setttimeout(2)
except ValueError:
raise Exception("Error: Bluetooth not found")
def isConnected(self):
if self.status == "Connected":
return True
else:
return False
# Connect to the Wiiboard at bluetooth address <address>
def connect(self, address):
if address == None:
print("Non existant address")
return
self.datasocket.connect((address, 0x13))
self.controlsocket.connect((address, 0x11))
if self.datasocket and self.controlsocket:
print("Connected to Wiiboard at address " + address)
self.status = "Connected"
self.address = address
_thread.start_new_thread(self.receivethread, ())
self.calibrate()
useExt = bytes([COMMAND_REGISTER, 0x04, 0xA4, 0x00, 0x40, 0x00])
self.send(useExt)
self.setReportingType()
else:
print("Could not connect to Wiiboard at address " + address)
# Disconnect from the Wiiboard
def disconnect(self):
if self.status == "Connected":
self.status = "Disconnecting"
while self.status == "Disconnecting":
time.sleep(0.001)
try:
self.datasocket.close()
self.controlsocket.close()
except:
pass
print("WiiBoard disconnected")
# Try to discover a Wiiboard
def discover(self):
#print "Press the red sync button on the board now"
address = None
bluetoothdevices = bluetooth.discover_devices(duration = 6, lookup_names = True)
for addr, name in bluetoothdevices:
if name == BLUETOOTH_NAME:
address = addr
print("Found Wiiboard at address " + address)
#if address == None:
#print "No Wiiboards discovered."
return address
def createBoardEvent(self, data):
buttonBytes = data[0:2]
data = data[2:12]
buttonPressed = False
buttonReleased = False
state = (buttonBytes[0] << 8) | buttonBytes[1]
if state == BUTTON_DOWN_MASK:
buttonPressed = True
if not self.buttonDown:
self.buttonDown = True
if buttonPressed == False:
if self.lastEvent.buttonPressed == True:
buttonReleased = True
self.buttonDown = False
rawTR = (data[0] << 8) + data[1]
rawBR = (data[2] << 8) + data[3]
rawTL = (data[4] << 8) + data[5]
rawBL = (data[6] << 8) + data[7]
topLeft = self.calcMass(rawTL, TOP_LEFT)
topRight = self.calcMass(rawTR, TOP_RIGHT)
bottomLeft = self.calcMass(rawBL, BOTTOM_LEFT)
bottomRight = self.calcMass(rawBR, BOTTOM_RIGHT)
boardEvent = BoardEvent(topLeft,topRight,bottomLeft,bottomRight,buttonPressed,buttonReleased)
return boardEvent
def calcMass(self, raw, pos):
val = 0.0
#calibration[0] is calibration values for 0kg
#calibration[1] is calibration values for 17kg
#calibration[2] is calibration values for 34kg
if raw < self.calibration[0][pos]:
return val
elif raw < self.calibration[1][pos]:
val = 17 * ((raw - self.calibration[0][pos]) / float((self.calibration[1][pos] - self.calibration[0][pos])))
elif raw > self.calibration[1][pos]:
val = 17 + 17 * ((raw - self.calibration[1][pos]) / float((self.calibration[2][pos] - self.calibration[1][pos])))
return val
def getEvent(self):
return self.lastEvent
def getLED(self):
return self.LED
# Thread that listens for incoming data
def receivethread(self):
while self.status == "Connected":
if True:
data = self.datasocket.recv(25)
intype = data[1]
if intype == INPUT_STATUS:
self.setReportingType()
elif intype == INPUT_READ_DATA:
if self.calibrationRequested == True:
packetLength = data[4]//16 + 1
self.parseCalibrationResponse(data[7:(7+packetLength)])
if packetLength < 16:
self.calibrationRequested = False
elif intype == EXTENSION_8BYTES:
self.lastEvent = self.createBoardEvent(data[2:12])
self.mass = self.lastEvent
else:
print("ACK to data write received")
self.status = "Disconnected"
self.disconnect()
def parseCalibrationResponse(self, bytes):
index = 0
if len(bytes) == 16:
for i in range(2):
for j in range(4):
self.calibration[i][j] = (bytes[index] << 8) + bytes[index+1]
index += 2
elif len(bytes) < 16:
for i in range(4):
self.calibration[2][i] = (bytes[index] << 8) + bytes[index+1]
index += 2
# Send <data> to the Wiiboard
# <data> should be an array of strings, each string representing a single hex byte
def send(self, data):
if self.status != "Connected":
return
senddata = b'0xa2'+data
self.datasocket.send(senddata)
def setLight(self, light):
val = 0x00
if light == True:
val = 0x10
msg = bytes([COMMAND_LIGHT, val])
self.send(msg)
self.LED = light
def calibrate(self):
msg = bytes([COMMAND_READ_REGISTER, 0x04, 0xA4, 0x00, 0x24, 0x00, 0x18])
self.send_legacy(msg)
self.calibrationRequested = True
def setReportingType(self):
msg = bytes([COMMAND_REPORTING, CONTINUOUS_REPORTING, EXTENSION_8BYTES])
self.send_legacy(bytearr)
Loading…
Cancel
Save