"""Event-based wrapper around a linphone core with local tone playback."""

import threading
import time

import linphone

# Tone files handed to the linphone core. The *_LEN values are the tone
# lengths in seconds: the playback loop below ticks every 10 ms and restarts
# the file after ``length * 100`` ticks.
DIAL_TONE_FILE = 'freizeichen.wav'
DIAL_TONE_LEN = 5
BUSY_TONE_FILE = 'besetztzeichen.wav'
BUSY_TONE_LEN = 0.96
RINGBACK_TONE = 'weckzeichen.wav'


class PhoneEvent(object):
    """Integer event codes passed to the callbacks of PhoneInterface."""

    RegInProgress,\
    RegSuccessfull,\
    CallIncoming,\
    CallRinging,\
    CallAccepted,\
    CallEnded,\
    CallBusy = range(7)

    @classmethod
    def string(cls, val):
        """Return the name of the class attribute equal to *val*.

        Returns None when no attribute of the class compares equal to *val*.
        """
        # items() instead of iteritems(): same result on Python 2 and also
        # valid on Python 3.
        for name, value in vars(cls).items():
            if value == val:
                return name
        return None


class PhoneInterface(object):
    """Owns a linphone core, polls it on a thread and forwards events.

    Registration and call state changes reported by the core are translated
    into PhoneEvent codes and passed to every callback registered through
    add_event_cb().
    """

    def __init__(self, config, roconfig):
        """Create the linphone core.

        config and roconfig are passed unchanged to linphone.Core.new();
        the demo at the bottom of this file passes two linphonerc file names.
        """
        cbs = {
            'global_state_changed': self.__global_state_changed,
            'registration_state_changed': self.__registration_state_changed,
            'call_state_changed': self.__call_state_changed
        }

        self.__event_cbs = []
        # Initialise the thread flags here so that playing a tone or calling
        # stop() before start() does not fail with an AttributeError.
        self.__running = False
        self.__playing = False

        self.__core = linphone.Core.new(cbs, config, roconfig)

        # Set default parameters overriding the ones from the given config file
        self.__core.set_user_agent('FeTAp 615', '0.1')
        self.__core.ringback = RINGBACK_TONE
        self.__core.max_calls = 1
        # NOTE(review): presumably an empty file name silences linphone's own
        # busy tone so that play_busy_tone() can be used instead - confirm.
        self.__core.set_call_error_tone(linphone.Reason.Busy, '')
        # getattr() because 'None' cannot be written as an attribute name on
        # Python 3; on Python 2 this is the same lookup as Reason.None.
        self.__core.disable_chat(getattr(linphone.Reason, 'None'))
        self.__core.echo_cancellation_enabled = False
        self.__core.video_capture_enabled = False
        self.__core.video_display_enabled = False

    def __global_state_changed(self, core, state, msg):
        """Log global state changes reported by the core; emits no event."""
        print('Global state changed: %s %s' % (state, msg))
        # TODO: Do we need events emitted here?

    def __registration_state_changed(self, core, proxyconf, state, msg):
        """Map a registration state to a PhoneEvent and notify the callbacks.

        States other than Progress and Ok are only logged.
        """
        print('Registration state changed: %s %s %s' % (proxyconf, state, msg))
        evt = None
        if state == linphone.RegistrationState.Progress:
            evt = PhoneEvent.RegInProgress
        elif state == linphone.RegistrationState.Ok:
            evt = PhoneEvent.RegSuccessfull

        if evt is not None:
            for cb in self.__event_cbs:
                cb(evt)
        else:
            print('Unhandled registration state: %s'
                  % (linphone.RegistrationState.string(state),))

    def __call_state_changed(self, core, call, state, msg):
        """Map a call state to a PhoneEvent and notify the callbacks.

        An Error state is only reported (as CallBusy) when the error reason
        is Busy; every other unmapped state is only logged.
        """
        print('Call state changed: %s %s %s' % (call, state, msg))
        evt = None
        if state == linphone.CallState.IncomingReceived:
            evt = PhoneEvent.CallIncoming
        elif state == linphone.CallState.OutgoingRinging:
            evt = PhoneEvent.CallRinging
        elif state == linphone.CallState.Connected:
            evt = PhoneEvent.CallAccepted
        elif state == linphone.CallState.End:
            evt = PhoneEvent.CallEnded
        elif state == linphone.CallState.Error:
            if call.error_info.reason == linphone.Reason.Busy:
                evt = PhoneEvent.CallBusy

        if evt is not None:
            for cb in self.__event_cbs:
                cb(evt)
        else:
            print('Unhandled call state: %s'
                  % (linphone.CallState.string(state),))

    def __pollthread(self):
        """Call iterate() on the core every 20 ms until stop() is called."""
        while self.__running:
            self.__core.iterate()
            time.sleep(0.02)  # Value from example code

    def start(self):
        """Start the (non-daemon) thread that polls the core."""
        self.__running = True
        t = threading.Thread(target=self.__pollthread)
        t.start()

    def stop(self):
        """Ask the poll thread and any tone playback thread to finish."""
        self.__running = False

    def add_event_cb(self, cb):
        """Register *cb*; it is called with one PhoneEvent code per event."""
        self.__event_cbs.append(cb)

    def call(self, number):
        """Pass *number* to the core's invite()."""
        self.__core.invite(number)

    def accept_call(self):
        """Accept the core's current call."""
        self.__core.accept_call(self.__core.current_call)

    def end_call(self):
        """Terminate the core's current call."""
        self.__core.terminate_call(self.__core.current_call)

    def __play_loop_thread(self, filename, length, continous):
        """Play *filename* repeatedly until playback or the phone is stopped.

        length is the tone length in seconds. When continous is true the
        first restart happens 500 ms early.
        """
        self.__core.play_local(filename)
        # One tick is one 10 ms sleep, so the file is due again after
        # length * 100 ticks.
        replay_ticks = length * 100
        # Start replay of file 500ms before the old playing will end
        # to get more or less continous play
        ticks = 50 if continous else 0
        while self.__playing and self.__running:
            # TODO: is there a nicer way to do this?
            # '>=' rather than '==': the threshold is a float product and
            # need not be a whole number, and the counter may start above it.
            if ticks >= replay_ticks:
                self.__core.play_local(filename)
                ticks = 0
            time.sleep(0.01)
            ticks += 1
        self.__core.stop_ringing()

    def __play_file(self, filename, length, continous=False):
        """Start a thread that loops *filename* until stop_playing()."""
        self.__playing = True
        t = threading.Thread(target=self.__play_loop_thread,
                             args=(filename, length, continous))
        t.start()

    def play_dial_tone(self):
        """Loop the dial tone file, restarting early the first time."""
        self.__play_file(DIAL_TONE_FILE, DIAL_TONE_LEN, True)

    def play_busy_tone(self):
        """Loop the busy tone file."""
        self.__play_file(BUSY_TONE_FILE, BUSY_TONE_LEN)

    def stop_playing(self):
        """Ask the tone playback thread to stop."""
        self.__playing = False


if __name__ == '__main__':
    def event_cb(evt):
        print('Got event: %s' % (PhoneEvent.string(evt),))

    phone = PhoneInterface('.linphonerc-foo', '.linphonerc')
    phone.add_event_cb(event_cb)
    try:
        phone.start()
        seconds = 0
        while True:
            time.sleep(1)
            seconds += 1
            if seconds == 5:
                phone.call('3474')
    except KeyboardInterrupt:
        pass
    finally:
        # Always stop the non-daemon poll thread, otherwise the process
        # keeps running after any error in the loop above.
        phone.stop()