mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-06-02 06:23:37 +02:00
refactor: remove SaluteJazz carrier support
This commit is contained in:
@@ -1,59 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import json
|
||||
import uuid
|
||||
import aiohttp
|
||||
|
||||
API_BASE = "https://bk.salutejazz.ru"
|
||||
JAZZ_HEADERS = {"X-Jazz-ClientId": str(uuid.uuid4()), "X-Jazz-AuthType": "ANONYMOUS", "X-Client-AuthType": "ANONYMOUS", "Content-Type": "application/json"}
|
||||
|
||||
async def get_jazz_info():
|
||||
print("\n--- SaluteJazz Info ---")
|
||||
timeout = aiohttp.ClientTimeout(total=15)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
print("[1/4] API Initialization...")
|
||||
try:
|
||||
r = await session.post(f"{API_BASE}/room/create-meeting", headers=JAZZ_HEADERS, json={"title": "InfoBot", "guestEnabled": True, "lobbyEnabled": False, "room3dEnabled": False})
|
||||
rj = await r.json()
|
||||
print(" :P Room created")
|
||||
print(json.dumps(rj, indent=2))
|
||||
|
||||
r2 = await session.post(f"{API_BASE}/room/{rj['roomId']}/preconnect", headers=JAZZ_HEADERS, json={"password": rj["password"], "jazzNextMigration": {"b2bBaseRoomSupport": True, "sdkRoomSupport": True, "mediaWithoutAutoSubscribeSupport": True}})
|
||||
r2j = await r2.json()
|
||||
print(" :P Preconnect info received")
|
||||
print(json.dumps(r2j, indent=2))
|
||||
conn_url = r2j['connectorUrl']
|
||||
except Exception as e:
|
||||
print(f" X Error: {e}"); return
|
||||
|
||||
print(f"\n[2/4] Connecting to signaling...")
|
||||
async with session.ws_connect(conn_url) as ws:
|
||||
await ws.send_json({"roomId": rj["roomId"], "event": "join", "requestId": str(uuid.uuid4()), "payload": {"password": rj["password"], "participantName": "InfoBot", "supportedFeatures": {"attachedRooms": True}, "isSilent": False}})
|
||||
print(" :P Signaling established")
|
||||
|
||||
print("\n[3/4] Collecting network & media details...")
|
||||
end = asyncio.get_event_loop().time() + 8
|
||||
while asyncio.get_event_loop().time() < end:
|
||||
try:
|
||||
m = await asyncio.wait_for(ws.receive(), 1)
|
||||
if m.type == aiohttp.WSMsgType.TEXT:
|
||||
d = json.loads(m.data); ev = d.get("event", ""); p = d.get("payload", {}); meth = p.get("method", "")
|
||||
print(f" -> Event: {ev}{' ('+meth+')' if meth else ''}")
|
||||
if meth == "rtc:config":
|
||||
print("\n--- ICE Servers ---")
|
||||
print(json.dumps(p.get("configuration", {}).get("iceServers", []), indent=2))
|
||||
elif meth == "rtc:offer":
|
||||
print("\n--- SDP Offer (Codecs & Quality) ---")
|
||||
print(p.get("description", {}).get("sdp", ""))
|
||||
elif ev == "join-response":
|
||||
print("\n--- Participant Group ---")
|
||||
print(json.dumps(p.get("participantGroup", {}), indent=2))
|
||||
else:
|
||||
print(json.dumps(p, indent=2))
|
||||
except: continue
|
||||
|
||||
print("\n--- INFO COLLECTION COMPLETE ---")
|
||||
|
||||
if __name__ == "__main__":
|
||||
try: asyncio.run(get_jazz_info())
|
||||
except KeyboardInterrupt: pass
|
||||
@@ -1,241 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""PoC: SaluteJazz DataChannel over LiveKit."""
|
||||
|
||||
import asyncio
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
import aiohttp
|
||||
from aiortc import RTCConfiguration, RTCIceCandidate, RTCIceServer, RTCPeerConnection, RTCSessionDescription
|
||||
from aiortc.mediastreams import AudioStreamTrack
|
||||
from aiortc.rtcconfiguration import RTCBundlePolicy
|
||||
|
||||
logging.getLogger("aiortc").setLevel(logging.WARNING)
|
||||
|
||||
API_BASE = "https://bk.salutejazz.ru"
|
||||
JAZZ_HEADERS = {"X-Jazz-ClientId": str(uuid.uuid4()), "X-Jazz-AuthType": "ANONYMOUS", "X-Client-AuthType": "ANONYMOUS", "Content-Type": "application/json"}
|
||||
TEST_MESSAGES = ["Hello Jazz DC!", "Hello world", "X" * 100, "Final test"]
|
||||
|
||||
def _pb_varint(v: int) -> bytes:
|
||||
b = bytearray()
|
||||
while v > 0x7F: b.append((v & 0x7F) | 0x80); v >>= 7
|
||||
b.append(v & 0x7F)
|
||||
return bytes(b)
|
||||
|
||||
def _pb_field(f: int, w: int, d: bytes) -> bytes:
|
||||
t = _pb_varint((f << 3) | w)
|
||||
return t + d if w == 0 else (t + _pb_varint(len(d)) + d if w == 2 else t + d)
|
||||
|
||||
def _read_varint(s: io.BytesIO) -> int | None:
|
||||
res, shift = 0, 0
|
||||
while b := s.read(1):
|
||||
res |= (b[0] & 0x7F) << shift
|
||||
if not (b[0] & 0x80): return res
|
||||
shift += 7
|
||||
return None
|
||||
|
||||
def encode_data_packet(payload: bytes, topic: str = "") -> bytes:
|
||||
uf = _pb_field(2, 2, payload) + (_pb_field(4, 2, topic.encode()) if topic else b"") + _pb_field(8, 2, str(uuid.uuid4()).encode())
|
||||
return _pb_field(1, 0, _pb_varint(0)) + _pb_field(2, 2, uf)
|
||||
|
||||
def decode_data_packet(raw: bytes) -> tuple[bytes, str] | None:
|
||||
s = io.BytesIO(raw)
|
||||
ud = None
|
||||
while (tg := _read_varint(s)) is not None:
|
||||
wt = tg & 0x07
|
||||
if wt == 0: _read_varint(s)
|
||||
elif wt == 2:
|
||||
l = _read_varint(s)
|
||||
if l is None: break
|
||||
d = s.read(l)
|
||||
if (tg >> 3) == 2: ud = d
|
||||
elif wt == 1: s.read(8)
|
||||
elif wt == 5: s.read(4)
|
||||
else: break
|
||||
if ud is None: return None
|
||||
p, t, ins = b"", "", io.BytesIO(ud)
|
||||
while (tg := _read_varint(ins)) is not None:
|
||||
wt = tg & 0x07
|
||||
if wt == 0: _read_varint(ins)
|
||||
elif wt == 2:
|
||||
l = _read_varint(ins)
|
||||
if l is None: break
|
||||
d = ins.read(l)
|
||||
fn = tg >> 3
|
||||
if fn == 2: p = d
|
||||
elif fn == 4: t = d.decode(errors="replace")
|
||||
elif wt == 1: ins.read(8)
|
||||
elif wt == 5: ins.read(4)
|
||||
else: break
|
||||
return p, t
|
||||
|
||||
async def _create_peer(name: str, room: dict, session: aiohttp.ClientSession, is_server: bool = False, stats: dict = None) -> dict:
|
||||
ws = await session.ws_connect(room["connectorUrl"])
|
||||
await ws.send_json({"roomId": room["roomId"], "event": "join", "requestId": str(uuid.uuid4()), "payload": {"password": room["password"], "participantName": name, "supportedFeatures": {"attachedRooms": True, "sessionGroups": True}, "isSilent": False}})
|
||||
|
||||
peer = {"ws": ws, "pc_sub": None, "pc_pub": None, "dc": None, "ready": asyncio.Event(), "sub_ready": asyncio.Event()}
|
||||
group_id, p_ice_sub, p_ice_pub = None, [], []
|
||||
ice_servers = []
|
||||
|
||||
async def ws_loop():
|
||||
nonlocal group_id
|
||||
async for msg in ws:
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
data = json.loads(msg.data)
|
||||
ev = data.get("event", "")
|
||||
p = data.get("payload", {})
|
||||
m = p.get("method", "")
|
||||
|
||||
if ev == "join-response": group_id = p.get("participantGroup", {}).get("groupId")
|
||||
elif ev == "media-out" and m == "rtc:config":
|
||||
for s in p.get("configuration", {}).get("iceServers", []):
|
||||
urls = [u for u in s.get("urls", []) if "transport=udp" in u]
|
||||
if urls: ice_servers.append(RTCIceServer(urls=[urls[0]], username=s.get("username"), credential=s.get("credential")))
|
||||
|
||||
elif ev == "media-out" and m == "rtc:offer" and not peer["pc_sub"]:
|
||||
peer["pc_sub"] = RTCPeerConnection(configuration=RTCConfiguration(iceServers=ice_servers, bundlePolicy=RTCBundlePolicy.MAX_BUNDLE))
|
||||
@peer["pc_sub"].on("connectionstatechange")
|
||||
def _():
|
||||
if peer["pc_sub"].connectionState == "connected": peer["sub_ready"].set()
|
||||
|
||||
@peer["pc_sub"].on("datachannel")
|
||||
def on_dc(ch):
|
||||
if ch.label != "_reliable": return
|
||||
@ch.on("message")
|
||||
def on_msg(msg_data):
|
||||
parsed = decode_data_packet(msg_data if isinstance(msg_data, bytes) else msg_data.encode())
|
||||
if not parsed or parsed[1] != "poc": return
|
||||
stats["recv"] += 1
|
||||
if is_server and peer["dc"]:
|
||||
try:
|
||||
peer["dc"].send(encode_data_packet(f"Echo: {parsed[0].decode()}".encode(), "poc"))
|
||||
stats["sent"] += 1
|
||||
except: pass
|
||||
|
||||
@peer["pc_sub"].on("icecandidate")
|
||||
async def on_sub_ice(e):
|
||||
if e and e.candidate and group_id:
|
||||
await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ice", "rtcIceCandidates": [{"candidate": e.candidate.candidate, "sdpMid": e.candidate.sdpMid, "sdpMLineIndex": e.candidate.sdpMLineIndex, "usernameFragment": "", "target": "SUBSCRIBER"}]}})
|
||||
|
||||
await peer["pc_sub"].setRemoteDescription(RTCSessionDescription(sdp=p["description"]["sdp"], type="offer"))
|
||||
ans = await peer["pc_sub"].createAnswer()
|
||||
await peer["pc_sub"].setLocalDescription(ans)
|
||||
await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:answer", "description": {"type": "answer", "sdp": peer["pc_sub"].localDescription.sdp}}})
|
||||
for c in p_ice_sub:
|
||||
pts = c.get("candidate","").split()
|
||||
if len(pts) >= 8: await peer["pc_sub"].addIceCandidate(RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0)))
|
||||
p_ice_sub.clear()
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
peer["pc_pub"] = RTCPeerConnection(configuration=RTCConfiguration(iceServers=ice_servers, bundlePolicy=RTCBundlePolicy.MAX_BUNDLE))
|
||||
peer["pc_pub"].addTrack(AudioStreamTrack())
|
||||
peer["dc"] = peer["pc_pub"].createDataChannel("_reliable", ordered=True)
|
||||
|
||||
@peer["dc"].on("open")
|
||||
def on_open(): peer["ready"].set()
|
||||
|
||||
@peer["dc"].on("message")
|
||||
def on_pub_msg(msg_data):
|
||||
parsed = decode_data_packet(msg_data if isinstance(msg_data, bytes) else msg_data.encode())
|
||||
if parsed and parsed[1] == "poc": stats["recv"] += 1
|
||||
|
||||
@peer["pc_pub"].on("icecandidate")
|
||||
async def on_pub_ice(e):
|
||||
if e and e.candidate and group_id:
|
||||
await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ice", "rtcIceCandidates": [{"candidate": e.candidate.candidate, "sdpMid": e.candidate.sdpMid, "sdpMLineIndex": e.candidate.sdpMLineIndex, "usernameFragment": "", "target": "PUBLISHER"}]}})
|
||||
|
||||
await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:track:add", "cid": str(uuid.uuid4()), "track": {"type": "AUDIO", "source": "MICROPHONE", "muted": True}}})
|
||||
pub_offer = await peer["pc_pub"].createOffer()
|
||||
await peer["pc_pub"].setLocalDescription(pub_offer)
|
||||
await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:offer", "description": {"type": "offer", "sdp": peer["pc_pub"].localDescription.sdp}}})
|
||||
|
||||
elif ev == "media-out" and m == "rtc:answer" and peer["pc_pub"]:
|
||||
await peer["pc_pub"].setRemoteDescription(RTCSessionDescription(sdp=p["description"]["sdp"], type="answer"))
|
||||
for c in p_ice_pub:
|
||||
pts = c.get("candidate","").split()
|
||||
if len(pts) >= 8: await peer["pc_pub"].addIceCandidate(RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0)))
|
||||
p_ice_pub.clear()
|
||||
|
||||
elif ev == "media-out" and m == "rtc:ice":
|
||||
for c in p.get("rtcIceCandidates", []):
|
||||
pts = c.get("candidate","").split()
|
||||
if len(pts) < 8: continue
|
||||
ice = RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0))
|
||||
tgt = c.get("target")
|
||||
if tgt == "SUBSCRIBER": (await peer["pc_sub"].addIceCandidate(ice)) if peer["pc_sub"] else p_ice_sub.append(c)
|
||||
elif tgt == "PUBLISHER": (await peer["pc_pub"].addIceCandidate(ice)) if peer["pc_pub"] else p_ice_pub.append(c)
|
||||
|
||||
async def _keep():
|
||||
while not ws.closed:
|
||||
await asyncio.sleep(5)
|
||||
if group_id: await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ping", "ping_req": {"timestamp": int(time.time()*1000), "rtt": 0}}})
|
||||
|
||||
peer["task"] = asyncio.create_task(ws_loop())
|
||||
peer["keep"] = asyncio.create_task(_keep())
|
||||
return peer
|
||||
|
||||
async def run_poc() -> dict:
|
||||
print("\n--- SaluteJazz PoC ---")
|
||||
results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []}
|
||||
s_stats, c_stats = {"sent": 0, "recv": 0}, {"sent": 0, "recv": 0}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
try:
|
||||
r = await session.post(f"{API_BASE}/room/create-meeting", headers=JAZZ_HEADERS, json={"title": "PoC", "guestEnabled": True, "lobbyEnabled": False})
|
||||
rj = await r.json()
|
||||
r2 = await session.post(f"{API_BASE}/room/{rj['roomId']}/preconnect", headers=JAZZ_HEADERS, json={"password": rj["password"], "jazzNextMigration": {"b2bBaseRoomSupport": True, "demoRoomBaseSupport": True, "demoRoomVersionSupport": 2, "mediaWithoutAutoSubscribeSupport": True}})
|
||||
room_inf = {"roomId": rj["roomId"], "password": rj["password"], "connectorUrl": (await r2.json())["connectorUrl"]}
|
||||
except Exception as e:
|
||||
results["errors"].append(f"Auth fail: {e}")
|
||||
return results
|
||||
|
||||
print("[1/3] Connecting Server & Client...")
|
||||
try:
|
||||
server = await _create_peer("Server", room_inf, session, is_server=True, stats=s_stats)
|
||||
await asyncio.wait_for(server["ready"].wait(), 15.0)
|
||||
results["server_ok"] = True
|
||||
|
||||
client = await _create_peer("Client", room_inf, session, is_server=False, stats=c_stats)
|
||||
await asyncio.wait_for(client["ready"].wait(), 15.0)
|
||||
results["client_ok"] = True
|
||||
print(" :P Peers connected")
|
||||
except Exception as e:
|
||||
results["errors"].append(str(e))
|
||||
return results
|
||||
|
||||
print("\n[2/3] Exchanging messages...")
|
||||
await asyncio.sleep(1)
|
||||
for idx, msg in enumerate(TEST_MESSAGES, 1):
|
||||
try:
|
||||
client["dc"].send(encode_data_packet(msg.encode(), "poc"))
|
||||
c_stats["sent"] += 1
|
||||
print(f" -> Sent: {msg}")
|
||||
await asyncio.sleep(0.5)
|
||||
except Exception as e:
|
||||
results["errors"].append(f"Sending {idx} failed: {str(e)}")
|
||||
|
||||
await asyncio.sleep(3)
|
||||
results["sent"], results["recv"] = c_stats["sent"], c_stats["recv"]
|
||||
|
||||
print("\n[3/3] Cleaning up...")
|
||||
for p in (server, client):
|
||||
for t in ["task", "keep"]: p[t].cancel()
|
||||
await p["ws"].close()
|
||||
for pc in [p["pc_sub"], p["pc_pub"]]:
|
||||
if pc: await pc.close()
|
||||
|
||||
return results
|
||||
|
||||
def print_results(res: dict):
|
||||
print("\n--- TEST RESULTS ---")
|
||||
print(f"Server: {':P' if res['server_ok'] else 'X'} / Client: {':P' if res['client_ok'] else 'X'}")
|
||||
print(f"Messages: Sent {res['sent']} / Recv {res['recv']}")
|
||||
if res['errors']:
|
||||
for e in res['errors']: print(f" Error: {e}")
|
||||
print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['recv'] else 'X FAILED'}\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
try: res = asyncio.run(run_poc()); print_results(res)
|
||||
except KeyboardInterrupt: pass
|
||||
Reference in New Issue
Block a user