Skip to content

Instantly share code, notes, and snippets.

@kueblert
Last active April 25, 2024 07:28
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save kueblert/fc1517fc9254e9a3cb0add7795c337f4 to your computer and use it in GitHub Desktop.
Save kueblert/fc1517fc9254e9a3cb0add7795c337f4 to your computer and use it in GitHub Desktop.
aiortc opencv webcam stream
// Page elements that mirror data-channel traffic and connection-state changes.
const dataChannelLog = document.getElementById('data-channel');
const iceConnectionLog = document.getElementById('ice-connection-state');
const iceGatheringLog = document.getElementById('ice-gathering-state');
const signalingLog = document.getElementById('signaling-state');

// The active RTCPeerConnection: assigned by start(), closed by stop().
let pc = null;

// Optional data channel plus the interval timer that sends its pings.
let dc = null;
let dcInterval = null;
/**
 * Create the session's RTCPeerConnection (also stored in the global `pc`).
 *
 * - Adds Google's public STUN server when the "use-stun" checkbox is ticked.
 * - Mirrors ICE-gathering, ICE-connection and signaling state changes into the page.
 * - Routes incoming remote tracks to the <video> or <audio> element.
 *
 * @returns {RTCPeerConnection} the newly created connection.
 */
function createPeerConnection() {
  const config = {
    sdpSemantics: 'unified-plan',
  };

  if (document.getElementById('use-stun').checked) {
    config.iceServers = [{ urls: ['stun:stun.l.google.com:19302'] }];
  }

  pc = new RTCPeerConnection(config);

  // register some listeners to help debugging
  pc.addEventListener('icegatheringstatechange', () => {
    iceGatheringLog.textContent += ` -> ${pc.iceGatheringState}`;
  }, false);
  iceGatheringLog.textContent = pc.iceGatheringState;

  pc.addEventListener('iceconnectionstatechange', () => {
    iceConnectionLog.textContent += ` -> ${pc.iceConnectionState}`;
  }, false);
  iceConnectionLog.textContent = pc.iceConnectionState;

  pc.addEventListener('signalingstatechange', () => {
    signalingLog.textContent += ` -> ${pc.signalingState}`;
  }, false);
  signalingLog.textContent = pc.signalingState;

  // connect audio / video
  pc.addEventListener('track', (evt) => {
    // A remote track is not guaranteed to belong to a stream (evt.streams may
    // be empty); wrap the bare track so it can still be rendered.
    const stream = evt.streams[0] || new MediaStream([evt.track]);
    const elementId = evt.track.kind === 'video' ? 'video' : 'audio';
    document.getElementById(elementId).srcObject = stream;
  });

  return pc;
}
/**
 * Perform the offer/answer exchange with the server over HTTP.
 *
 * Creates an offer and waits for ICE gathering to finish, so the offer carries
 * all candidates (no trickle ICE). It optionally restricts the offered codecs,
 * POSTs the offer to /offer, then applies the JSON answer. The offer and answer
 * SDP are shown in the page.
 *
 * @returns {Promise<void>} resolves when negotiation ends. Failures are
 *   reported with alert() and are not re-thrown.
 */
async function negotiate() {
  try {
    const localOffer = await pc.createOffer();
    await pc.setLocalDescription(localOffer);
    await waitForIceGatheringComplete(pc);

    // Work on a copy: RTCSessionDescription's `sdp` is read-only, so writing
    // the filtered SDP back into pc.localDescription would be lost (or throw
    // in strict code).
    const { type } = pc.localDescription;
    let { sdp } = pc.localDescription;

    const audioCodec = document.getElementById('audio-codec').value;
    if (audioCodec !== 'default') {
      sdp = sdpFilterCodec('audio', audioCodec, sdp);
    }

    const videoCodec = document.getElementById('video-codec').value;
    if (videoCodec !== 'default') {
      sdp = sdpFilterCodec('video', videoCodec, sdp);
    }

    document.getElementById('offer-sdp').textContent = sdp;

    const response = await fetch('/offer', {
      body: JSON.stringify({
        sdp,
        type,
        video_transform: document.getElementById('video-transform').value,
      }),
      headers: {
        'Content-Type': 'application/json',
      },
      method: 'POST',
    });
    if (!response.ok) {
      throw new Error(`Signaling request to /offer failed: ${response.status} ${response.statusText}`);
    }

    const answer = await response.json();
    document.getElementById('answer-sdp').textContent = answer.sdp;
    await pc.setRemoteDescription(answer);
  } catch (e) {
    alert(e);
  }
}

/**
 * Resolve once `peer.iceGatheringState` is 'complete' (immediately if it
 * already is), listening for 'icegatheringstatechange' otherwise.
 *
 * @param {RTCPeerConnection} peer - connection whose gathering to wait for.
 * @returns {Promise<void>}
 */
function waitForIceGatheringComplete(peer) {
  return new Promise((resolve) => {
    if (peer.iceGatheringState === 'complete') {
      resolve();
      return;
    }
    const checkState = () => {
      if (peer.iceGatheringState === 'complete') {
        peer.removeEventListener('icegatheringstatechange', checkState);
        resolve();
      }
    };
    peer.addEventListener('icegatheringstatechange', checkState);
  });
}
/**
 * Start a WebRTC session with the server, configured from the page's options.
 *
 * - Creates the peer connection.
 * - Optionally opens a "chat" data channel. Once a second it sends
 *   "ping <ms>"; each "pong <ms>" reply is logged with its round-trip time.
 * - Audio and video each either capture local media (checkbox ticked) or add
 *   a receive-only transceiver for that kind.
 * - Runs negotiate(), after capture if any media was requested.
 */
function start() {
  document.getElementById('start').style.display = 'none';

  pc = createPeerConnection();

  let timeStart = null;

  // Milliseconds since the first call; the first call returns 0.
  function currentStamp() {
    if (timeStart === null) {
      timeStart = new Date().getTime();
      return 0;
    }
    return new Date().getTime() - timeStart;
  }

  if (document.getElementById('use-datachannel').checked) {
    const parameters = JSON.parse(document.getElementById('datachannel-parameters').value);

    dc = pc.createDataChannel('chat', parameters);
    dc.onclose = () => {
      clearInterval(dcInterval);
      dataChannelLog.textContent += '- close\n';
    };
    dc.onopen = () => {
      dataChannelLog.textContent += '- open\n';
      dcInterval = setInterval(() => {
        const message = `ping ${currentStamp()}`;
        dataChannelLog.textContent += `> ${message}\n`;
        dc.send(message);
      }, 1000);
    };
    dc.onmessage = (evt) => {
      dataChannelLog.textContent += `< ${evt.data}\n`;
      if (evt.data.startsWith('pong')) {
        // "pong <ms>" echoes the timestamp from our ping
        const elapsedMs = currentStamp() - Number.parseInt(evt.data.substring(5), 10);
        dataChannelLog.textContent += ` RTT ${elapsedMs} ms\n`;
      }
    };
  }

  const constraints = {
    audio: false,
    video: false,
  };

  // Audio capture is controlled by its own "use-audio" checkbox.
  if (document.getElementById('use-audio').checked) {
    constraints.audio = true;
  } else {
    pc.addTransceiver('audio', { direction: 'recvonly' });
  }

  if (document.getElementById('use-video').checked) {
    const resolution = document.getElementById('video-resolution').value;
    if (resolution) {
      // option values look like "640x480"
      const [width, height] = resolution.split('x');
      constraints.video = {
        width: Number.parseInt(width, 10),
        height: Number.parseInt(height, 10),
      };
    } else {
      constraints.video = true;
    }
  } else {
    pc.addTransceiver('video', { direction: 'recvonly' });
  }

  if (constraints.audio || constraints.video) {
    navigator.mediaDevices.getUserMedia(constraints).then((stream) => {
      stream.getTracks().forEach((track) => {
        pc.addTrack(track, stream);
      });
      return negotiate();
    }, (err) => {
      alert('Could not acquire media: ' + err);
    });
  } else {
    negotiate();
  }

  document.getElementById('stop').style.display = 'inline-block';
}
/**
 * Send "command" over the data channel. On receipt, the Python server calls
 * player.videoResponse(), which changes the streamed image.
 *
 * The Command button is visible before Start is pressed, and the data channel
 * is optional. So `dc` may be null or not yet open, and
 * RTCDataChannel.send() throws unless readyState is 'open'. In that case a
 * warning is logged and nothing is sent.
 */
function sendCommand() {
  if (!dc || dc.readyState !== 'open') {
    console.warn('sendCommand: data channel is not open; command not sent');
    return;
  }
  dc.send('command');
}
/**
 * End the current session. Steps, in order:
 * 1. Hide the Stop button and close the data channel (if any).
 * 2. Stop every transceiver that supports stop().
 * 3. Stop each local track being sent.
 * 4. Close the peer connection after a 500 ms delay.
 */
function stop() {
  document.getElementById('stop').style.display = 'none';

  if (dc) {
    dc.close();
  }

  // getTransceivers() and RTCRtpTransceiver.stop() are feature-detected,
  // presumably for browsers that predate them.
  const transceivers = pc.getTransceivers ? pc.getTransceivers() : [];
  for (const transceiver of transceivers) {
    if (transceiver.stop) {
      transceiver.stop();
    }
  }

  // Release local capture devices (camera/microphone tracks we are sending).
  for (const sender of pc.getSenders()) {
    const localTrack = sender == null ? null : sender.track;
    if (localTrack != null) {
      localTrack.stop();
    }
  }

  setTimeout(() => {
    pc.close();
  }, 500);
}
/**
 * Restrict the media sections of one kind in an SDP to a single codec and its
 * RTX retransmission payloads.
 *
 * For every section whose m= line is "m=<kind> …", the function:
 * - drops a=rtpmap / a=fmtp / a=rtcp-fb lines for payload types that are not kept;
 * - rewrites the m= line's format list to contain only the kept payload types.
 * Sections of other kinds are left untouched.
 *
 * @param {string} kind - media kind, e.g. 'audio' or 'video'.
 * @param {string} codec - rtpmap encoding to keep, e.g. 'VP8/90000' or 'opus/48000/2'.
 * @param {string} realSdp - the SDP text (CRLF or LF line endings).
 * @returns {string} the filtered SDP, using the same line terminator as the input.
 */
function sdpFilterCodec(kind, codec, realSdp) {
  // Split on the SDP's own terminator (RFC 4566 requires CRLF). Lines then
  // carry no stray '\r', and re-joining reproduces the original line endings,
  // including on the rewritten m= line.
  const eol = realSdp.includes('\r\n') ? '\r\n' : '\n';
  const lines = realSdp.split(eol);

  const mLinePrefix = 'm=' + kind + ' ';
  const codecRegex = new RegExp('a=rtpmap:([0-9]+) ' + escapeRegExp(codec));
  const rtxRegex = /a=fmtp:(\d+) apt=(\d+)\s*$/;
  const skipRegex = /a=(fmtp|rtcp-fb|rtpmap):([0-9]+)/;
  const mLineRegex = new RegExp('^(' + escapeRegExp(mLinePrefix) + '.*?)( ([0-9]+))*\\s*$');

  // inKind[i] is true when lines[i] lies in a section whose m= line is of `kind`.
  const inKind = [];
  let isKind = false;
  for (const line of lines) {
    if (line.startsWith(mLinePrefix)) {
      isKind = true;
    } else if (line.startsWith('m=')) {
      isKind = false;
    }
    inKind.push(isKind);
  }

  // Pass 1: collect the payload types to keep. These are the chosen codec's
  // PTs plus any RTX PT whose "apt=" points at an already-kept PT. A Set keeps
  // the m= line free of duplicates when several sections share PT numbers.
  const allowed = new Set();
  lines.forEach((line, i) => {
    if (!inKind[i]) {
      return;
    }
    const codecMatch = line.match(codecRegex);
    if (codecMatch) {
      allowed.add(Number.parseInt(codecMatch[1], 10));
    }
    const rtxMatch = line.match(rtxRegex);
    if (rtxMatch && allowed.has(Number.parseInt(rtxMatch[2], 10))) {
      allowed.add(Number.parseInt(rtxMatch[1], 10));
    }
  });

  const payloadList = [...allowed].join(' ');

  // Pass 2: drop attribute lines for removed PTs and rewrite the m= line(s).
  const kept = [];
  lines.forEach((line, i) => {
    if (!inKind[i]) {
      kept.push(line);
      return;
    }
    const skipMatch = line.match(skipRegex);
    if (skipMatch && !allowed.has(Number.parseInt(skipMatch[2], 10))) {
      return;
    }
    kept.push(mLineRegex.test(line) ? line.replace(mLineRegex, '$1 ' + payloadList) : line);
  });

  return kept.join(eol);
}
/**
 * Backslash-escape every RegExp metacharacter in `string` so it can be
 * embedded literally in `new RegExp(...)`.
 *
 * @param {string} string - raw text.
 * @returns {string} the text with each of . * + ? ^ $ { } ( ) | [ ] \ escaped.
 */
function escapeRegExp(string) {
  const metaChars = /[.*+?^${}()|[\]\\]/g;
  return string.replace(metaChars, (ch) => `\\${ch}`);
}
<!DOCTYPE html>
<!-- WebRTC demo page: options, connection state and SDP views; behaviour lives in client.js. -->
<html lang="en">
<head>
    <meta charset="UTF-8"/>
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>WebRTC demo</title>
    <style>
    button {
        padding: 8px 16px;
    }

    pre {
        overflow-x: hidden;
        overflow-y: auto;
    }

    video {
        width: 100%;
    }

    .option {
        margin-bottom: 8px;
    }

    #media {
        max-width: 1280px;
    }
    </style>
</head>
<body>

<h2>Options</h2>
<div class="option">
    <input id="use-datachannel" checked="checked" type="checkbox"/>
    <label for="use-datachannel">Use datachannel</label>
    <select id="datachannel-parameters">
        <option value='{"ordered": true}'>Ordered, reliable</option>
        <option value='{"ordered": false, "maxRetransmits": 0}'>Unordered, no retransmissions</option>
        <option value='{"ordered": false, "maxPacketLifetime": 500}'>Unordered, 500ms lifetime</option>
    </select>
</div>
<div class="option">
    <input id="use-audio" type="checkbox"/>
    <label for="use-audio">Use audio</label>
    <select id="audio-codec">
        <option value="default" selected>Default codecs</option>
        <option value="opus/48000/2">Opus</option>
        <option value="PCMU/8000">PCMU</option>
        <option value="PCMA/8000">PCMA</option>
    </select>
</div>
<div class="option">
    <input id="use-video" type="checkbox"/>
    <label for="use-video">Use video</label>
    <select id="video-resolution">
        <option value="" selected>Default resolution</option>
        <option value="320x240">320x240</option>
        <option value="640x480">640x480</option>
        <option value="960x540">960x540</option>
        <option value="1280x720">1280x720</option>
    </select>
    <select id="video-transform">
        <option value="none" selected>No transform</option>
        <option value="edges">Edge detection</option>
        <option value="cartoon">Cartoon effect</option>
        <option value="rotate">Rotate</option>
    </select>
    <select id="video-codec">
        <option value="default" selected>Default codecs</option>
        <option value="VP8/90000">VP8</option>
        <option value="H264/90000">H264</option>
    </select>
</div>
<div class="option">
    <input id="use-stun" type="checkbox"/>
    <label for="use-stun">Use STUN server</label>
</div>

<button id="start" onclick="start()">Start</button>
<button id="stop" style="display: none" onclick="stop()">Stop</button>
<button id="cmdButton" onclick="sendCommand()">Command</button>

<h2>State</h2>
<p>
    ICE gathering state: <span id="ice-gathering-state"></span>
</p>
<p>
    ICE connection state: <span id="ice-connection-state"></span>
</p>
<p>
    Signaling state: <span id="signaling-state"></span>
</p>

<div id="media" style="display: block">
    <h2>Media</h2>

    <audio id="audio" autoplay="true"></audio>
    <video id="video" autoplay="true" playsinline="true"></video>
</div>

<h2>Data channel</h2>
<pre id="data-channel" style="height: 200px;"></pre>

<h2>SDP</h2>

<h3>Offer</h3>
<pre id="offer-sdp"></pre>

<h3>Answer</h3>
<pre id="answer-sdp"></pre>

<script src="client.js"></script>
</body>
</html>
import asyncio
import fractions
import logging
import threading
import time
from typing import Optional, Set, Tuple
import av
from av import AudioFrame, VideoFrame
from aiortc.mediastreams import AUDIO_PTIME, MediaStreamError, MediaStreamTrack
from aiortc.contrib.media import PlayerStreamTrack, MediaPlayer
import cv2
import numpy as np
# Clock rate (ticks per second) of the video timestamps generated below.
VIDEO_CLOCK_RATE = 90000
# Time base matching VIDEO_CLOCK_RATE: one timestamp tick is 1/90000 s.
VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_CLOCK_RATE)
# Nominal interval between generated video frames, in seconds (30 fps).
VIDEO_PTIME = 1 / 30

logger = logging.getLogger("media")
def opencv_player_worker(
    loop, audio_track, video_track, quit_event, throttle_playback
):
    """
    Background-thread loop that generates frames and hands them to the tracks.

    Frames come from ``video_track.read()`` and are pushed onto the matching
    track's asyncio ``_queue`` through ``loop``. ``None`` is pushed to signal
    end-of-stream.

    :param loop: event loop that owns the tracks' ``_queue`` objects.
    :param audio_track: audio track to feed, or ``None``.
    :param video_track: track whose ``read()`` produces frames.
    :param quit_event: :class:`threading.Event`; the loop exits once it is set.
    :param throttle_playback: if true, sleep whenever the latest frame time is
        more than 1 s ahead of wall-clock time since the thread started.
    """
    # Exceptions from read() that mean "no more frames". read() raises
    # MediaStreamError once the track is no longer live. Without catching it,
    # the thread would die with a traceback instead of signalling
    # end-of-stream. av.AVError may not exist in newer PyAV releases, so it is
    # only added when present.
    end_of_stream_errors = (MediaStreamError, StopIteration)
    if hasattr(av, "AVError"):
        end_of_stream_errors += (av.AVError,)

    audio_fifo = av.AudioFifo()
    audio_format_name = "s16"
    audio_layout_name = "stereo"
    audio_sample_rate = 48000
    audio_samples = 0
    audio_samples_per_frame = int(audio_sample_rate * AUDIO_PTIME)
    audio_resampler = av.AudioResampler(
        format=audio_format_name, layout=audio_layout_name, rate=audio_sample_rate
    )

    video_first_pts = None

    frame_time = None
    start_time = time.time()

    while not quit_event.is_set():
        try:
            frame = video_track.read()
        except end_of_stream_errors:
            if audio_track:
                asyncio.run_coroutine_threadsafe(audio_track._queue.put(None), loop)
            if video_track:
                asyncio.run_coroutine_threadsafe(video_track._queue.put(None), loop)
            break

        # read up to 1 second ahead
        if throttle_playback:
            elapsed_time = time.time() - start_time
            if frame_time and frame_time > elapsed_time + 1:
                time.sleep(0.1)

        if isinstance(frame, AudioFrame) and audio_track:
            # NOTE(review): this branch is unused while the player has no audio
            # track; newer PyAV versions return a list from resample() — verify
            # before enabling audio.
            if (
                frame.format.name != audio_format_name
                or frame.layout.name != audio_layout_name
                or frame.sample_rate != audio_sample_rate
            ):
                frame.pts = None
                frame = audio_resampler.resample(frame)

            # fix timestamps
            frame.pts = audio_samples
            frame.time_base = fractions.Fraction(1, audio_sample_rate)
            audio_samples += frame.samples

            audio_fifo.write(frame)
            while True:
                frame = audio_fifo.read(audio_samples_per_frame)
                if frame:
                    frame_time = frame.time
                    asyncio.run_coroutine_threadsafe(
                        audio_track._queue.put(frame), loop
                    )
                else:
                    break

        elif isinstance(frame, VideoFrame):
            if frame.pts is None:  # pragma: no cover
                logger.warning("Skipping video frame with no pts")
                continue

            # rebase timestamps so the first frame has pts 0
            if video_first_pts is None:
                video_first_pts = frame.pts
            frame.pts -= video_first_pts

            frame_time = frame.time
            asyncio.run_coroutine_threadsafe(video_track._queue.put(frame), loop)
class OpenCVPlayerStreamTrack(PlayerStreamTrack):
    """
    Video track that streams a synthetic, rotating 640x480 grey image.

    How frames flow:
    - :meth:`read` renders each frame with OpenCV.
    - The player's worker thread (:func:`opencv_player_worker`) calls
      :meth:`read` in a loop and queues the frames on ``_queue``.
    - aiortc consumes them through :meth:`recv`.

    :meth:`response` switches the image to a darker grey.
    """

    # Wall-clock time (time.time()) corresponding to timestamp 0. recv() treats
    # None as "not started"; presumably PlayerStreamTrack.__init__ initialises
    # it to None — verify against the installed aiortc version.
    _start: Optional[float]
    # Timestamp of the last generated frame, in VIDEO_CLOCK_RATE ticks; left
    # unset until the first next_timestamp() call (checked with hasattr).
    _timestamp: int

    def __init__(self, player, kind):
        super().__init__(player, kind)
        # 480 rows x 640 columns, 3 channels (BGR), every value 200: light grey.
        self.img = np.ones([480, 640, 3], dtype=np.uint8) * 200

    def response(self):
        """Switch the streamed image to a darker grey (every channel 100)."""
        self.img = np.ones([480, 640, 3], dtype=np.uint8) * 100

    async def recv(self):
        """
        Return the next frame, starting the player's worker thread on first use.

        :raises MediaStreamError: if the track is not live, or the worker
            queued ``None`` (end of stream), in which case the track is stopped.
        """
        if self.readyState != "live":
            raise MediaStreamError

        self._player._start(self)
        frame = await self._queue.get()
        if frame is None:
            self.stop()
            raise MediaStreamError
        frame_time = frame.time

        # control playback rate
        if (
            self._player is not None
            and self._player._throttle_playback
            and frame_time is not None
        ):
            if self._start is None:
                self._start = time.time() - frame_time
            else:
                wait = self._start + frame_time - time.time()
                await asyncio.sleep(wait)

        return frame

    def next_timestamp(self) -> Tuple[int, fractions.Fraction]:
        """
        Return ``(pts, time_base)`` for the next frame.

        Blocks the calling (worker) thread so that frames are produced every
        VIDEO_PTIME seconds of wall-clock time.

        :raises MediaStreamError: if the track is no longer live.
        """
        if self.readyState != "live":
            raise MediaStreamError

        if hasattr(self, "_timestamp"):
            self._timestamp += int(VIDEO_PTIME * VIDEO_CLOCK_RATE)
            wait = self._start + (self._timestamp / VIDEO_CLOCK_RATE) - time.time()
            # time.sleep() raises ValueError for negative durations, which
            # occur whenever frame generation falls behind schedule.
            if wait > 0:
                time.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0
        return self._timestamp, VIDEO_TIME_BASE

    def read(self) -> VideoFrame:
        """Render the next frame: the current image rotated 45 degrees per second of stream time."""
        pts, time_base = self.next_timestamp()

        # Take one snapshot of the image. response() may replace self.img from
        # the event-loop thread while this runs on the worker thread.
        source = self.img

        # rotate image
        rows, cols, _ = source.shape
        angle = int(pts * time_base * 45)
        M = cv2.getRotationMatrix2D((cols / 2, rows / 2), angle, 1)
        rotated = cv2.warpAffine(source, M, (cols, rows))

        # create video frame
        frame = VideoFrame.from_ndarray(rotated, format="bgr24")
        frame.pts = pts
        frame.time_base = time_base
        return frame
class OpenCVMediaPlayer(MediaPlayer):
    """
    A media source that serves a synthetic, OpenCV-generated video track.

    Unlike :class:`aiortc.contrib.media.MediaPlayer`, nothing is read from a
    file, URL or device:

    - :attr:`video` is an :class:`OpenCVPlayerStreamTrack` that renders its own frames.
    - :attr:`audio` is always ``None``.

    One background thread (:func:`opencv_player_worker`) starts when the first
    track calls :meth:`_start` and stops once every started track has called
    :meth:`_stop`.

    Example:

    .. code-block:: python

        player = OpenCVMediaPlayer()
        pc.addTrack(player.video)
        # later, e.g. from a data-channel handler:
        player.videoResponse()
    """

    def __init__(self):
        # MediaPlayer.__init__ is intentionally not called: it expects a
        # file/URL to open, whereas this player generates its frames itself.
        self.__thread: Optional[threading.Thread] = None
        self.__thread_quit: Optional[threading.Event] = None

        # tracks that have called _start() and not yet _stop()
        self.__started: Set[OpenCVPlayerStreamTrack] = set()
        # placeholder for MediaPlayer's stream list; not used by this class
        self.__streams = []
        self.__audio: Optional[OpenCVPlayerStreamTrack] = None
        self.__video = OpenCVPlayerStreamTrack(self, kind="video")
        self.__streams.append(1)

        # frames are generated in real time, so no extra throttling is applied
        self._throttle_playback = False

    def videoResponse(self):
        """Ask the video track to switch to its alternate (darker) image."""
        self.__video.response()

    @property
    def audio(self) -> Optional[MediaStreamTrack]:
        """
        Always ``None``: this player produces no audio.
        """
        return self.__audio

    @property
    def video(self) -> MediaStreamTrack:
        """
        The :class:`OpenCVPlayerStreamTrack` producing the synthetic video.
        """
        return self.__video

    def _start(self, track: OpenCVPlayerStreamTrack) -> None:
        """Register ``track`` as active and start the worker thread if it is not running."""
        self.__started.add(track)
        if self.__thread is None:
            self.__thread_quit = threading.Event()
            self.__thread = threading.Thread(
                name="media-player",
                target=opencv_player_worker,
                args=(
                    asyncio.get_event_loop(),
                    self.__audio,
                    self.__video,
                    self.__thread_quit,
                    self._throttle_playback,
                ),
            )
            self.__thread.start()

    def _stop(self, track: OpenCVPlayerStreamTrack) -> None:
        """Unregister ``track``; once no tracks remain, stop and join the worker thread."""
        self.__started.discard(track)

        if not self.__started and self.__thread is not None:
            self.__thread_quit.set()
            self.__thread.join()
            self.__thread = None
import argparse
import asyncio
import json
import logging
import os
import ssl
import uuid
import cv2
from aiohttp import web
from av import VideoFrame
from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder
from OpenCVMedia import OpenCVMediaPlayer
# Directory holding this script; index.html and client.js are served from it.
ROOT = os.path.split(__file__)[0]

logger = logging.getLogger(name="pc")

# Every RTCPeerConnection created by offer(), so on_shutdown() can close them.
pcs = set()
async def index(request):
    """Serve the demo page, ``index.html`` from ROOT."""
    # `with` closes the handle deterministically. The page declares UTF-8, so
    # read it as UTF-8 rather than the platform's default encoding.
    with open(os.path.join(ROOT, "index.html"), "r", encoding="utf-8") as f:
        content = f.read()
    return web.Response(content_type="text/html", text=content)
async def javascript(request):
    """Serve the browser client script, ``client.js`` from ROOT."""
    # `with` closes the handle deterministically; read as UTF-8 explicitly.
    with open(os.path.join(ROOT, "client.js"), "r", encoding="utf-8") as f:
        content = f.read()
    return web.Response(content_type="application/javascript", text=content)
async def offer(request):
    """
    Handle the browser's SDP offer and reply with an SDP answer.

    Expects a JSON body with ``sdp`` and ``type`` (as posted by client.js).

    For each request this handler:
    - creates a new :class:`RTCPeerConnection` and tracks it in ``pcs``;
    - attaches the tracks of the module-level ``player`` (created in
      ``__main__``);
    - answers data-channel "ping" messages with "pong";
    - calls ``player.videoResponse()`` on "command" messages.

    The answer is returned as JSON.
    """
    params = await request.json()
    offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

    pc = RTCPeerConnection()
    pc_id = "PeerConnection(%s)" % uuid.uuid4()
    pcs.add(pc)

    def log_info(msg, *args):
        logger.info(pc_id + " " + msg, *args)

    log_info("Created for %s", request.remote)

    # Media the browser sends (webcam/microphone when those options are ticked)
    # is otherwise never read. MediaBlackhole drains and discards it, so
    # received frames do not accumulate inside aiortc.
    recorder = MediaBlackhole()

    @pc.on("datachannel")
    def on_datachannel(channel):
        @channel.on("message")
        def on_message(message):
            if not isinstance(message, str):
                return
            if message.startswith("ping"):
                # echo the client's timestamp back so it can compute the RTT
                channel.send("pong" + message[4:])
            elif message.startswith("command"):
                player.videoResponse()

    @pc.on("iceconnectionstatechange")
    async def on_iceconnectionstatechange():
        log_info("ICE connection state is %s", pc.iceConnectionState)
        if pc.iceConnectionState == "failed":
            await pc.close()
            pcs.discard(pc)

    @pc.on("track")
    def on_track(track):
        log_info("Track %s received", track.kind)
        recorder.addTrack(track)

        @track.on("ended")
        async def on_ended():
            log_info("Track %s ended", track.kind)
            await recorder.stop()

    # handle offer
    await pc.setRemoteDescription(offer)
    # begin draining the remote tracks handed to the recorder in on_track
    await recorder.start()

    if player.audio:
        pc.addTrack(player.audio)
    if player.video:
        pc.addTrack(player.video)

    # send answer
    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)

    return web.Response(
        content_type="application/json",
        text=json.dumps(
            {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
        ),
    )
async def on_shutdown(app):
    """aiohttp shutdown hook: close every tracked peer connection concurrently, then forget them."""
    await asyncio.gather(*(peer.close() for peer in pcs))
    pcs.clear()
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="WebRTC audio / video / data-channels demo"
    )
    parser.add_argument("--cert-file", help="SSL certificate file (for HTTPS)")
    parser.add_argument("--key-file", help="SSL key file (for HTTPS)")
    parser.add_argument(
        "--host", default="127.0.0.1", help="Host for HTTP server (default: 127.0.0.1)"
    )
    parser.add_argument(
        "--port", type=int, default=8080, help="Port for HTTP server (default: 8080)"
    )
    parser.add_argument("--verbose", "-v", action="count")
    # NOTE(review): --write-audio is parsed but not used by offer().
    parser.add_argument("--write-audio", help="Write received audio to a file")
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    if args.cert_file:
        # ssl.SSLContext() without a protocol is deprecated since Python 3.10;
        # PROTOCOL_TLS_SERVER selects the server-side TLS defaults.
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_context.load_cert_chain(args.cert_file, args.key_file)
    else:
        ssl_context = None

    # Single shared source whose tracks offer() attaches to every connection.
    player = OpenCVMediaPlayer()

    app = web.Application()
    app.on_shutdown.append(on_shutdown)
    app.router.add_get("/", index)
    app.router.add_get("/client.js", javascript)
    app.router.add_post("/offer", offer)
    web.run_app(
        app, access_log=None, host=args.host, port=args.port, ssl_context=ssl_context
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment