Skip to content

Instantly share code, notes, and snippets.

Forked from ronnyroeller/signaling-server.ts
Created March 21, 2023 10:14
Show Gist options
  • Save 166MMX/76f06f398ee3844f4ee25556344f6da3 to your computer and use it in GitHub Desktop.
Save 166MMX/76f06f398ee3844f4ee25556344f6da3 to your computer and use it in GitHub Desktop.
Lambda function to provide webrtc signaling server
import { ApiGatewayManagementApi, DynamoDB } from 'aws-sdk';
import { scanItems } from './dynamodb';
const { AWS_REGION, TOPICS_TABLE } = process.env;
const dynamoDb = new DynamoDB({
apiVersion: '2012-08-10',
region: AWS_REGION,
// Message structure and protocol flow taken from y-webrtc/bin/server.js
interface YWebRtcSubscriptionMessage {
type: 'subscribe' | 'unsubscribe';
topics?: string[];
interface YWebRtcPingMessage {
type: 'ping';
interface YWebRtcPublishMessage {
type: 'publish';
topic?: string;
[k: string]: any;
async function subscribe(topic: string, connectionId: string) {
try {
return await dynamoDb
Key: { name: { S: topic } },
UpdateExpression: 'ADD receivers :r',
ExpressionAttributeValues: {
':r': { SS: [connectionId] },
} catch (err) {
console.log(`Cannot update topic ${topic}: ${err.message}`);
async function unsubscribe(topic: string, connectionId: string) {
try {
return await dynamoDb
Key: { name: { S: topic } },
UpdateExpression: 'DELETE receivers :r',
ExpressionAttributeValues: {
':r': { SS: [connectionId] },
} catch (err) {
console.log(`Cannot update topic ${topic}: ${err.message}`);
async function getReceivers(topic: string) {
try {
const { Item: item } = await dynamoDb
Key: { name: { S: topic } },
return item?.receivers ? item.receivers.SS : [];
} catch (err) {
console.log(`Cannot get topic ${topic}: ${err.message}`);
return [];
async function handleYWebRtcMessage(
connectionId: string,
| YWebRtcSubscriptionMessage
| YWebRtcPublishMessage
| YWebRtcPingMessage,
send: (receiver: string, message: any) => Promise<void>,
) {
const promises = [];
if (message && message.type) {
switch (message.type) {
case 'subscribe':
(message.topics || []).forEach(topic => {
promises.push(subscribe(topic, connectionId));
case 'unsubscribe':
(message.topics || []).forEach(topic => {
promises.push(unsubscribe(topic, connectionId));
case 'publish':
if (message.topic) {
const receivers = await getReceivers(message.topic);
receivers.forEach(receiver => {
promises.push(send(receiver, message));
case 'ping':
promises.push(send(connectionId, { type: 'pong' }));
await Promise.all(promises);
function handleConnect(connectionId: string) {
// Nothing to do
console.log(`Connected: ${connectionId}`);
async function handleDisconnect(connectionId: string) {
console.log(`Disconnected: ${connectionId}`);
// Remove the connection from all topics
// This is quite expensive, as we need to go through all topics in the table
const promises = [];
for await (const item of scanItems(dynamoDb, TOPICS_TABLE)) {
const receivers = item.receivers?.SS ?? [];
if (receivers.includes(connectionId)) {
promises.push(unsubscribe(, connectionId));
await Promise.all(promises);
export async function handler(
event: HttpV2WebsocketEvent,
): Promise<HttpV2Response> {
return { statusCode: 502, body: 'Not configured' };
// The AWS "simple chat" example uses event.requestContext.domainName/...stage, but that doesn't work with custom domain
// names. It also doesn't matter, this is anyways an internal (AWS->AWS) call.
const apigwManagementApi = new ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: `https://${event.requestContext.apiId}.execute-api.${AWS_REGION}${event.requestContext.stage}`,
const send = async (connectionId: string, message: any) => {
try {
await apigwManagementApi
ConnectionId: connectionId,
Data: JSON.stringify(message),
} catch (err) {
if (err.statusCode === 410) {
console.log(`Found stale connection, deleting ${connectionId}`);
await handleDisconnect(connectionId);
} else {
// Log, but otherwise ignore: There's not much we can do, really.
console.log(`Error when sending to ${connectionId}: ${err.message}`);
try {
switch (event.requestContext.routeKey) {
case '$connect':
case '$disconnect':
await handleDisconnect(event.requestContext.connectionId);
case '$default':
await handleYWebRtcMessage(
return { statusCode: 200 };
} catch (err) {
console.log(`Error ${event.requestContext.connectionId}`, err);
return { statusCode: 500, body: err.message };
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment