forked from LUG-Saar/fetapi
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
166 lines
5.2 KiB
166 lines
5.2 KiB
import linphone |
|
import time |
|
import threading |
|
|
|
# Audio file looped by PhoneInterface.play_dial_tone().
DIAL_TONE_FILE = 'freizeichen.wav'
# Playback length of the dial tone file in seconds; the play loop counts
# 10 ms ticks up to length * 100 before restarting the file.
DIAL_TONE_LEN = 5
# Audio file looped by PhoneInterface.play_busy_tone().
BUSY_TONE_FILE = 'besetztzeichen.wav'
# Playback length of the busy tone file in seconds (same tick logic as above).
BUSY_TONE_LEN = 0.96
# File assigned to the linphone core's ``ringback`` setting.
RINGBACK_TONE = 'weckzeichen.wav'
|
|
|
class PhoneEvent(object):
    """Integer constants naming the events passed to phone event callbacks.

    The members are numbered 0..7 in declaration order.
    """

    RegInProgress,\
    RegSuccessfull,\
    CallIncoming,\
    CallRinging,\
    CallAccepted,\
    CallEnded,\
    CallBusy,\
    CallInvalidNumber = range(8)

    @classmethod
    def string(cls, val):
        """Return the name of the member whose value equals *val*.

        Returns None when no event member has that value.
        """
        # ``items()`` exists on the class namespace in both Python 2 and 3.
        for name, value in vars(cls).items():
            # Only public integer attributes are event members; this keeps
            # entries such as ``__doc__`` or this method from matching.
            if name.startswith('_') or not isinstance(value, int):
                continue
            if value == val:
                return name
        return None
|
|
|
class PhoneInterface(object):
    """Wrapper around a linphone core that reports PhoneEvent values.

    Callbacks registered through add_event_cb() are called with a single
    PhoneEvent value for every registration or call state change this class
    recognises. The core itself is driven from a background thread that is
    started by start() and asked to finish by stop().
    """

    def __init__(self, config, roconfig):
        """Create the linphone core and apply the hard-coded defaults.

        Both arguments are handed unchanged to ``linphone.Core.new()``.
        """
        cbs = {
            'global_state_changed': self.__global_state_changed,
            'registration_state_changed': self.__registration_state_changed,
            'call_state_changed': self.__call_state_changed
        }

        self.__event_cbs = []
        # Define the thread flags up front so that playing a tone or calling
        # stop() before start() does not fail with AttributeError.
        self.__running = False
        self.__playing = False

        self.__core = linphone.Core.new(cbs, config, roconfig)

        # Set default parameters overriding the ones from the given config file
        self.__core.set_user_agent('FeTAp 615', '0.1')
        self.__core.ringback = RINGBACK_TONE
        self.__core.max_calls = 1
        # An empty path is set as the error tone for the Busy reason.
        self.__core.set_call_error_tone(linphone.Reason.Busy, '')
        # ``None`` cannot be written as an attribute name on Python 3, so the
        # enum member is looked up by name; the value passed is unchanged.
        self.__core.disable_chat(getattr(linphone.Reason, 'None'))
        self.__core.echo_cancellation_enabled = False
        self.__core.video_capture_enabled = False
        self.__core.video_display_enabled = False

    def __emit_event(self, evt):
        """Call every registered event callback with *evt*."""
        for cb in self.__event_cbs:
            cb(evt)

    def __global_state_changed(self, core, state, msg):
        """Log global state changes of the core; no event is emitted."""
        print('Global state changed: %s %s' % (state, msg))
        # TODO: Do we need events emitted here?

    def __registration_state_changed(self, core, proxyconf, state, msg):
        """Translate a registration state into a PhoneEvent and emit it.

        States other than Progress and Ok are only logged.
        """
        print('Registration state changed: %s %s %s' % (proxyconf, state, msg))
        evt = None
        if state == linphone.RegistrationState.Progress:
            evt = PhoneEvent.RegInProgress
        elif state == linphone.RegistrationState.Ok:
            evt = PhoneEvent.RegSuccessfull

        if evt is not None:
            self.__emit_event(evt)
        else:
            print('Unhandled registration state: %s'
                  % (linphone.RegistrationState.string(state),))

    def __call_state_changed(self, core, call, state, msg):
        """Translate a call state into a PhoneEvent and emit it.

        For the Error state the call's error reason selects the event:
        Busy -> CallBusy, NotFound -> CallInvalidNumber, anything else ->
        CallEnded. States that are not mapped are only logged.
        """
        print('Call state changed: %s %s %s' % (call, state, msg))
        evt = None
        if state == linphone.CallState.IncomingReceived:
            evt = PhoneEvent.CallIncoming
        elif state == linphone.CallState.OutgoingRinging:
            evt = PhoneEvent.CallRinging
        elif state == linphone.CallState.Connected:
            evt = PhoneEvent.CallAccepted
        elif state == linphone.CallState.End:
            evt = PhoneEvent.CallEnded
        elif state == linphone.CallState.Error:
            error = call.error_info.reason
            if error == linphone.Reason.Busy:
                evt = PhoneEvent.CallBusy
            elif error == linphone.Reason.NotFound:
                evt = PhoneEvent.CallInvalidNumber
            else:
                evt = PhoneEvent.CallEnded

        if evt is not None:
            self.__emit_event(evt)
        else:
            print('Unhandled call state: %s'
                  % (linphone.CallState.string(state),))

    def __pollthread(self):
        """Thread body: iterate the core until stop() clears the run flag."""
        while self.__running:
            self.__core.iterate()
            time.sleep(0.02)  # Value from example code

    def start(self):
        """Set the run flag and start the thread that iterates the core."""
        self.__running = True
        t = threading.Thread(target=self.__pollthread)
        t.start()

    def stop(self):
        """Clear the run flag; the poll and tone threads exit on their next check."""
        self.__running = False

    def add_event_cb(self, cb):
        """Register *cb*; it will be called with one PhoneEvent value per event."""
        self.__event_cbs.append(cb)

    def call(self, number):
        """Invite *number* through the core."""
        self.__core.invite(number)

    def accept_call(self):
        """Accept the core's current call."""
        self.__core.accept_call(self.__core.current_call)

    def end_call(self):
        """Terminate the core's current call."""
        self.__core.terminate_call(self.__core.current_call)

    def __play_loop_thread(self, filename, length, continous):
        """Thread body: play *filename* repeatedly until playback is stopped.

        *length* is the duration of the file in seconds. The loop counts
        10 ms ticks and restarts the file once ``length * 100`` ticks have
        passed. A non-positive *length* plays the file once without looping.
        The loop ends when stop_playing() or stop() is called.
        """
        self.__core.play_local(filename)
        ticks_per_file = length * 100
        # Start replay of file 500ms before the old playing will end
        # to get more or less continous play
        i = 50 if continous else 0
        while self.__playing and self.__running:
            # TODO: find a nicer way to do this
            # ``>=`` rather than ``==``: length * 100 is not always a whole
            # number (0.07 * 100 == 7.000000000000001), in which case an
            # exact comparison with the integer counter would never match.
            if ticks_per_file > 0 and i >= ticks_per_file:
                self.__core.play_local(filename)
                i = 0
            time.sleep(0.01)
            i += 1
        self.__core.stop_ringing()

    def __play_file(self, filename, length, continous=False):
        """Set the playing flag and start the looping playback thread."""
        print('Playing file: %s' % (filename,))
        self.__playing = True
        t = threading.Thread(target=self.__play_loop_thread,
                             args=(filename, length, continous))
        t.start()

    def play_dial_tone(self):
        """Start looping the dial tone with the early-restart overlap enabled."""
        self.__play_file(DIAL_TONE_FILE, DIAL_TONE_LEN, True)

    def play_busy_tone(self):
        """Start looping the busy tone."""
        self.__play_file(BUSY_TONE_FILE, BUSY_TONE_LEN)

    def stop_playing(self):
        """Clear the playing flag so the tone thread exits on its next check."""
        self.__playing = False
|
|
|
if __name__ == '__main__':
    # Manual smoke test: log every event and place one call after 5 seconds.
    def event_cb(evt):
        """Print the symbolic name of each event reported by the phone."""
        print('Got event: %s' % (PhoneEvent.string(evt),))

    # Construct outside the try block so ``phone`` is always bound when the
    # cleanup below runs.
    phone = PhoneInterface('.linphonerc-foo', '.linphonerc')
    phone.add_event_cb(event_cb)
    try:
        phone.start()
        i = 0
        while True:
            time.sleep(1)
            i += 1
            if i == 5:
                phone.call('3474')
                # Alternative for manual testing: phone.play_busy_tone()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to end the demo.
        pass
    finally:
        # Always ask the poll thread to stop; it is not a daemon thread and
        # would otherwise keep the process alive after an error.
        phone.stop()
|
|
|
|