You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

256 lines
9.2 KiB

import socket
import threading
import pdb
class LinphoneEvent:
    """Common base type for events parsed from the linphone event queue.

    Carries no state of its own; concrete event classes add their fields.
    """

    def __init__(self):
        """Create an event without any attributes."""
        return None
class CallStateChanged(LinphoneEvent):
    """Event describing a change of state of a single call.

    Attributes:
        event: The new call state as given by the caller of the constructor.
        caller_id: Identifier of the remote party as given by the caller.
        call_id: Identifier of the call the state change refers to.
    """

    def __init__(self, event, caller_id, call_id):
        """Store the three event fields unchanged on the instance."""
        self.event, self.caller_id, self.call_id = event, caller_id, call_id
class LinphoneCommunicationSocket():
    """Text-command client that talks to linphone over a UNIX domain socket.

    Commands are sent as ASCII text; every answer is expected to start with a
    ``Status: ...`` line followed by ``Key: value`` lines. A lock serializes
    each command/answer exchange so the object can be shared between threads.

    Subclasses override the ``on<State>`` methods to react to call state
    changes delivered by :meth:`process_event`.

    NOTE(review): the peer is presumably linphone-daemon; the exact answer
    layout is inferred from the parsing code only - confirm against its docs.
    """

    def __init__(self, lp_socket):
        """Connect to the UNIX socket at path ``lp_socket``.

        Raises:
            OSError: If the connection cannot be established.
        """
        self.socket_path = lp_socket
        self.socket = socket.socket(socket.AF_UNIX)
        try:
            self.socket.connect(self.socket_path)
        except OSError:
            # Do not leak the file descriptor when the peer is unreachable.
            self.socket.close()
            raise
        self.socket_lock = threading.Lock()

    def close(self):
        """Close the underlying socket."""
        self.socket.close()

    @staticmethod
    def _field_value(line):
        """Return the stripped text after the first ':' of a 'Key: value' line."""
        return line.split(":", 1)[1].strip()

    def _execute(self, command):
        """Send ``command`` and return the answer's data lines.

        Raises:
            RuntimeError: With the reported error text if the status is not Ok.
        """
        answer = self.send_command(command)
        if not answer["status"]:
            raise RuntimeError(answer["error"])
        return answer["data"]

    def send_command(self, command, blocking=True):
        """Send one command and return the parsed answer dictionary.

        Args:
            command: Command text; must be ASCII-encodable.
            blocking: Unused; kept for interface compatibility.

        Returns:
            ``{"status": True, "data": [...]}`` on success or
            ``{"status": False, "error": "..."}`` on failure.
        """
        with self.socket_lock:
            # sendall: a plain send() may transmit only part of the command.
            self.socket.sendall(command.encode("ascii"))
            return self._await_answer()

    def register(self, identity, proxy_address, password="NULL", userid="NULL", realm="NULL", parameters=None):
        """Register an account and return the id of the new account.

        ``parameters`` is appended to the command only when it is given.

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        command = "register {identity} {proxy_address} {password} {userid} {realm} {parameters}".format(
            identity=identity,
            proxy_address=proxy_address,
            password=password,
            userid=userid,
            realm=realm,
            parameters="" if parameters is None else parameters,
        )
        data = self._execute(command)
        return int(self._field_value(data[0]))  # id of newly registered account

    def register_status(self, account_id=None):
        """Return ``[[account_id, state], ...]`` for one account or for all.

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        data = self._execute("register-status {account_id}".format(
            account_id="ALL" if account_id is None else account_id))
        accounts = []
        # Each account is described by two consecutive lines: id, state.
        for idx in range(0, len(data), 2):
            acc_id = int(self._field_value(data[idx]))
            acc_state = self._field_value(data[idx + 1])
            accounts.append([acc_id, acc_state])
        return accounts

    def register_info(self, account_id=None):
        """Return ``[[id, identity, proxy, state], ...]`` for one account or all.

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        data = self._execute("register-info {account_id}".format(
            account_id="ALL" if account_id is None else account_id))
        accounts = []
        # Each account is described by four consecutive lines.
        for idx in range(0, len(data), 4):
            acc_id = int(self._field_value(data[idx]))
            acc_identity = self._field_value(data[idx + 1])
            acc_proxy = self._field_value(data[idx + 2])
            acc_state = self._field_value(data[idx + 3])
            accounts.append([acc_id, acc_identity, acc_proxy, acc_state])
        return accounts

    def answer(self, call_id=None):
        """Answer a call; without ``call_id`` no id is sent. Returns True.

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        self._execute("answer {call_id}".format(call_id="" if call_id is None else call_id))
        return True

    def terminate(self, call_id=None):
        """Terminate a call; without ``call_id`` no id is sent. Returns True.

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        self._execute("terminate {call_id}".format(call_id="" if call_id is None else call_id))
        return True

    def call(self, sip_address):
        """Start a call to ``sip_address`` and return the first answer value.

        The value is returned as a string (presumably the call id).

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        data = self._execute("call {sip_address}".format(sip_address=sip_address))
        return self._field_value(data[0])

    def call_mute(self, mute=True):
        """Send ``call-mute 1`` (or ``0`` when ``mute`` is falsy). Returns True.

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        self._execute("call-mute {mute}".format(mute="1" if mute else "0"))
        return True

    def call_pause(self, call_id):
        """Pause the call ``call_id``. Returns True.

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        self._execute("call-pause {call_id}".format(call_id=call_id))
        return True

    def call_status(self, call_id):
        """Return state, from, direction and duration (int) of a call as a dict.

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        data = self._execute("call-status {call_id}".format(call_id=call_id))
        return {
            "state": self._field_value(data[0]),
            "from": self._field_value(data[1]),
            "direction": self._field_value(data[2]),
            "duration": int(self._field_value(data[3])),
        }

    def call_stats(self, call_id):
        """Return per-stream statistics of a call.

        The answer is split into sections, each starting at a line containing
        ``Id:`` whose following line names the stream type. Keys and values
        are lower-cased with spaces replaced by underscores; values that look
        like integers are converted to ``int``.

        Returns:
            A dict keyed by stream type; ``"audio"`` and ``"video"`` are always
            present (possibly empty).

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        data = self._execute("call-stats {call_id}".format(call_id=call_id))
        stats = {"audio": {}, "video": {}}
        starts = [i for i, line in enumerate(data) if "Id:" in line]
        if not starts:
            return stats
        # Lines before the first "Id:" line belong to the first section.
        bounds = [0] + starts[1:] + [len(data)]
        for start, low, high in zip(starts, bounds, bounds[1:]):
            if start + 1 >= len(data):
                continue  # truncated section without a type line
            stat_type = self._field_value(data[start + 1]).lower()
            section = stats.setdefault(stat_type, {})
            for line in data[low:high]:
                raw_key, sep, raw_value = line.partition(":")
                if not sep:
                    continue  # not a "Key: value" line
                key = raw_key.strip().lower().replace(" ", "_")
                value = raw_value.strip().lower().replace(" ", "_")
                try:
                    value = int(value)
                except ValueError:
                    pass  # keep non-numeric values as text
                section[key] = value
        return stats

    def call_resume(self, call_id):
        """Resume the call ``call_id``. Returns True.

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        self._execute("call-resume {call_id}".format(call_id=call_id))
        return True

    def dtmf(self, digit):
        """Send the DTMF ``digit``. Returns True.

        Raises:
            RuntimeError: If the command is answered with an error status.
        """
        self._execute("dtmf {digit}".format(digit=digit))
        return True

    def process_event(self):
        """Pop queued events one by one and dispatch each to ``on<State>``.

        Keeps issuing ``pop-event`` until the reported remaining count is
        zero. States without a matching ``on<State>`` method are skipped.

        Raises:
            RuntimeError: If ``pop-event`` fails or an unknown event type is
                received.
        """
        # Iterative on purpose: a long backlog must not exhaust the call stack.
        while True:
            answer = self.send_command("pop-event")
            if not answer["status"]:
                raise RuntimeError("Failed to fetch event from the linphone event queue")
            data = answer["data"]
            remaining_item_count = int(data[0].split(":")[1].strip())
            payload = data[1:]
            if payload:
                event = self._parse_event_data(payload)
                handler = getattr(self, "on{}".format(event.event), None)
                if handler is not None:
                    handler(event)
            if remaining_item_count <= 0:
                break

    def _parse_event_data(self, data):
        """Build an event object from the data lines of a ``pop-event`` answer.

        Raises:
            RuntimeError: If the event type is not ``call-state-changed``.
        """
        event_type = self._field_value(data[0])
        if event_type != "call-state-changed":
            raise RuntimeError("Unknown event type: {}".format(event_type))
        state = self._field_value(data[1])
        caller_id = self._field_value(data[2])
        call_id = self._field_value(data[3])
        return CallStateChanged(state, caller_id, int(call_id))

    def _await_answer(self):
        """Read one answer from the socket and return it parsed.

        Blocks for the first byte, then drains whatever is already available
        without blocking. Must be called with ``socket_lock`` held.
        """
        buf_size = 1024
        data = self.socket.recv(1)
        while True:
            try:
                chunk = self.socket.recv(buf_size, socket.MSG_DONTWAIT)
            except BlockingIOError:
                break
            data += chunk
            if len(chunk) < buf_size:
                break
        return self._parse_answer(data)

    def _parse_answer(self, raw_answer):
        """Parse raw answer bytes into a status dictionary.

        Returns:
            ``{"status": True, "data": [lines...]}`` when the status line says
            ``Ok``, otherwise ``{"status": False, "error": text}``.

        Raises:
            RuntimeError: If the answer contains no non-empty line (e.g. the
                peer closed the connection).
        """
        lines = [line for line in raw_answer.decode("ascii").split("\n") if line]
        if not lines:
            raise RuntimeError("Empty answer received from linphone")
        if lines[0].partition(":")[2].strip() == "Ok":
            return {"status": True, "data": lines[1:]}
        error = ""
        if len(lines) > 1:
            # Split once only: the error text itself may contain colons.
            _, sep, rest = lines[1].partition(":")
            error = (rest if sep else lines[1]).strip()
        return {"status": False, "error": error}

    def onLinphoneCallIncomingReceived(self, event):
        """Default handler; override in a subclass."""
        print("dummy onLinphoneCallIncomingReceived")

    def onLinphoneCallEnd(self, event):
        """Default handler; override in a subclass."""
        print("dummy onLinphoneCallEnd")

    def onLinphoneCallReleased(self, event):
        """Default handler; override in a subclass."""
        print("dummy onLinphoneCallReleased")

    def onLinphoneCallConnected(self, event):
        """Default handler; override in a subclass."""
        print("dummy onLinphoneCallConnected")

    def onLinphoneCallStreamsRunning(self, event):
        """Default handler; override in a subclass."""
        print("dummy onLinphoneCallStreamsRunning")

    def onLinphoneCallOutgoingInit(self, event):
        """Default handler; override in a subclass."""
        print("dummy onLinphoneCallOutgoingInit")

    def onLinphoneCallOutgoingProgress(self, event):
        """Default handler; override in a subclass."""
        print("dummy onLinphoneCallOutgoingProgress")

    def onLinphoneCallOutgoingRinging(self, event):
        """Default handler; override in a subclass."""
        print("dummy onLinphoneCallOutgoingRinging")

    def onLinphoneCallPausing(self, event):
        """Default handler; override in a subclass."""
        print("dummy onLinphoneCallPausing")

    def onLinphoneCallPaused(self, event):
        """Default handler; override in a subclass."""
        print("dummy onLinphoneCallPaused")

    def onLinphoneCallResuming(self, event):
        """Default handler; override in a subclass."""
        print("dummy onLinphoneCallResuming")