Skip to content

Instantly share code, notes, and snippets.

@benzmuircroft
Last active December 13, 2023 19:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save benzmuircroft/2dcbbf3aa8b136d87606facc6377140c to your computer and use it in GitHub Desktop.
Save benzmuircroft/2dcbbf3aa8b136d87606facc6377140c to your computer and use it in GitHub Desktop.
handle offline events server/client on an autodeebee. Get events that happened when offline, handle them when you come back online
/*
SERVER EXAMPLE
const server = require('hyperdown')(uniqueKeyPair, storageFolderName, true);
server.onClientConsumedEvents(function(remotePublicKey, eventsArray) {
// do something ...
});
await server.addEvent(remotePublicKey, eventString);
CLIENT EXAMPLE
const client = require('hyperdown')(uniqueKeyPair, storageFolderName, true);
client.eventHandler(function(id, ev, cb) {
cb(id, true);
});
OPTIONS
{
uniqueKeyPair: must be unique to each peer (including the server peer) and be able to reproduce socket.remotePublicKey
folderName: is storage and the swarm topic
isServer: is a bool
onClientConsumedEvents: required for server. is a function (see sever example)
eventHandler: required for clients. is a function (see client example)
}
*/
async function hyperdown(options) {
const Corestore = require('corestore');
const AutobaseManager = require('@lejeunerenard/autobase-manager');
const Autodeebee = require('hyperdbee/autodeebee');
const { DB } = require('hyperdbee');
const Hyperswarm = require('hyperswarm');
const Keychain = require('keypear');
const b4a = require('b4a');
const goodbye = await import('graceful-goodbye');
let base, swarm, keyPair;
if (!options) {
throw new Error('options object is missing');
}
else if (!options.uniqueKeyPair) {
throw new Error('options.uniqueKeyPair should be a KeyChain or keyPair. see: https://github.com/holepunchto/keypear');
}
else if (!options.folderName || typeof options.folderName !== 'string') {
throw new Error('options.folderName should be a string');
}
else if (options.isServer && (!options.onClientConsumedEvents || typeof options.onClientConsumedEvents !== 'function' || options.eventHandler)) {
throw new Error('options.onClientConsumedEvents should be a function if you intend this to be a server');
}
else if (!options.isServer && (!options.eventHandler || typeof options.eventHandler !== 'function' || options.onClientConsumedEvents)) {
throw new Error('options.eventHandler should be a function if you intend this to be a client');
}
if (!options.uniqueKeyPair.publicKey) {
if (typeof options.uniqueKeyPair.get == 'function') {
keyPair = options.uniqueKeyPair.get();
}
else {
throw new Error('options.uniqueKeyPair should be a KeyChain or keyPair. see: https://github.com/holepunchto/keypear');
}
}
else {
keyPair = new Keychain(options.uniqueKeyPair);
keyPair = keyPair.get();
}
const store = new Corestore(`./${options.folderName}`);
const input = store.get({ name: 'input', sparse: false, valueEncoding: 'json' });
const output = store.get({ name: 'output', sparse: false, valueEncoding: 'json' });
if (options.isServer) { // --------------------------------------- server
this.onClientConsumedEvents = options.onClientConsumedEvents;
base = new Autobase({
inputs: [input],
localInput: input,
localOutput: output
});
const manager = new AutobaseManager(
base,
(key, coreType, channel) => true, // function to filter core keys
store.get.bind(store), // get(key) function to get a hypercore given a key
store.storage, // Storage for managing autobase keys
{ id: options.folderName } // Options
);
await manager.ready();
const autobee = new Autodeebee(base);
this.db = new DB(autobee);
this.addEvent = async function(userPublicKey, data) {
const hyperdownId = +new Date(); // todo: improve this
const user = await this.db.collection('events').findOne(userPublicKey);
let events = user.events;
data.hyperdownId = hyperdownId;
events[hyperdownId] = data;
await this.db.collection('events').update({ _id: userPublicKey }, { events: events }, { multi: false, upsert: true });
if (!user.offline && clients[userPublicKey]) {
clients[userPublicKey].write(b4a.from(JSON.stringify({ event: data })));
}
};
swarm = new Hyperswarm({
keyPair: keyPair
});
const clients = {};
swarm.on('connection', function(socket) {
const stream = store.replicate(socket);
manager.attachStream(stream); // Attach manager
socket.write(b4a.from(JSON.stringify({ isServer: options.isServer }))); // tell the client you are the server ...
clients[socket.remotePublicKey] = socket;
socket.on('data', async function(data) {
let e;
try {
data = JSON.parse(data);
}
catch (err) {
e = err;
}
if (!e) {
if (data.consumedEvents) {
let user = await this.db.collection('events').findOne(socket.remotePublicKey);
let consumedEvents = [];
for (const hyperdownId in user.events) {
if (user.consumed.includes(hyperdownId)) {
consumedEvents.push(JSON.stringify(JSON.parse(user.events[hyperdownId])));
delete user.events[hyperdownId];
}
}
await this.db.collection('events').update({ _id: socket.remotePublicKey }, { events: user.events, consumed: [] }, { multi: false });
this.onClientConsumedEvents(socket.remotePublicKey, consumedEvents); // application can handle anything it needs to ....
}
else if (data.goodbye) {
if (!(await this.db.collection('events').findOne(data.goodbye)).offline) {
await this.db.collection('events').update({ _id: data.goodbye }, { offline: true }, { multi: false });
}
}
}
});
socket.on('close', async function() {
delete clients[socket.remotePublicKey];
await this.db.collection('events').update({ _id: socket.remotePublicKey }, { offline: true }, { multi: false });
});
});
goodbye(() => swarm.destroy());
await swarm.join(b4a.alloc(32).fill(options.folderName));
await swarm.flush();
}
else { // ---------------------------------------------------------------- client
this.eventHandler = options.eventHandler;
base = new Autobase({
inputs: [input],
localInput: input,
localOutput: output
});
let server;
const manager = new AutobaseManager(
base,
(key, coreType, channel) => true, // function to filter core keys
store.get.bind(store), // get(key) function to get a hypercore given a key
store.storage, // Storage for managing autobase keys
{ id: options.folderName } // Options
);
await manager.ready();
const autobee = new Autodeebee(base);
this.db = new DB(autobee);
swarm = new Hyperswarm({
keyPair: keyPair
});
swarm.on('connection', async function(socket) {
const stream = store.replicate(socket);
manager.attachStream(stream); // Attach manager
socket.on('data', async function(data) {
let e;
try {
data = JSON.parse(data);
}
catch (err) {
e = err;
}
if (!e) {
if (data.isServer) {
hasServer(socket); // server is ready to talk !
}
else if (data.event) {
const hyperdownId = data.event.hyperdownId + ''; // clone
delete data.event.hyperdownId;
this.eventHandler(hyperdownId, data.event, async function(id, bool) { // call back
if (id !== hyperdownId) {
throw new Error(`Malformed hyperdownId for event. Got: '${id}', expected: '${hyperdownId}'`);
}
if (bool) { // true
await this.db.collection('events').update({ _id: keyPair.publicKey }, { $push: { consumed: hyperdownId } }, { multi: false, upsert: true });
if (server) {
server.write(b4a.from(JSON.stringify({ consumedEvents: true })));
}
}
});
}
}
});
});
goodbye(async function() {
if (server) {
server.write(b4a.from(JSON.stringify({ goodbye: keyPair.publicKey.toString('hex') })));
}
await this.db.collection('events').update({ _id: keyPair.publicKey }, { offline: true }, { multi: false });
swarm.destroy();
});
await swarm.join(b4a.alloc(32).fill(options.folderName));
await swarm.flush();
// when the server is ready to talk ...
async function hasServer(socket) {
server = socket;
socket.on('close', function() {
server = undefined;
});
if (!await this.db.collection('events').findOne(keyPair.publicKey)) {
await this.db.collection('events').insert({ _id: keyPair.publicKey, offline: false, events: {} });
}
else {
await this.db.collection('events').update({ _id: keyPair.publicKey }, { offline: false }, { multi: false });
}
// look up our events and consume them ...
let found = (await this.db.collection('events').findOne({ _id: keyPair.publicKey })).events;
this.evs = JSON.stringify(JSON.parse(found));
if (this.evs.length) {
let hyperdownId = Object.keys(found);
;(async function next(s, that) {
if (found[hyperdownId[s]]) {
that.eventHandler(hyperdownId[s], found[hyperdownId[s]], async function(id, bool) { // callback result
if (id !== hyperdownId[s]) {
throw new Error(`Malformed hyperdownId for event. Got: '${id}', expected: '${hyperdownId[s]}'`);
}
if (bool) { // true
await this.db.collection('events').update({ _id: keyPair.publicKey }, { $push: { consumed: hyperdownId[s] } }, { multi: false, upsert: true });
}
await next(s + 1, that);
});
}
else { //end
next = null;
if (server) {
server.write(b4a.from(JSON.stringify({ consumedEvents: true })));
}
}
})(0, this);
}
}
}
};
module.exports = hyperdown;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment