commit b81a6647e5f869633bdeaa5bb30829c467a57136 Author: sqozz Date: Tue Sep 1 09:38:58 2020 +0200 Add first version diff --git a/pylinphone.py b/pylinphone.py new file mode 100644 index 0000000..16631e8 --- /dev/null +++ b/pylinphone.py @@ -0,0 +1,140 @@ +import socket +import pdb + +class LinphoneEvent(): + def __init__(self): + pass + +class CallStateChanged(LinphoneEvent): + def __init__(self, event, caller_id, call_id): + self.event = event + self.caller_id = caller_id + self.call_id = call_id + +class LinphoneCommunicationSocket(): + def __init__(self, lp_socket): + self.socket_path = lp_socket + self.socket = socket.socket(socket.AF_UNIX) + self.socket.connect(self.socket_path) + + def send_command(self, command, blocking=True): + self.socket.send(command.encode("ascii")) + + def register(self, identity, proxy_address, password = "NULL", userid = "NULL", realm = "NULL", parameters = None): + self.socket.send(("register {identity} {proxy_address} {password} {userid} {realm} {parameters}".format(identity=identity, proxy_address=proxy_address, password=password, userid=userid, realm=realm, parameters="" if parameters else parameters)).encode("ascii")) + answer = self._await_answer() + return int(answer["data"][0].split(":", 1)[1].strip()) #id of newly registered account + + def register_status(self, account_id=None): + self.socket.send(("register-status {account_id}".format(account_id="ALL" if account_id == None else account_id)).encode("ascii")) + accounts = [] + answer = self._await_answer() + if answer["status"]: + for acc_idx in range(0, len(answer["data"]), 2): + acc_id = int(answer["data"][acc_idx].split(":", 1)[1].strip()) + acc_state = answer["data"][acc_idx+1].split(":", 1)[1].strip() + accounts.append([acc_id, acc_state]) + else: + raise RuntimeError(answer["error"]) + return accounts + + def register_info(self, account_id=None): + self.socket.send(("register-info {account_id}".format(account_id="ALL" if account_id == None else account_id)).encode("ascii")) + accounts = [] + answer = self._await_answer() + if answer["status"]: + for acc_idx in range(0, len(answer["data"]), 4): + acc_id = int(answer["data"][acc_idx+0].split(":", 1)[1].strip()) + acc_identity = answer["data"][acc_idx+1].split(":", 1)[1].strip() + acc_proxy = answer["data"][acc_idx+2].split(":", 1)[1].strip() + acc_state = answer["data"][acc_idx+3].split(":", 1)[1].strip() + accounts.append([acc_id, acc_identity, acc_proxy, acc_state]) + else: + raise RuntimeError(answer["error"]) + return accounts + + def answer(self, call_id=None): + self.socket.send(("answer {call_id}".format(call_id="" if call_id == None else call_id)).encode("ascii")) + answer = self._await_answer() + + def process_event(self): + self.socket.send("pop-event".encode("ascii")) + answer = self._await_answer() + if answer["status"]: + size_string = answer["data"][0] + remaining_item_count = int(size_string.split(":")[1].strip()) + answer["data"].pop(0) + + if len(answer["data"]) > 0: + event = self._parse_event_data(answer["data"]) + getattr(self, "on{}".format(event.event))(event) + + if remaining_item_count > 0: + self.process_event() + else: + raise RuntimeError("Failed to fetch event from the linphone event queue") + + def _parse_event_data(self, data): + event_type = data[0].split(":", 1)[1].strip() + if event_type == "call-state-changed": + event = data[1].split(":", 1)[1].strip() + caller_id = data[2].split(":", 1)[1].strip() + call_id = data[3].split(":", 1)[1].strip() + event = CallStateChanged(event, caller_id, int(call_id)) + else: + print("Unknown event type") + pdb.set_trace() + + return event + + def _await_answer(self): + chunk = self.socket.recv(1) + buf_size = 1024 + data = chunk + while True: + try: + chunk = self.socket.recv(buf_size, socket.MSG_DONTWAIT) + except BlockingIOError: + break + + data += chunk + if len(chunk) < buf_size: + break + + return self._parse_answer(data) + + def _parse_answer(self, raw_answer): + answer = raw_answer.decode("ascii").split("\n") + answer = list(filter(lambda x: len(x) > 0, answer)) #filter empty elements/lines + status = answer[0].split(":")[1].strip() + status = True if status == "Ok" else False + answer_dict = {"status": status} + + if not status: + error = answer[1].split(":")[1].strip() + answer_dict["error"] = error + else: + answer_dict["data"] = answer[1:] + + return answer_dict + + def onLinphoneCallIncomingReceived(self, event): + print("dummy onLinphoneCallIncomingReceived") + pass + + def onLinphoneCallEnd(self, event): + print("dummy onLinphoneCallEnd") + pass + + def onLinphoneCallReleased(self, event): + print("dummy onLinphoneCallReleased") + pass + + def onLinphoneCallConnected(self, event): + print("dummy onLinphoneCallConnected") + pass + + def onLinphoneCallStreamsRunning(self, event): + print("dummy onLinphoneCallStreamsRunning") + pass +