Skip to content

Instantly share code, notes, and snippets.

@harish2704
Created July 7, 2017 20:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save harish2704/34c978ef730821364316570a8f46c1bb to your computer and use it in GitHub Desktop.
Save harish2704/34c978ef730821364316570a8f46c1bb to your computer and use it in GitHub Desktop.
An in-memory data buffer class for Sequelize models. Can be used for buffering findOrCreate / create / upsert queries
/* ഓം ബ്രഹ്മാർപ്പണം. */
/*
* DataBuffer.js
* Created: Fri Jul 07 2017 23:14:58 GMT+0530 (IST)
* Copyright 2017 Harish.K<harish2704@gmail.com>
*/
/**
* DataBuffer
*
* An in-memory buffer for sequelize model.
* By default, findOrCreate method will just keep data in memory.
* When memory size exceeds DataBuffer *size*, current datastore is locked. Then it will be bulk inserted into corresponding sequelize model
* Besides flushing on overflow, each DataBuffer runs one setInterval timer,
* which flushes the buffer at a fixed interval even when the buffer is not full
*/
// Debug logger for this file: forwards its arguments to console.log,
// preceded by a fixed 'DataBuffer::debug ' tag.
const debug = ( ...messages ) => console.log( 'DataBuffer::debug ', ...messages );
/**
 * Store
 *
 * A minimal keyed collection: `db` maps an id to an item and `count` holds
 * the number of distinct ids currently present in `db`.
 */
class Store{
  constructor(){
    // Prototype-less dictionary, so ids such as "constructor" or "toString"
    // are never resolved to inherited Object.prototype members.
    this.db = Object.create( null );
    this.count = 0;
  }

  /**
   * Insert `item` under `id`, replacing any item already stored there.
   * `count` only grows when `id` was not present before.
   *
   * @param {string} id - Key of the item.
   * @param {*} item - Value to store.
   */
  add( id, item ){
    if( !( id in this.db ) ){
      this.count++;
    }
    this.db[ id ] = item;
  }

  /**
   * Delete the item stored under `id`. Unknown ids are ignored, so `count`
   * can never drop below the real number of entries.
   *
   * @param {string} id - Key of the item to delete.
   */
  remove( id ){
    if( !( id in this.db ) ){
      return;
    }
    delete this.db[ id ];
    this.count--;
  }
}
/**
 * DataBuffer
 *
 * Buffers rows for a model in memory and writes them in bulk.
 *
 * Rows are keyed by the values of `uniqueFields`; adding a row whose key is
 * already buffered merges it into the buffered row instead of adding a new
 * one. The buffer is written with `model.bulkCreate` when it grows past
 * `size`, every `autoSaveInterval` milliseconds, or when `flush()` is called.
 */
class DataBuffer{
  /**
   * @param {Object} options
   * @param {string[]} options.uniqueFields - Field names whose values, joined with '::', form a row's id.
   * @param {number} options.size - Row count above which the buffer is flushed.
   * @param {Object} options.model - Object exposing `name` and `bulkCreate( rows, options )`.
   * @param {string[]} [options.toBeUpdated] - Passed to `bulkCreate` as `updateOnDuplicate`.
   * @param {Function} [options.beforeFlush] - Called before every write; the write waits for its result.
   * @param {number} [options.autoSaveInterval=30000] - Period of the automatic flush timer, in milliseconds.
   */
  constructor( { uniqueFields, size, model, toBeUpdated, beforeFlush, autoSaveInterval=30000 } ){
    this.uniqueFields = uniqueFields;
    this.model = model;
    this.toBeUpdated = toBeUpdated;
    this.size = size;
    this.beforeFlush = beforeFlush || ( ()=>Promise.resolve() ) ;
    this.store = new Store();
    // Stores swapped out by flush() that have not been written successfully (yet).
    this.lockedStores = [];
    // Bound so both methods can be handed around as plain callbacks.
    this.findOrCreate = this.findOrCreate.bind(this);
    this.flush = this.flush.bind(this);
    // Nobody awaits the timer's flush, so it goes through the variant that
    // reports failures instead of leaving a rejected promise unhandled.
    this.timer = setInterval( () => this.flushInBackground(), autoSaveInterval );
  }

  /**
   * Build the in-memory id of a row.
   *
   * @param {Object} doc - Row to identify.
   * @returns {string} Values of `uniqueFields` read from `doc`, joined with '::'.
   */
  getId( doc ){
    return this.uniqueFields.map( f => doc[ f ] ).join('::');
  }

  /**
   * Add a row to the buffer, or merge it into the buffered row with the same id.
   *
   * @param {Object|Object[]} item - One row, or an array of rows handled one by one.
   * @returns {Object|Object[]} The buffered row (the previously buffered object
   *   when a merge happened), or an array of them for array input.
   */
  findOrCreate( item ){
    if( Array.isArray( item ) ){
      return item.map( entry => this.findOrCreate( entry ) );
    }
    const id = this.getId( item );
    // Own-property check: an id like "constructor" must not match members
    // inherited from Object.prototype.
    if( Object.prototype.hasOwnProperty.call( this.store.db, id ) ){
      return Object.assign( this.store.db[ id ], item );
    }
    this.store.add( id, item );
    if( this.store.count > this.size ){
      debug( `Flushing ${this.model.name} due to overflow ${this.store.count}`);
      this.flushInBackground();
    }
    return item;
  }

  /**
   * Swap in an empty store and write the previous one through the model.
   *
   * @returns {Promise} Settles once the write attempt has finished. Write
   *   failures are logged by `flushStore`, not rejected; a failing
   *   `beforeFlush` rejects the returned promise.
   */
  flush(){
    if( this.store.count === 0 ){
      return Promise.resolve();
    }
    debug('before Flushing', this.model.name );
    const lockedStore = this.store;
    this.store = new Store();
    // Current data store is Locked now
    this.lockedStores.push( lockedStore );
    // The executor runs synchronously, so beforeFlush is still called right
    // away; wrapping it accepts non-promise return values and turns a
    // synchronous throw into a rejection.
    return new Promise( resolve => resolve( this.beforeFlush() ) )
      .then( () => {
        debug('real Flushing', this.model.name );
        return this.flushStore( lockedStore );
      });
  }

  /**
   * Run `flush()` for callers that do not await it (timer, overflow) and log
   * a failure instead of leaving the rejection unhandled.
   *
   * @returns {Promise} Never rejects.
   */
  flushInBackground(){
    return this.flush().catch( err => console.log( 'Flushing failed', err ) );
  }

  /**
   * Write every row of `store` with `model.bulkCreate`.
   *
   * @param {Store} store - A store previously pushed onto `lockedStores`.
   * @returns {Promise} Resolves with the `bulkCreate` result, or with
   *   `undefined` after logging when the write failed. A store whose write
   *   failed stays in `lockedStores`.
   */
  flushStore( store ){
    return this.model.bulkCreate( Object.values( store.db ), { updateOnDuplicate: this.toBeUpdated } )
      .then( result => {
        // Plain then() works with native promises as well as Bluebird ones.
        const index = this.lockedStores.indexOf( store );
        // splice( -1, 1 ) would drop the last, unrelated store.
        if( index !== -1 ){
          this.lockedStores.splice( index, 1 );
        }
        return result;
      })
      .catch( err => console.log('Synching failed', err ));
  }

  /**
   * Stop the automatic flush timer and write whatever is still buffered.
   *
   * @returns {Promise} The promise of the final `flush()`.
   */
  stop(){
    clearInterval( this.timer );
    return this.flush();
  }
}
// Expose the DataBuffer class as this module's only export (CommonJS).
module.exports = DataBuffer;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment