Skip to content

Instantly share code, notes, and snippets.

@nodumo
Last active June 13, 2021 17:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nodumo/75a0fa4a834930e0130cb475568de441 to your computer and use it in GitHub Desktop.
Save nodumo/75a0fa4a834930e0130cb475568de441 to your computer and use it in GitHub Desktop.
// https://javascript.info/arraybuffer-binary-arrays
import * as logging from "lib0/logging";
// eslint-disable-line
/** @doc https://www.npmjs.com/package/uint8-to-base64 */
import { decode, encode } from "uint8-to-base64";
/** @doc https://github.com/dmonad/lib0 */
import { Observable } from "lib0/observable";
/** @doc https://github.com/yjs/y-protocols */
import * as awarenessProtocol from "y-protocols/awareness";
import * as time from "lib0/time";
import * as encoding from "lib0/encoding";
import * as syncProtocol from "y-protocols/sync";
import * as decoding from "lib0/decoding";
import * as authProtocol from "y-protocols/auth";
import * as broadcastChannel from "lib0/broadcastchannel";
import * as mutex from "lib0/mutex";
/**
 * Names of the Y.Doc events this module subscribes to.
 * Frozen so the shared lookup table cannot be mutated by accident.
 *
 * @doc https://docs.yjs.dev/api/y.doc#event-handler
 */
const YDocEvents = Object.freeze({
  UPDATE: "update",
  DESTROY: "destroy",
});
/**
 * Names of the awareness events this module subscribes to.
 * Frozen so the shared lookup table cannot be mutated by accident.
 *
 * @doc https://docs.yjs.dev/api/about-awareness
 */
const YDocAwarenessEvents = Object.freeze({
  UPDATE: "update",
  CHANGE: "change",
});
/**
 * Values of the `action` field on PubNub presence events.
 *
 * INTERVAL used to be "leave" (copy-paste slip), which made it
 * indistinguishable from LEAVE; PubNub reports it as "interval".
 *
 * @doc https://www.pubnub.com/docs/chat/features/events#presence-events
 */
const PubNubPresenceChannelEvents = Object.freeze({
  JOIN: "join",
  LEAVE: "leave",
  INTERVAL: "interval",
});
/**
 * Names of the events emitted by the provider itself.
 *
 * SYNCED used to be "sync", so the `synced` setter emitted "sync" twice and
 * never emitted "synced". Each key now maps to its own event name.
 */
const ProviderStatusEvents = Object.freeze({
  STATUS: "status",
  SYNC: "sync",
  SYNCED: "synced",
});
/**
 * Values carried in the `status` field of the provider's "status" event.
 * Frozen so the shared lookup table cannot be mutated by accident.
 */
const ProviderStatusState = Object.freeze({
  CONNECTED: "connected",
  CONNECTING: "connecting",
  DISCONNECTED: "disconnected",
});
/**
 * Browser `window` events the provider listens to.
 * Frozen so the shared lookup table cannot be mutated by accident.
 */
const WindowEvents = Object.freeze({
  BEFORE_UNLOAD: "beforeunload",
});
/**
 * Discriminator stored in the `yDocMessageType` field of every PubNub message
 * published by this provider.
 * Frozen so the shared lookup table cannot be mutated by accident.
 */
const YDocMessageType = Object.freeze({
  YDOC_BASE64_MESSAGE: "YDOC_BASE64_MESSAGE",
});
//================================================================================
// YDoc protocol helpers
//================================================================================
/**
 * Warns on the console that access to the shared document was denied.
 *
 * The original message interpolated `provider.url`, a property that
 * PubnubProvider never defines, so the warning read "access undefined".
 * The target is now resolved from `url` (kept for backward compatibility),
 * then `pubnubRoom`, then a generic label, and a missing provider no longer
 * throws.
 *
 * @param {PubnubProvider} provider Object describing what was being accessed
 * @param {string} reason Human-readable reason for the denial
 */
const permissionDeniedHandler = (provider, reason) => {
  const target =
    (provider && (provider.url || provider.pubnubRoom)) ||
    "the shared document";
  console.warn(`Permission denied to access ${target}.\n${reason}`);
};
// Leading varUint of every protocol message; it selects the entry of
// `messageHandlers` that processes the rest of the message.
const messageSync = 0; // document sync message
const messageAwareness = 1; // awareness update
const messageAuth = 2; // auth message
const messageQueryAwareness = 3; // request for the current awareness states
/**
 * Message handlers indexed by message type (a sparse array used as a map).
 *
 * Every handler is called with, in order:
 *   encoder     - encoder collecting the reply, if any
 *   decoder     - decoder positioned just after the message type
 *   provider    - provider that received the message
 *   emitSynced  - whether the handler may flip `provider.synced`
 *   messageType - the message type that selected the handler
 *
 * @type {Array<function(encoding.Encoder, decoding.Decoder, PubnubProvider, boolean, number):void>}
 */
const messageHandlers = [];

/**
 * Sync messages: lets y-protocols read the message (it may write a reply into
 * `encoder`) and marks the provider as synced once a sync step 2 arrives.
 */
messageHandlers[messageSync] = (
  encoder,
  decoder,
  provider,
  emitSynced,
  // eslint-disable-next-line no-unused-vars
  messageType
) => {
  // The reply always starts with the sync message type.
  encoding.writeVarUint(encoder, messageSync);
  const receivedSyncType = syncProtocol.readSyncMessage(
    decoder,
    encoder,
    provider.doc,
    provider
  );
  if (!emitSynced) {
    return;
  }
  if (receivedSyncType !== syncProtocol.messageYjsSyncStep2) {
    return;
  }
  if (provider.synced) {
    return;
  }
  provider.synced = true;
};
/**
 * Awareness query: replies with an awareness message that carries the state of
 * every client currently known to the provider's awareness instance.
 */
messageHandlers[messageQueryAwareness] = (
  encoder,
  decoder,
  provider,
  // eslint-disable-next-line no-unused-vars
  emitSynced,
  // eslint-disable-next-line no-unused-vars
  messageType
) => {
  encoding.writeVarUint(encoder, messageAwareness);
  const { awareness } = provider;
  const knownClientIds = [...awareness.getStates().keys()];
  const awarenessUpdate = awarenessProtocol.encodeAwarenessUpdate(
    awareness,
    knownClientIds
  );
  encoding.writeVarUint8Array(encoder, awarenessUpdate);
};
/**
 * Awareness update: applies the received update to the provider's awareness
 * instance, passing the provider as the origin of the change.
 */
messageHandlers[messageAwareness] = (
  encoder,
  decoder,
  provider,
  // eslint-disable-next-line no-unused-vars
  emitSynced,
  // eslint-disable-next-line no-unused-vars
  messageType
) => {
  const { awareness } = provider;
  const receivedUpdate = decoding.readVarUint8Array(decoder);
  awarenessProtocol.applyAwarenessUpdate(awareness, receivedUpdate, provider);
};
/**
 * Auth message: delegates to the y-protocols auth reader, which reports a
 * denied permission through the supplied callback.
 */
messageHandlers[messageAuth] = (
  encoder,
  decoder,
  provider,
  // eslint-disable-next-line no-unused-vars
  emitSynced,
  // eslint-disable-next-line no-unused-vars
  messageType
) => {
  authProtocol.readAuthMessage(
    decoder,
    provider.doc,
    // readAuthMessage invokes its callback with the Y.Doc it was given, not
    // with the provider. Passing permissionDeniedHandler directly therefore
    // handed it a Y.Doc where it expects a provider; close over the provider.
    (_ydoc, reason) => permissionDeniedHandler(provider, reason)
  );
};
//================================================================================
// Provider implementation
//================================================================================
/**
 * Silence threshold, in milliseconds, used by the provider's periodic
 * connection check; the check itself runs every tenth of this value.
 */
const messageReconnectTimeout = 30 * 1000;

/**
 * Options used when the PubnubProvider constructor is called without an
 * `options` argument.
 */
export const defaultOptions = {
  connect: true,
  pubnubRoom: "nickdemochannel",
  resyncInterval: -1,
};
/**
 * Yjs provider that synchronises a shared document over PubNub.
 *
 * Document updates and awareness states are encoded with the y-protocols
 * sync/awareness protocols, base64-encoded and published on a PubNub channel.
 * The same protocol messages are mirrored on a lib0 broadcast channel that is
 * named after the room.
 *
 * @extends {Observable<string>}
 */
export class PubnubProvider extends Observable {
  /**
   * PubNub provider.
   *
   * @constructor
   * @param {object} pubnubInstance - Pubnub instance
   * @param {Y.Doc} yDocInstance - YDoc instance
   * @param {object} [options]
   * @param {boolean} [options.connect] Connect to PubNub
   * @param {string} [options.pubnubRoom] Room to connect to
   * @param {number} [options.resyncInterval] Re-send sync step 1 every `resyncInterval` milliseconds
   * @throws {Error} If `options` is null or no room name is given
   */
  constructor(pubnubInstance, yDocInstance, options = defaultOptions) {
    // `undefined` is replaced by the default parameter, so only an explicit
    // `null` can reach this check; `== null` reports it with a clear message
    // instead of a TypeError on the property access below.
    if (options == null) {
      throw new Error("Options param error: must not be undefined!");
    }
    if (!options.pubnubRoom) {
      throw new Error(
        "Options param error: You must define a room name!"
      );
    }
    super();
    this.LOGGING_CONTEXT = "PubnubProvider";
    /** @doc https://docs.yjs.dev/api/about-awareness */
    this.awareness = new awarenessProtocol.Awareness(yDocInstance);
    // @todo set metadata: https://www.pubnub.com/docs/sdks/javascript/api-reference/objects#set-user-metadata
    logging.print(
      this.LOGGING_CONTEXT,
      "provider.doc.clientID:",
      this.awareness.clientID
    );
    this.doc = yDocInstance;
    this.pubnubRoom = options.pubnubRoom;
    this.pubnubRooms = [options.pubnubRoom];
    this.pubnub = pubnubInstance;
    this.pubnubConnected = false;
    /**
     * Timestamp (lib0 `time.getUnixTime()`) of the last recognised PubNub
     * message; 0 until one has been received.
     * @type {number}
     */
    this.pubnubLastMessageReceived = 0;
    this.bcChannel = this.pubnubRoom;
    /**
     * @type {boolean}
     */
    this.bcconnected = false;
    // Per-instance copy so handlers can be replaced without touching the
    // module-level table.
    this.messageHandlers = messageHandlers.slice();
    this.mux = mutex.createMutex();
    /**
     * Whether to connect to other peers or not.
     * @type {boolean}
     */
    this.shouldConnect = Boolean(options.connect);
    /**
     * @type {boolean}
     */
    this._synced = false;

    // Each handler is bound exactly once and kept on the instance: removing a
    // listener only works with the very same function reference that was
    // registered, and every `.bind()` call creates a new function.
    this._onYDocUpdate = this.onYDocUpdate.bind(this);
    this._onYDocDestroy = this.onYDocDestroy.bind(this);
    this._onAwarenessChange = this.onYDocProtocolChange.bind(this);
    this._onAwarenessUpdate = this.onYDocAwarenessUpdateHandler.bind(this);
    this._onBcSubscriber = this.onBcSubscriber.bind(this);
    this._onBeforeUnload = () => {
      awarenessProtocol.removeAwarenessStates(
        this.awareness,
        [this.awareness.clientID],
        "window unload"
      );
    };

    //================================================================================
    // Pubnub event handlers
    //================================================================================
    this._pubnubListener = {
      // Client events
      disconnect: this.onPubNubDisconnect.bind(this),
      reconnect: this.onPubNubReconnect.bind(this),
      // Internal events
      message: this.onPubNubMessage.bind(this),
      presence: this.onPubNubPresence.bind(this),
      signal: this.onPubNubSignal.bind(this),
      status: this.onPubNubStatus.bind(this),
    };
    this.pubnub.addListener(this._pubnubListener);

    /**
     * Handle of the resync timer, or null when resyncing is disabled.
     * @type {any}
     */
    this._resyncInterval = null;
    if (options.resyncInterval > 0) {
      this._resyncInterval = setInterval(() => {
        // Only resync while connected, consistent with broadcast().
        if (this.pubnubConnected) {
          // resend sync step 1
          const encoder = encoding.createEncoder();
          encoding.writeVarUint(encoder, messageSync);
          syncProtocol.writeSyncStep1(encoder, this.doc);
          this.send(encoding.toUint8Array(encoder));
        }
      }, options.resyncInterval);
    }

    //================================================================================
    // YDoc update handlers
    //================================================================================
    this.doc.on(YDocEvents.UPDATE, this._onYDocUpdate);
    this.doc.on(YDocEvents.DESTROY, this._onYDocDestroy);

    //================================================================================
    // YDoc awareness handlers
    //================================================================================
    this.awareness.on(YDocAwarenessEvents.CHANGE, this._onAwarenessChange);
    this.awareness.on(YDocAwarenessEvents.UPDATE, this._onAwarenessUpdate);

    //================================================================================
    // Window event handlers
    //================================================================================
    if (typeof window !== "undefined") {
      window.addEventListener(
        WindowEvents.BEFORE_UNLOAD,
        this._onBeforeUnload
      );
    }

    this._checkInterval = /** @type {any} */ (
      setInterval(() => {
        if (
          this.pubnubConnected &&
          messageReconnectTimeout <
            time.getUnixTime() - this.pubnubLastMessageReceived
        ) {
          // no message received in a long time - not even your own awareness
          // updates (which are updated every 15 seconds)
          // @todo decide how to recover (nothing happens yet)
        }
      }, messageReconnectTimeout / 10)
    );

    if (options.connect) {
      this.connect();
    }
  }

  /**
   * Reads a protocol message received on the broadcast channel and publishes
   * the reply, if the handler produced one, back on the broadcast channel.
   *
   * @param {ArrayBuffer} data
   * @return {any} whatever the mutex call returns
   */
  _replyOnBroadcastChannel(data) {
    return this.mux(() => {
      const encoder = this.readMessage(new Uint8Array(data), false);
      // Length 1 means only the message type was written, i.e. no reply.
      if (encoding.length(encoder) > 1) {
        broadcastChannel.publish(
          this.bcChannel,
          encoding.toUint8Array(encoder)
        );
      }
    });
  }

  // --- Broadcast channel handlers

  /**
   * Broadcast channel subscriber.
   *
   * @param {ArrayBuffer} data
   */
  onBcSubscriber(data) {
    this._replyOnBroadcastChannel(data);
  }

  // --- Ydoc handlers

  /**
   * Logs that the document was destroyed.
   *
   * @param {any} doc
   */
  onYDocDestroy(doc) {
    logging.print(this.LOGGING_CONTEXT, "onYDocDestroy", doc);
  }

  /**
   * Listens to Yjs updates and sends them to remote peers (PubNub and broadcastchannel)
   * @param {Uint8Array} update
   * @param {any} origin
   */
  onYDocUpdate(update, origin) {
    logging.print(this.LOGGING_CONTEXT, "onYDocUpdate", update, origin);
    // Updates applied by this provider came from a peer; do not echo them.
    if (origin !== this) {
      const encoder = encoding.createEncoder();
      encoding.writeVarUint(encoder, messageSync);
      syncProtocol.writeUpdate(encoder, update);
      this.broadcast(encoding.toUint8Array(encoder));
    }
  }

  /**
   * Same processing as `onBcSubscriber`, but returns the result of the mutex call.
   *
   * @param {ArrayBuffer} data
   */
  onYDocSubscribe(data) {
    return this._replyOnBroadcastChannel(data);
  }

  // --- Ydoc protocol handlers

  /**
   * Logs awareness "change" events.
   *
   * NOTE(review): the sibling awareness "update" handler receives
   * `({ added, updated, removed }, origin)`; this one is registered the same
   * way but declares three positional parameters - confirm the intended
   * signature. It only logs, so the signature is left unchanged.
   */
  onYDocProtocolChange(added, updated, removed) {
    logging.print(
      this.LOGGING_CONTEXT,
      "onYDocProtocolChange",
      added,
      updated,
      removed
    );
  }

  /**
   * On awareness update: broadcasts the state of every changed client.
   *
   * @param {any} changed
   * @param {any} origin
   */
  onYDocAwarenessUpdateHandler({ added, updated, removed }, origin) {
    logging.print(
      this.LOGGING_CONTEXT,
      "onYDocAwarenessUpdateHandler",
      added,
      updated,
      removed,
      origin
    );
    const changedClients = added.concat(updated).concat(removed);
    const encoder = encoding.createEncoder();
    encoding.writeVarUint(encoder, messageAwareness);
    encoding.writeVarUint8Array(
      encoder,
      awarenessProtocol.encodeAwarenessUpdate(
        this.awareness,
        changedClients
      )
    );
    this.broadcast(encoding.toUint8Array(encoder));
  }

  // --- Pubnub handlers

  /**
   * Ports the logic of "websocket.onopen" from y-websocket: sends sync step 1,
   * sends the local awareness state (if any) and emits the "connected" status.
   */
  doPubNubJoinProtocol() {
    logging.print(this.LOGGING_CONTEXT, "doPubNubJoinProtocol");
    // always send sync step 1 when connected
    const encoder = encoding.createEncoder();
    encoding.writeVarUint(encoder, messageSync);
    syncProtocol.writeSyncStep1(encoder, this.doc);
    this.send(encoding.toUint8Array(encoder));
    // broadcast local awareness state
    if (this.awareness.getLocalState() !== null) {
      const encoderAwarenessState = encoding.createEncoder();
      encoding.writeVarUint(encoderAwarenessState, messageAwareness);
      const uint8Array = awarenessProtocol.encodeAwarenessUpdate(
        this.awareness,
        [this.doc.clientID]
      );
      encoding.writeVarUint8Array(encoderAwarenessState, uint8Array);
      this.send(encoding.toUint8Array(encoderAwarenessState));
    }
    this.emit(ProviderStatusEvents.STATUS, [
      {
        status: ProviderStatusState.CONNECTED,
      },
    ]);
  }

  /**
   * Disconnected protocol handles setting the state for this
   * active client being disconnected from the network. See the "provider.onclose"
   * in the yDoc WebSocket client.
   */
  doDisconnectedProtocol() {
    // update awareness (all users except local left)
    awarenessProtocol.removeAwarenessStates(
      this.awareness,
      Array.from(this.awareness.getStates().keys()).filter(
        (client) => client !== this.doc.clientID
      ),
      this
    );
    this.emit(ProviderStatusEvents.STATUS, [
      {
        status: ProviderStatusState.DISCONNECTED,
      },
    ]);
  }

  /**
   * On PubNub disconnect.
   */
  onPubNubDisconnect() {
    this.doDisconnectedProtocol();
  }

  /**
   * On PubNub reconnect.
   */
  onPubNubReconnect() {
    // The join protocol already emits the "connected" status; emitting it
    // here as well notified listeners twice.
    this.doPubNubJoinProtocol();
  }

  /**
   * Handles a message received on the PubNub channel.
   *
   * Anything that is not a recognised provider message is logged as an
   * error. The payload comes from the network, so decoding or protocol
   * failures are logged rather than thrown into the PubNub listener.
   *
   * @doc https://www.pubnub.com/docs/chat/features/events#objects-events
   * @param {object} [payload]
   * @param {object} [payload.message] Payload message
   * @param {object} [payload.message.context] Sender context
   * @param {string} [payload.message.context.clientID] ClientId of the sender
   * @param {string} [payload.message.payload] Base64-encoded protocol message
   * @param {string} [payload.message.yDocMessageType] Message type for the payload
   */
  onPubNubMessage(payload) {
    logging.print(this.LOGGING_CONTEXT, "onPubNubMessage", payload);
    const message = payload ? payload.message : undefined;
    if (
      !message ||
      message.yDocMessageType !== YDocMessageType.YDOC_BASE64_MESSAGE
    ) {
      logging.printError(
        new Error(`Unrecognized payload ${JSON.stringify(payload)}`)
      );
      return;
    }
    this.pubnubLastMessageReceived = time.getUnixTime();
    logging.print(
      this.LOGGING_CONTEXT,
      "onPubNubMessage",
      "payload",
      message.payload,
      "message.clientID(Sender)",
      message.context ? message.context.clientID : undefined
    );
    try {
      const uint8Array = decode(message.payload);
      const encoder = this.readMessage(uint8Array, true);
      // Length 1 means only the message type was written, i.e. no reply.
      if (encoding.length(encoder) > 1) {
        this.send(encoding.toUint8Array(encoder));
      }
    } catch (err) {
      logging.printError(err);
    }
  }

  /**
   * Logs presence events. Join, leave and interval events are recognised but
   * not acted upon yet.
   *
   * @doc https://www.pubnub.com/docs/chat/features/events#presence-events
   */
  onPubNubPresence(response) {
    logging.print(this.LOGGING_CONTEXT, "onPubNubPresence", response);
    switch (response.action) {
      case PubNubPresenceChannelEvents.JOIN:
        // @todo handle `response.uuid` joining
        break;
      case PubNubPresenceChannelEvents.LEAVE:
        // @todo handle a client leaving
        break;
      case PubNubPresenceChannelEvents.INTERVAL:
        // @todo handle the `response.join` / `response.leave` lists
        break;
      default:
        break;
    }
  }

  /** @doc https://www.pubnub.com/docs/chat/features/typing */
  onPubNubSignal(message) {
    logging.print(this.LOGGING_CONTEXT, "onPubNubSignal", message);
  }

  /**
   * Logs PubNub status events.
   */
  onPubNubStatus(status) {
    logging.print(this.LOGGING_CONTEXT, "onPubNubStatus", status);
  }

  //--- Connection handlers

  /**
   * @type {boolean}
   */
  get synced() {
    return this._synced;
  }

  /**
   * Updates the synced flag and notifies listeners when it changes.
   *
   * @param {boolean} state
   */
  set synced(state) {
    if (this._synced !== state) {
      this._synced = state;
      this.emit(ProviderStatusEvents.SYNCED, [state]);
      this.emit(ProviderStatusEvents.SYNC, [state]);
    }
  }

  /**
   * Destroys the provider: stops the timers, disconnects and removes every
   * listener the constructor registered.
   */
  destroy() {
    if (this._resyncInterval !== null) {
      clearInterval(this._resyncInterval);
    }
    clearInterval(this._checkInterval);
    this.disconnect();
    if (typeof window !== "undefined") {
      window.removeEventListener(
        WindowEvents.BEFORE_UNLOAD,
        this._onBeforeUnload
      );
    }
    // The instance is only typed as `object`, so guard the optional API.
    if (typeof this.pubnub.removeListener === "function") {
      this.pubnub.removeListener(this._pubnubListener);
    }
    this.awareness.off(YDocAwarenessEvents.UPDATE, this._onAwarenessUpdate);
    this.awareness.off(YDocAwarenessEvents.CHANGE, this._onAwarenessChange);
    this.doc.off(YDocEvents.UPDATE, this._onYDocUpdate);
    this.doc.off(YDocEvents.DESTROY, this._onYDocDestroy);
    super.destroy();
  }

  /**
   * Connects to the broadcast channel for the document and announces the
   * local document and awareness state on it.
   */
  connectBc() {
    if (!this.bcconnected) {
      broadcastChannel.subscribe(this.bcChannel, this._onBcSubscriber);
      this.bcconnected = true;
    }
    // send sync step1 to bc
    this.mux(() => {
      // write sync step 1
      const encoderSync = encoding.createEncoder();
      encoding.writeVarUint(encoderSync, messageSync);
      syncProtocol.writeSyncStep1(encoderSync, this.doc);
      broadcastChannel.publish(
        this.bcChannel,
        encoding.toUint8Array(encoderSync)
      );
      // broadcast local state
      const encoderState = encoding.createEncoder();
      encoding.writeVarUint(encoderState, messageSync);
      syncProtocol.writeSyncStep2(encoderState, this.doc);
      broadcastChannel.publish(
        this.bcChannel,
        encoding.toUint8Array(encoderState)
      );
      // write queryAwareness
      const encoderAwarenessQuery = encoding.createEncoder();
      encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness);
      broadcastChannel.publish(
        this.bcChannel,
        encoding.toUint8Array(encoderAwarenessQuery)
      );
      // broadcast local awareness state
      const encoderAwarenessState = encoding.createEncoder();
      encoding.writeVarUint(encoderAwarenessState, messageAwareness);
      encoding.writeVarUint8Array(
        encoderAwarenessState,
        awarenessProtocol.encodeAwarenessUpdate(this.awareness, [
          this.doc.clientID,
        ])
      );
      broadcastChannel.publish(
        this.bcChannel,
        encoding.toUint8Array(encoderAwarenessState)
      );
    });
  }

  /**
   * Broadcasts an awareness update for the local client that is encoded
   * against an empty state map, then unsubscribes from the broadcast channel.
   */
  disconnectBc() {
    const encoder = encoding.createEncoder();
    encoding.writeVarUint(encoder, messageAwareness);
    encoding.writeVarUint8Array(
      encoder,
      awarenessProtocol.encodeAwarenessUpdate(
        this.awareness,
        [this.doc.clientID],
        new Map()
      )
    );
    this.broadcast(encoding.toUint8Array(encoder));
    if (this.bcconnected) {
      // Must be the same function reference that connectBc() subscribed.
      broadcastChannel.unsubscribe(this.bcChannel, this._onBcSubscriber);
      this.bcconnected = false;
    }
  }

  /**
   * Disconnects from PubNub channel(s) for the document.
   *
   */
  disconnect() {
    this.shouldConnect = false;
    this.pubnub.unsubscribe({
      channels: this.pubnubRooms,
    });
    this.emit(ProviderStatusEvents.STATUS, [
      {
        status: ProviderStatusState.DISCONNECTED,
      },
    ]);
    // Runs while `pubnubConnected` is still true so the final awareness
    // update is also published over PubNub.
    this.disconnectBc();
    this.pubnubConnected = false;
  }

  /**
   * Connects to the PubNub channel(s) for the document and runs the join
   * protocol. Does nothing when already connected.
   */
  connect() {
    this.shouldConnect = true;
    if (!this.pubnubConnected) {
      this.pubnub.subscribe({
        channels: this.pubnubRooms,
      });
      this.connectBc();
      this.pubnubConnected = true;
      // The join protocol emits the "connected" status itself; emitting it
      // here as well notified listeners twice.
      this.doPubNubJoinProtocol();
    }
  }

  /**
   * Re-subscribes to the PubNub channel(s) for the document without running
   * the join protocol. Does nothing when already connected.
   */
  reconnect() {
    this.shouldConnect = true;
    if (!this.pubnubConnected) {
      this.pubnub.subscribe({
        channels: this.pubnubRooms,
      });
      this.connectBc();
      this.pubnubConnected = true;
      this.emit(ProviderStatusEvents.STATUS, [
        {
          status: ProviderStatusState.CONNECTED,
        },
      ]);
    }
  }

  /**
   * Encode Uint8Array to Pubnub message.
   *
   * @param {Uint8Array} buf
   * @return {any}
   */
  _encodeUint8ArrayToPubnub(buf) {
    return {
      yDocMessageType: YDocMessageType.YDOC_BASE64_MESSAGE,
      context: {
        clientID: this.doc.clientID,
      },
      payload: encode(buf),
    };
  }

  /**
   * Creates PubNub publish payload.
   *
   * @param {any} message
   * @return {any} publish parameters targeting the provider's room
   */
  _createPubnubPublishPayload(message) {
    return {
      channel: this.pubnubRoom,
      message: message,
    };
  }

  /**
   * Read protocol message.
   *
   * @param {Uint8Array} buf
   * @param {boolean} emitSynced
   * @return {encoding.Encoder} encoder holding the reply written by the handler
   */
  readMessage(buf, emitSynced) {
    const decoder = decoding.createDecoder(buf);
    const encoder = encoding.createEncoder();
    const messageType = decoding.readVarUint(decoder);
    const messageHandler = this.messageHandlers[messageType];
    if (/** @type {any} */ (messageHandler)) {
      messageHandler(encoder, decoder, this, emitSynced, messageType);
    } else {
      logging.printError(new Error("Unable to compute message"));
    }
    return encoder;
  }

  /**
   * Used in place of "broadcastMessage()" as seen in the YDoc Websocket provider.
   * Sends the message over PubNub (when connected) and over the broadcast
   * channel (when connected).
   *
   * @param {Uint8Array} buff
   */
  broadcast(buff) {
    logging.print(this.LOGGING_CONTEXT, "broadcast", buff);
    if (this.pubnubConnected) {
      this.send(buff);
    }
    if (this.bcconnected) {
      this.mux(() => {
        broadcastChannel.publish(this.bcChannel, buff);
      });
    }
  }

  /**
   * Used in place of "websocket.send()" as seen in the YDoc Websocket provider.
   *
   * Send message through pubnub.
   * @param {Uint8Array} buff
   */
  send(buff) {
    logging.print(this.LOGGING_CONTEXT, "send", buff);
    const pubnubEncodedPayload = this._encodeUint8ArrayToPubnub(buff);
    const pubnubPublishPayload =
      this._createPubnubPublishPayload(pubnubEncodedPayload);
    logging.print(
      this.LOGGING_CONTEXT,
      "send",
      "payload",
      pubnubPublishPayload
    );
    const pending = this.pubnub.publish(pubnubPublishPayload);
    // If publish returns a promise, log a failed publish instead of leaving
    // an unhandled rejection.
    if (pending && typeof pending.catch === "function") {
      pending.catch((err) => logging.printError(err));
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment