Created
July 7, 2017 20:59
-
-
Save harish2704/34c978ef730821364316570a8f46c1bb to your computer and use it in GitHub Desktop.
An in-memory data buffer class for Sequelize models. Can be used for buffering findOrCreate / create / upsert queries
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* ഓം ബ്രഹ്മാർപ്പണം. */ | |
/* | |
* DataBuffer.js | |
* Created: Fri Jul 07 2017 23:14:58 GMT+0530 (IST) | |
* Copyright 2017 Harish.K<harish2704@gmail.com> | |
*/ | |
/**
 * DataBuffer
 *
 * An in-memory buffer for a Sequelize model.
 * By default, the findOrCreate method just keeps data in memory.
 * When the number of buffered items exceeds the DataBuffer *size*, the current data store is locked and then bulk-inserted into the corresponding Sequelize model.
 * Besides buffer overflow, there is one setInterval timer running for each DataBuffer,
 * which flushes the buffer at a fixed interval even when the buffer is not full.
 */
/**
 * Debug logger: forwards its arguments to console.log, prefixed with
 * 'DataBuffer::debug '. console.log is captured once, at definition time.
 */
const debug = ( log => ( ...parts ) => log.call( console, 'DataBuffer::debug ', ...parts ) )( console.log );
/**
 * Store
 *
 * A keyed in-memory collection with an entry counter.
 * `db` maps id -> item; `count` is the number of ids currently held in `db`.
 */
class Store{
  constructor(){
    // Prototype-less dictionary: ids are arbitrary strings, so keys such as
    // "constructor" or "__proto__" must behave like ordinary entries instead
    // of resolving to (or overwriting) inherited members.
    this.db = Object.create( null );
    this.count = 0;
  }

  /**
   * Insert the item under `id`, replacing any item already stored there.
   * `count` grows only when `id` was not present before.
   *
   * @param {string} id
   * @param {*} item
   */
  add( id, item ){
    if( !( id in this.db ) ){
      this.count++;
    }
    this.db[ id ] = item;
  }

  /**
   * Delete the item stored under `id`. Unknown ids are ignored so that
   * `count` can never drift away from the real number of entries.
   *
   * @param {string} id
   */
  remove( id ){
    if( !( id in this.db ) ){
      return;
    }
    delete this.db[ id ];
    this.count--;
  }
}
/**
 * DataBuffer
 *
 * Collects items in memory, de-duplicated by their unique fields, and writes
 * them to `model` with a single `bulkCreate` call when the buffer overflows
 * or when the auto-save timer fires.
 */
class DataBuffer{
  /**
   * @param {Object} options
   * @param {string[]} options.uniqueFields - property names whose values identify an item
   * @param {number} options.size - a flush is triggered once the store holds more items than this
   * @param {Object} options.model - object exposing `name` and `bulkCreate( rows, options )`
   * @param {*} options.toBeUpdated - forwarded to bulkCreate as `updateOnDuplicate`
   * @param {Function} [options.beforeFlush] - called before every write; may return a promise
   * @param {number} [options.autoSaveInterval=30000] - period of the auto-save timer in ms
   */
  constructor( { uniqueFields, size, model, toBeUpdated, beforeFlush, autoSaveInterval=30000 } ){
    this.uniqueFields = uniqueFields;
    this.model = model;
    this.toBeUpdated = toBeUpdated;
    this.size = size;
    this.beforeFlush = beforeFlush || ( ()=>Promise.resolve() ) ;
    this.store = new Store();
    this.lockedStores = [];
    this.findOrCreate = this.findOrCreate.bind(this);
    this.flush = this.flush.bind(this);
    this.flushInBackground = this.flushInBackground.bind(this);
    // Nobody awaits the timer-driven flush, so it goes through the variant
    // that reports failures instead of leaving a rejected promise unhandled.
    this.timer = setInterval( this.flushInBackground, autoSaveInterval );
  }

  /**
   * Stop the auto-save timer. Buffered data is left untouched; call flush()
   * to write it out. Without this the interval stays scheduled forever.
   */
  stop(){
    clearInterval( this.timer );
  }

  /**
   * Build the buffer key of a document: the values of its unique fields
   * joined with '::' (null / undefined values contribute an empty string).
   *
   * @param {Object} doc
   * @returns {string}
   */
  getId( doc ){
    return this.uniqueFields.map( f => doc[ f ] ).join('::');
  }

  /**
   * Put an item into the buffer, or merge it into the buffered item that has
   * the same id. An array is processed element by element.
   *
   * @param {Object|Object[]} item
   * @returns {Object|Object[]} the buffered object (or an array of them)
   */
  findOrCreate( item ){
    if( Array.isArray( item ) ){
      return item.map( entry => this.findOrCreate( entry ) );
    }
    const id = this.getId( item );
    const db = this.store.db;
    // Own-property test: an id such as "constructor" must not be mistaken for
    // an existing entry because of a member inherited from Object.prototype.
    if( Object.prototype.hasOwnProperty.call( db, id ) ){
      return Object.assign( db[ id ], item );
    }
    this.store.add( id, item );
    if( this.store.count > this.size ){
      debug( `Flushing ${this.model.name} due to overflow ${this.store.count}`);
      this.flushInBackground();
    }
    return item;
  }

  /**
   * Lock the current store, replace it with an empty one, run `beforeFlush`
   * and then write the locked store to the model.
   *
   * @returns {Promise} rejects when `beforeFlush` throws or rejects; in that
   *   case the locked store stays in `lockedStores` and is not written.
   */
  flush(){
    if( this.store.count === 0 ){
      return Promise.resolve();
    }
    debug('before Flushing', this.model.name );
    const lockedStore = this.store;
    this.store = new Store();
    // Current data store is Locked now
    this.lockedStores.push( lockedStore );
    // The executor calls beforeFlush right away, turns a synchronous throw
    // into a rejection and accepts a non-promise return value.
    return new Promise( resolve => resolve( this.beforeFlush() ) )
      .then( () => {
        debug('real Flushing', this.model.name );
        return this.flushStore( lockedStore );
      });
  }

  /**
   * flush() for fire-and-forget callers (timer, overflow): a failure is
   * logged and the returned promise never rejects.
   *
   * @returns {Promise}
   */
  flushInBackground(){
    return this.flush()
      .catch( err => console.log('Flushing failed', err ) );
  }

  /**
   * Bulk-insert every item of `store` into the model and, on success, drop
   * the store from `lockedStores`. A failed write is logged, not rethrown,
   * and leaves the store in `lockedStores`.
   *
   * @param {Store} store
   * @returns {Promise} resolves with the bulkCreate result, or with
   *   undefined when the write failed
   */
  flushStore( store ){
    const rows = Object.values( store.db );
    // Promise.resolve() adopts whatever thenable bulkCreate returns, so only
    // standard promise methods are needed (no Bluebird-specific `.tap`).
    return Promise.resolve( this.model.bulkCreate( rows, { updateOnDuplicate: this.toBeUpdated } ) )
      .then( result => {
        const index = this.lockedStores.indexOf( store );
        // splice( -1, 1 ) would remove the last locked store, so skip unknown stores.
        if( index !== -1 ){
          this.lockedStores.splice( index, 1 );
        }
        return result;
      })
      .catch( err => console.log('Synching failed', err ));
  }
}
module.exports = DataBuffer; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment