"""Client for the linphone daemon command socket.

Commands are sent as plain ASCII text over a Unix domain socket. Every reply
starts with a ``Status: Ok`` or ``Status: Error`` line followed by
``Key: value`` lines, which the methods below turn into Python values.
"""

import socket
import threading
import pdb


def _value_of(line):
    """Return the stripped text after the first colon of a ``Key: value`` line."""
    return line.split(":", 1)[1].strip()


class LinphoneEvent():
    """Base class for events popped from the daemon's event queue."""

    def __init__(self):
        pass


class CallStateChanged(LinphoneEvent):
    """A ``call-state-changed`` event.

    Attributes:
        event: name of the new call state, e.g. ``LinphoneCallEnd``.
        caller_id: value of the event's third line (the ``From`` field in
            linphone's output -- presumably the remote address).
        call_id: integer id of the affected call.
    """

    def __init__(self, event, caller_id, call_id):
        super().__init__()
        self.event = event
        self.caller_id = caller_id
        self.call_id = call_id


class LinphoneCommunicationSocket():
    """Connection to a running linphone daemon.

    Subclasses override the ``on<EventName>`` methods to react to call state
    changes delivered through :meth:`process_event`.
    """

    def __init__(self, lp_socket):
        """Connect to the daemon listening on the Unix socket path *lp_socket*."""
        self.socket_path = lp_socket
        self.socket_lock = threading.Lock()
        self.socket = socket.socket(socket.AF_UNIX)
        try:
            self.socket.connect(self.socket_path)
        except Exception:
            # Do not leak the file descriptor when the daemon is unreachable.
            self.socket.close()
            raise

    def close(self):
        """Close the connection to the daemon."""
        self.socket.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def send_command(self, command, blocking=True):
        """Send *command* and return the parsed answer dictionary.

        The lock keeps a command and its answer paired when several threads
        share the connection. *blocking* is accepted for backward
        compatibility and is currently unused.
        """
        with self.socket_lock:
            # sendall() retries until the whole command has been written.
            self.socket.sendall(command.encode("ascii"))
            return self._await_answer()

    def _request(self, command):
        """Send *command* and return its data lines.

        Raises:
            RuntimeError: with the daemon's reason if the command failed.
        """
        reply = self.send_command(command)
        if not reply["status"]:
            raise RuntimeError(reply["error"])
        return reply.get("data", [])

    def register(self, identity, proxy_address, password="NULL",
                 userid="NULL", realm="NULL", parameters=None):
        """Register an account and return the id assigned by the daemon.

        Raises:
            RuntimeError: if the daemon rejects the registration.
        """
        data = self._request(
            "register {identity} {proxy_address} {password} {userid} {realm} {parameters}".format(
                identity=identity,
                proxy_address=proxy_address,
                password=password,
                userid=userid,
                realm=realm,
                # Omitted parameters must be sent as nothing, not as "None".
                parameters="" if parameters is None else parameters,
            )
        )
        return int(_value_of(data[0]))  # id of newly registered account

    def register_status(self, account_id=None):
        """Return ``[[account_id, state], ...]`` for one or all accounts.

        Raises:
            RuntimeError: if the daemon reports an error.
        """
        data = self._request("register-status {account_id}".format(
            account_id="ALL" if account_id is None else account_id))
        accounts = []
        # Each account is described by two consecutive lines: id, state.
        for idx in range(0, len(data), 2):
            accounts.append([int(_value_of(data[idx])),
                             _value_of(data[idx + 1])])
        return accounts

    def register_info(self, account_id=None):
        """Return ``[[account_id, identity, proxy, state], ...]``.

        Raises:
            RuntimeError: if the daemon reports an error.
        """
        data = self._request("register-info {account_id}".format(
            account_id="ALL" if account_id is None else account_id))
        accounts = []
        # Each account is described by four consecutive lines.
        for idx in range(0, len(data), 4):
            accounts.append([
                int(_value_of(data[idx])),
                _value_of(data[idx + 1]),
                _value_of(data[idx + 2]),
                _value_of(data[idx + 3]),
            ])
        return accounts

    def answer(self, call_id=None):
        """Answer call *call_id*; no id is sent when it is ``None``."""
        self._request("answer {call_id}".format(
            call_id="" if call_id is None else call_id))
        return True

    def terminate(self, call_id=None):
        """Terminate call *call_id*; no id is sent when it is ``None``."""
        self._request("terminate {call_id}".format(
            call_id="" if call_id is None else call_id))
        return True

    def call(self, sip_address):
        """Place a call to *sip_address* and return the call id as a string."""
        data = self._request(
            "call {sip_address}".format(sip_address=sip_address))
        return _value_of(data[0])

    def call_mute(self, mute=True):
        """Mute (or unmute when *mute* is false) the call."""
        self._request("call-mute {mute}".format(mute="1" if mute else "0"))
        return True

    def call_pause(self, call_id):
        """Pause the call *call_id*."""
        self._request("call-pause {call_id}".format(call_id=call_id))
        return True

    def call_resume(self, call_id):
        """Resume the paused call *call_id*."""
        self._request("call-resume {call_id}".format(call_id=call_id))
        return True

    def call_status(self, call_id):
        """Return state, from, direction and duration of call *call_id*."""
        data = self._request(
            "call-status {call_id}".format(call_id=call_id))
        return {
            "state": _value_of(data[0]),
            "from": _value_of(data[1]),
            "direction": _value_of(data[2]),
            "duration": int(_value_of(data[3])),
        }

    def call_stats(self, call_id):
        """Return per-stream statistics of call *call_id*.

        The result maps a lower-cased stream type to a dictionary of its
        statistics. ``"audio"`` and ``"video"`` are always present, possibly
        empty. Keys and values are lower-cased with spaces replaced by
        underscores; values that look like integers are converted to ``int``.

        Raises:
            RuntimeError: if the daemon reports an error.
        """
        data = self._request("call-stats {call_id}".format(call_id=call_id))
        # Every stream section starts with an "Id:" line followed by its type.
        starts = [i for i, line in enumerate(data) if "Id:" in line]
        stats = {"audio": {}, "video": {}}
        for n, start in enumerate(starts):
            # Lines preceding the first "Id:" line belong to the first section.
            begin = 0 if n == 0 else start
            end = starts[n + 1] if n + 1 < len(starts) else len(data)
            section = stats.setdefault(_value_of(data[start + 1]).lower(), {})
            for line in data[begin:end]:
                raw_key, raw_value = line.split(":", 1)
                key = raw_key.strip().lower().replace(" ", "_")
                value = raw_value.strip().lower().replace(" ", "_")
                try:
                    value = int(value)
                except ValueError:
                    pass  # not numeric: keep the normalized string
                section[key] = value
        return stats

    def dtmf(self, digit):
        """Send the DTMF *digit*."""
        self._request("dtmf {digit}".format(digit=digit))
        return True

    def process_event(self):
        """Pop queued daemon events and dispatch each to ``on<EventName>``.

        Keeps popping until the daemon reports no remaining events. Events of
        an unknown type and events without a matching handler method are
        skipped.

        Raises:
            RuntimeError: if the event queue cannot be read.
        """
        while True:
            reply = self.send_command("pop-event")
            if not reply["status"]:
                raise RuntimeError(
                    "Failed to fetch event from the linphone event queue")
            lines = reply["data"]
            # The first line holds the number of events still queued.
            remaining_item_count = int(lines[0].split(":")[1].strip())
            event_lines = lines[1:]
            if event_lines:
                event = self._parse_event_data(event_lines)
                if event is not None:
                    handler_name = "on{}".format(event.event)
                    handler = getattr(self, handler_name, None)
                    if handler is None:
                        print("No handler {} for event".format(handler_name))
                    else:
                        handler(event)
            if remaining_item_count <= 0:
                break

    def _parse_event_data(self, data):
        """Build an event object from event lines; ``None`` if type unknown."""
        event_type = _value_of(data[0])
        if event_type == "call-state-changed":
            state = _value_of(data[1])
            caller_id = _value_of(data[2])
            call_id = _value_of(data[3])
            return CallStateChanged(state, caller_id, int(call_id))
        print("Unknown event type")
        return None

    def _await_answer(self):
        """Read one complete answer from the socket and return it parsed.

        Raises:
            ConnectionError: if the daemon closed the connection.
        """
        buf_size = 1024
        # Block until the daemon starts answering ...
        chunk = self.socket.recv(1)
        if not chunk:
            raise ConnectionError("linphone daemon closed the connection")
        parts = [chunk]
        # ... then drain whatever is already available without blocking.
        while True:
            try:
                chunk = self.socket.recv(buf_size, socket.MSG_DONTWAIT)
            except BlockingIOError:
                break
            parts.append(chunk)
            if len(chunk) < buf_size:
                break
        return self._parse_answer(b"".join(parts))

    def _parse_answer(self, raw_answer):
        """Turn the raw reply bytes into an answer dictionary.

        Returns:
            ``{"status": True, "data": [lines...]}`` on success, or
            ``{"status": False, "error": reason}`` on failure.

        Raises:
            RuntimeError: if the reply contains no lines at all.
        """
        # Filter empty elements/lines.
        lines = [line for line in raw_answer.decode("ascii").split("\n")
                 if line]
        if not lines:
            raise RuntimeError("Empty answer from the linphone daemon")
        status = _value_of(lines[0]) == "Ok"
        answer_dict = {"status": status}
        if status:
            answer_dict["data"] = lines[1:]
        else:
            # Split once only, so reasons containing colons stay intact.
            answer_dict["error"] = (
                _value_of(lines[1]) if len(lines) > 1 else "Unknown error")
        return answer_dict

    # Default event handlers; subclasses override the ones they care about.

    def onLinphoneCallIncomingReceived(self, event):
        print("dummy onLinphoneCallIncomingReceived")

    def onLinphoneCallEnd(self, event):
        print("dummy onLinphoneCallEnd")

    def onLinphoneCallReleased(self, event):
        print("dummy onLinphoneCallReleased")

    def onLinphoneCallConnected(self, event):
        print("dummy onLinphoneCallConnected")

    def onLinphoneCallStreamsRunning(self, event):
        print("dummy onLinphoneCallStreamsRunning")

    def onLinphoneCallOutgoingInit(self, event):
        print("dummy onLinphoneCallOutgoingInit")

    def onLinphoneCallOutgoingProgress(self, event):
        print("dummy onLinphoneCallOutgoingProgress")

    def onLinphoneCallOutgoingRinging(self, event):
        print("dummy onLinphoneCallOutgoingRinging")

    def onLinphoneCallPausing(self, event):
        print("dummy onLinphoneCallPausing")

    def onLinphoneCallPaused(self, event):
        print("dummy onLinphoneCallPaused")

    def onLinphoneCallResuming(self, event):
        print("dummy onLinphoneCallResuming")