Browse Source

First working version of fetapd

pull/1/head
klonfish 10 years ago
parent
commit
20ed8681a1
  1. 308
      fetapdtest.py

308
fetapdtest.py

@ -0,0 +1,308 @@
import time
import threading
import Queue as queue
from phoneinterface import PhoneInterface, PhoneEvent
from tuitest import FeApPinConfiguration, FeApUserInterface
class IllegalEventError(Exception):
pass
class AbstractState(object):
def __init__(self, controller):
self._controller = controller
def on_registration_in_progress(self):
raise IllegalEventError()
def on_registration_successful(self):
raise IllegalEventError()
def on_gabelschalter_up(self):
raise IllegalEventError()
def on_gabelschalter_down(self):
raise IllegalEventError()
def on_incoming_call(self):
raise IllegalEventError()
def on_call_ended(self):
raise IllegalEventError()
def on_call_accepted(self):
raise IllegalEventError()
def on_call_ringing(self):
raise IllegalEventError()
def on_nummernschalter_input(self, num):
raise IllegalEventError()
def on_timeout(self):
raise IllegalEventError()
class InitState(AbstractState):
def __init__(self, controller):
super(InitState, self).__init__(controller)
self._controller.get_feap().set_schauzeichen(True)
def on_registration_in_progress(self):
print('registration in progress')
return RegisteringState
class RegisteringState(AbstractState):
def __init__(self, controller):
super(RegisteringState, self).__init__(controller)
def on_registration_successful(self):
print('registration successful')
self._controller.get_feap().set_schauzeichen(False)
return IdleState
class IdleState(AbstractState):
def on_incoming_call(self):
print('incomfing call')
return SchelltState
def on_gabelschalter_up(self):
print('gabel up')
return DialingState
def on_gabelschalter_down(self):
pass
def on_nummernschalter_input(self, x):
pass
class SchelltState(AbstractState):
def __init__(self, controller):
super(SchelltState, self).__init__(controller)
self._controller.get_feap().set_wecker(True)
def __on_leave(self):
self._controller.get_feap().set_wecker(False)
def on_gabelschalter_up(self):
self.__on_leave()
return AcceptingState
def on_call_ended(self):
self.__on_leave()
return IdleState
class AcceptingState(AbstractState):
def __init__(self, controller):
super(AcceptingState, self).__init__(controller)
self._controller.get_phone().accept_call()
def on_call_accepted(self):
return CallRunningState
class CallTerminatingState(AbstractState):
def __init__(self, controller):
super(CallTerminatingState, self).__init__(controller)
self._controller.get_phone().end_call()
def on_call_ended(self):
return IdleState
def on_call_accepted(self):
return None
class ForgottenState(AbstractState):
def on_gabelschalter_down(self):
return IdleState
class BusyBeepingState(AbstractState):
def __init__(self, controller):
super(BusyBeepingState, self).__init__(controller)
self._controller.get_phone().play_busy_tone()
def __on_leave(self):
self._controller.get_phone().stop_playing()
def on_timeout(self):
return ForgottenState
def on_gabelschalter_down(self):
return IdleState
class CallRunningState(AbstractState):
def on_gabelschalter_down(self):
return CallTerminatingState
def on_call_ended(self):
return BusyBeepingState
class WecktState(AbstractState):
def on_gabelschalter_down(self):
return CallTerminatingState
def on_call_ended(self):
return BusyBeepingState
def on_call_accepted(self):
return CallRunningState
class ConnectingState(AbstractState):
def on_gabelschalter_down(self):
return CallTerminatingState
def on_call_ringing(self):
return WecktState
def on_call_accepted(self):
return CallRunningState
class DialingState(AbstractState):
def __init__(self, controller):
super(DialingState, self).__init__(controller)
self._controller.get_phone().play_dial_tone()
self.__number = ''
def __on_leave(self):
self._controller.get_phone().stop_playing()
self._controller.abort_timeout()
def on_gabelschalter_down(self):
self.__on_leave()
return IdleState
def on_nummernschalter_input(self, num):
print('nummernschalter: %d' % (num))
self.__number += str(num)
self._controller.abort_timeout()
self._controller.set_timeout(7000)
def on_timeout(self):
self.__on_leave()
print 'Dialing number:', self.__number
self._controller.get_phone().call(self.__number)
return ConnectingState
class StateMachineController(object):
def __init__(self, phone, feap):
self.__state = InitState(self)
self.__phone = phone
self.__feap = feap
self.__running = True
self.__evqueue = queue.Queue()
self.__evthread = threading.Thread(target=self.__event_dispatcher)
self.__evthread.start()
def __event_dispatcher(self):
while self.__running:
(evname, evargs, evkwargs) = self.__evqueue.get()
if not evname:
return
print('!!! event: %s' % (evname))
handler = getattr(self.__state, 'on_%s' % (evname))
try:
newstate = handler(*evargs, **evkwargs)
except IllegalEventError:
print('illegal event occured!!!!!!!!!!!!!!!!!!!!', self.__state.__class__.__name__)
if not newstate:
continue
oldstate = self.__state.__class__
print('%s -> %s' % (oldstate.__name__, newstate.__name__))
self.__state = newstate(self)
def __timeout_thread(self, timeout):
i = 0
while self.__do_timeout and self.__running:
time.sleep(0.001)
i += 1
if i == timeout and self.__do_timeout:
self.queue_event('timeout')
return
def queue_event(self, evname, *evargs, **evkwargs):
if not hasattr(AbstractState, 'on_%s' % (evname)):
raise ValueError('Illegal event name: %s' % (evname))
self.__evqueue.put((evname, evargs, evkwargs))
def set_timeout(self, timeout):
self.__do_timeout = True
t = threading.Thread(target=self.__timeout_thread, args=(timeout,))
t.start()
def abort_timeout(self):
self.__do_timeout = False
def get_phone(self):
return phone
def get_feap(self):
return feap
def stop(self, hard=False):
if hard:
self.__running = False
self.__evqueue.put((None, None, None))
c = None
def gabelschalter_cb(state):
global c
if state == 1:
c.queue_event('gabelschalter_up')
else:
c.queue_event('gabelschalter_down')
def nummernschalter_cb(digit):
global c
c.queue_event('nummernschalter_input', digit)
def phone_cb(event):
if event == PhoneEvent.RegInProgress:
c.queue_event('registration_in_progress')
elif event == PhoneEvent.RegSuccessfull:
c.queue_event('registration_successful')
elif event == PhoneEvent.CallIncoming:
c.queue_event('incoming_call')
elif event == PhoneEvent.CallAccepted:
c.queue_event('call_accepted')
elif event == PhoneEvent.CallEnded:
c.queue_event('call_ended')
elif event == PhoneEvent.CallRinging:
c.queue_event('call_ringing')
if __name__ == '__main__':
phone = PhoneInterface(None, '.linphonerc')
pinconf = FeApPinConfiguration()
feap = FeApUserInterface(pinconf)
c = StateMachineController(phone, feap)
feap.add_gabelschalter_callback(gabelschalter_cb)
feap.add_nummernschalter_callback(nummernschalter_cb)
phone.add_event_cb(phone_cb)
phone.start()
try:
while True:
time.sleep(1)
'''
c.queue_event('gabelschalter_up')
c.queue_event('nummernschalter_input', 4)
c.queue_event('nummernschalter_input', 2)
#c.queue_event('gabelschalter_down')
#c.queue_event('call_accepted')
c.queue_event('timeout')
c.queue_event('call_ringing')
#c.queue_event('gabelschalter_down')
c.queue_event('call_accepted')
c.queue_event('call_ended')
c.queue_event('timeout')
c.queue_event('gabelschalter_down')
'''
except KeyboardInterrupt:
phone.stop()
feap.set_wecker(False)
c.stop()
Loading…
Cancel
Save