Skip to content

Instantly share code, notes, and snippets.

@ethagnawl
Created August 3, 2022 19:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ethagnawl/a1824df3fbd7c9a6d8fd29581559d493 to your computer and use it in GitHub Desktop.
Save ethagnawl/a1824df3fbd7c9a6d8fd29581559d493 to your computer and use it in GitHub Desktop.
aiortc client.js => client.py POC
import asyncio
import json
import requests
import time
from aiortc import (
MediaStreamTrack,
RTCConfiguration,
RTCIceServer,
RTCPeerConnection,
RTCSessionDescription,
)
from aiortc.contrib.media import MediaPlayer, MediaRelay
from loguru import logger
relay = MediaRelay()
class VideoTransformTrack(MediaStreamTrack):
"""
A video stream track that transforms frames from an another track.
"""
kind = "video"
def __init__(self, track, procedure_id):
if track.kind != "video":
raise Exception("Unsupported kind: %s" % (track.kind))
super().__init__()
logger.info("track kind: %s" % track.kind)
self.track = track
async def recv(self):
logger.info("Attempt to receive frame ...")
frame = await self.track.recv()
logger.info("Received frame: %s" % (frame))
async def run():
pc = RTCPeerConnection()
pc.addTransceiver("video", direction="recvonly")
@pc.on("connectionstatechange")
async def on_connectionstatechange():
logger.info("Connection state is %s" % pc.connectionState)
if pc.connectionState == "failed":
await pc.close()
@pc.on("track")
async def on_track(track):
"""
Track has been received from client.
"""
logger.info("Track %s received" % track)
pc.addTrack(
VideoTransformTrack(
relay.subscribe(track),
)
)
@track.on("ended")
async def on_ended():
logger.info("Track ended: %s" % track)
offer = await pc.createOffer()
await pc.setLocalDescription(offer)
while True:
logger.info("icegatheringstate: %s" % pc.iceGatheringState)
if pc.iceGatheringState == "complete":
break
offer2 = pc.localDescription
data = {
"sdp": offer2.sdp,
"type": offer2.type,
}
response = requests.post(
"http://localhost:8080/offer",
headers={"Content-Type": "application/json"},
json=data,
)
response_data = response.json()
await pc.setRemoteDescription(
RTCSessionDescription(sdp=response_data["sdp"], type=response_data["type"])
)
while True:
time.sleep(2)
logger.info("pc.iceGatheringState: %s" % pc.iceGatheringState)
logger.info("pc.connectionState : %s" % pc.connectionState)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment