Skip to content

Instantly share code, notes, and snippets.

@maxant maxant/market.js
Last active Jun 9, 2019

Embed
What would you like to do?
Files for creating a trading engine with Node.js.
// /////////////////////////////////////////////////
// this file contains all the classes related to a
// market.
// /////////////////////////////////////////////////
var log4js = require('log4js');
var logger = log4js.getLogger(require('path').basename(__filename, '.js'));
require('es6-collections');
var _ = require('underscore');
/**
* A market contains 0..n sellers. A seller has 0..i sales orders
* each of which contains a quantity of a product at a certain price.
* The seller is prepared to sell their product for that price.
* The market also contains 0..m buyers. Each buyer has 0..j purchase
* orders. A purchase order is for a given product and quantity. The
* purchase price may not exceed the given price.
* The market works by continuously looping through trade sittings.
* See the {@link #trade} method.
*
* @constructor
* @author Ant
* @property {Array} sellers {@link Seller}s who currently have sales orders.
* @property {Array} buyers {@link Buyer}s who currently have purchase orders.
* @property {Object} marketInfo statistics about what is currently on offer and what is currently being looked for.
*/
function Market(){
this.sellers = [];
this.buyers = [];
this.marketInfo = {};
this.addSeller = function(seller){
this.sellers.push(seller);
};
this.addBuyer = function(buyer){
this.buyers.push(buyer);
};
/**
* At a single trade sitting, the following happens:
* 1) find all products available (on offer by sellers)
* 2) for each product:
* 2a) for each buyer interested in that product:
* 2ai) find the seller with the cheapest price for the current product
* 2aii) if such a seller exists, create a sale, otherwise nobody is selling the product anymore, so skip to next product.
*
* The point is that a buyer always goes to the cheapest seller, even if that seller doesnt have enough
* quantity. A buyer who wants more has to wait until the next trading session to find the next most suitable
* seller.
*
* @return {Array} array of {@link Sale}s in this trade
*/
this.trade = function(){
var self = this;
var sales = [];
var productsInMarket = this.getProductsInMarket().values();
this.collectMarketInfo();
//trade each product in succession
_.each(productsInMarket, function(productId){
var soldOutOfProduct = false;
logger.debug('trading product ' + productId);
var buyersInterestedInProduct = self.getBuyersInterestedInProduct(productId);
if(buyersInterestedInProduct.length === 0){
logger.info('no buyers interested in product ' + productId);
}else{
_.each(buyersInterestedInProduct, function(buyer){
if(!soldOutOfProduct){
logger.debug(' buyer ' + buyer.name + ' is searching for product ' + productId);
//select the cheapest seller
var cheapestSeller = _.chain(self.sellers)
.filter(function(seller){return seller.hasProduct(productId);})
.sortBy(function(seller){return seller.getCheapestSalesOrder(productId).price;})
.first()
.value();
if(cheapestSeller){
logger.debug(' cheapest seller is ' + cheapestSeller.name);
var newSales = self.createSale(buyer, cheapestSeller, productId);
sales = sales.concat(newSales);
logger.debug(' sales completed');
}else{
logger.warn(' market sold out of product ' + productId);
soldOutOfProduct = true;
}
}
});
}
});
return sales;
};
this.collectMarketInfo = function(){
this.marketInfo = {};
this.marketInfo.pos = _.chain(this.buyers).reduce(function(accum, buyer){
return accum.concat(buyer.purchaseOrders);
}, [])
.groupBy(function(po){ return po.productId; }).value();
this.marketInfo.sos = _.chain(this.sellers).reduce(function(accum, seller){
return accum.concat(seller.salesOrders);
}, []).groupBy(function(so){ return so.productId; }).value();
for(var productId in this.marketInfo.pos){
this.marketInfo.pos[productId] = this.marketInfo.pos[productId].length;
}
for(var productId in this.marketInfo.sos){
this.marketInfo.sos[productId] = this.marketInfo.sos[productId].length;
}
};
/**
* creates a sale if the prices is within the buyers budget.
* iterates all of the buyers purchase wishes for the given product
* so long as the seller still has the product.
* @return {Array} array of new {@link Sale}s, after having removed a quantity of the product from the seller/buyer.
*/
this.createSale = function(buyer, seller, productId){
var cheapestSalesOrder = seller.getCheapestSalesOrder(productId);
logger.debug('cheapest sales order ' + cheapestSalesOrder);
//find the buyers purchase orders, where the po.price => cheapestSalesOrder.price
//create a sale for each buyer's purchase order until either the seller has no more stock at this price
//or the buyer has bought all they want
var purchaseOrders = buyer.getRelevantPurchaseOrders(productId, cheapestSalesOrder.price);
logger.debug('relevant purchase orders: ' + purchaseOrders);
var sales = [];
_.each(purchaseOrders, function(purchaseOrder){
var quantity = Math.min(cheapestSalesOrder.remainingQuantity, purchaseOrder.remainingQuantity);
logger.debug('quantity ' + quantity + ' for PO: ' + purchaseOrder);
if(quantity > 0){
var sale = new Sale(buyer, seller, productId, cheapestSalesOrder.price, quantity);
//add PO and SO for events
sale.po = purchaseOrder;
sale.so = cheapestSalesOrder;
sales.push(sale);
logger.debug('created sale: ' + sale);
//adjust quantities
purchaseOrder.remainingQuantity -= quantity;
cheapestSalesOrder.remainingQuantity -= quantity;
//remove completed purchase wishes
if(purchaseOrder.remainingQuantity === 0){
logger.debug('PO complete: ' + sale);
buyer.removePurchaseOrder(purchaseOrder);
}
}
});
//remove completed sales orders
if(cheapestSalesOrder.remainingQuantity === 0){
logger.debug('SO complete: ' + cheapestSalesOrder);
seller.removeSalesOrder(cheapestSalesOrder);
}
return sales;
};
/** @return all buyers in the market who have a purchase order for the given product */
this.getBuyersInterestedInProduct = function(productId){
return _.filter(this.buyers, function(buyer){
return undefined !== _.find(buyer.purchaseOrders, function(po){
return po.productId === productId;
});
});
};
/** @return {Set} all product IDs (Strings) that are for sale in the market */
this.getProductsInMarket = function(){
var productsInMarket = new Set();
_.each(this.sellers, function(seller){
_.each(seller.salesOrders, function(salesOrder){
productsInMarket.add(salesOrder.productId);
});
});
return productsInMarket;
};
}
exports.Market = Market;
/**
* @constructor
* @author Ant
*/
function Seller(name){
this.name = name;
this.salesOrders = [];
this.addSalesOrder = function(salesOrder){
logger.debug(name + ' adding ' + salesOrder);
salesOrder.seller = this;
this.salesOrders.push(salesOrder);
};
this.hasProduct = function(productId){
return _.some(this.salesOrders, function(so){ return so.productId === productId; });
};
/** @return {SalesOrder} the sales order for the given product that has the lowest price */
this.getCheapestSalesOrder = function(productId){
return _.chain(this.salesOrders)
.filter(function(so){ return so.productId === productId; })
.sortBy(function(so){ return so.price; })
.first()
.value();
};
this.removeSalesOrder = function(salesOrder){
this.salesOrders = this.salesOrders.filter(function(so){ return salesOrder !== so;});
};
this.removeOutdatedSalesOrders = function(ageInMs){
var now = new Date().getTime();
var p = _.partition(this.salesOrders, function(so){ return now-so.created.getTime() > ageInMs; });
this.salesOrders = p[1];
return p[0];
};
}
exports.Seller = Seller;
Seller.prototype.toString = function(){
return 'Seller(' +
'name=' + this.name + ',' +
'SOs=[' + this.salesOrders + '])';
};
/**
* an order to sell a given quantity of a product at a given price
* @constructor
* @author Ant
*/
function SalesOrder(price, productId, quantity, id){
this.price = price;
this.productId = productId;
this.remainingQuantity = quantity;
this.originalQuantity = quantity;
this.created = new Date();
this.id = id;
}
exports.SalesOrder = SalesOrder;
SalesOrder.prototype.toString = function(){
return 'SalesOrder(' +
(this.seller ? this.seller.name + ',' : '') +
'product=' + this.productId + ',' +
'price=' + this.price + ',' +
'remainingQty=' + this.remainingQuantity + ',' +
'originalQty=' + this.originalQuantity + ',' +
'id=' + this.id + ',' +
'created=' + this.created + ')';
};
/**
* a sale from a seller to a buyer for the given product and price and quantity.
* @constructor
* @author Ant
*/
function Sale(buyer, seller, productId, price, quantity){
this.buyer = buyer;
this.seller = seller;
this.productId = productId;
this.price = price;
this.quantity = quantity;
this.timestamp = new Date();
}
exports.Sale = Sale;
Sale.prototype.toString = function(){
return 'Sale(' +
this.buyer.name + ',' +
this.seller.name + ',' +
'product=' + this.productId + ',' +
'price=' + this.price + ',' +
'qty=' + this.quantity + ',' +
'timestamp=' + this.timestamp + ')';
};
/**
* @constructor
* @author Ant
*/
function Buyer(name){
this.name = name;
this.purchaseOrders = [];
this.addPurchaseOrder = function(purchaseOrder){
logger.debug(name + ' adding ' + purchaseOrder);
purchaseOrder.buyer = this;
this.purchaseOrders.push(purchaseOrder);
};
/** @return {Array} all the {@link PurchaseOrder}s for the given product, where the maximum acceptable
price is more than the given price */
this.getRelevantPurchaseOrders = function(productId, price){
var pos = [];
_.each(this.purchaseOrders, function(po){
if(po.productId === productId){
if(po.maximumAcceptedPrice >= price){
pos.push(po);
}
}
});
return pos;
};
this.removePurchaseOrder = function(purchaseOrder){
this.purchaseOrders = this.purchaseOrders.filter(function(e){ return purchaseOrder !== e;});
};
this.removeOutdatedPurchaseOrders = function(ageInMs){
var now = new Date().getTime();
var p = _.partition(this.purchaseOrders, function(po){ return now-po.created.getTime() > ageInMs; });
this.purchaseOrders = p[1];
return p[0];
};
}
exports.Buyer = Buyer;
Buyer.prototype.toString = function(){
return 'Buyer(' +
'name=' + this.name + ',' +
'POs=[' + this.purchaseOrders + '])';
};
/**
* @constructor
* @author Ant
*/
function PurchaseOrder(productId, quantity, maximumAcceptedPrice, id){
this.productId = productId;
this.remainingQuantity = quantity;
this.originalQuantity = quantity;
this.maximumAcceptedPrice = maximumAcceptedPrice;
this.id = id;
this.created = new Date();
}
exports.PurchaseOrder = PurchaseOrder;
PurchaseOrder.prototype.toString = function(){
return 'PurchaseOrder(' +
(this.buyer ? this.buyer.name + ',' : '') +
'product=' + this.productId + ',' +
'maxPrice=' + this.maximumAcceptedPrice + ',' +
'remainingQty=' + this.remainingQuantity + ',' +
'originalQty=' + this.originalQuantity + ',' +
'id=' + this.id + ',' +
'created=' + this.created + ')';
};
// /////////////////////////////////////////////////
// a child process which wraps a trading engine for
// certain products. work is passed from the parent
// and completed work is passed back to the parent.
// /////////////////////////////////////////////////
var log4js = require('log4js');
var logger = log4js.getLogger(require('path').basename(__filename, '.js'));
var t = require('./trading-engine.js');
var m = require('./market.js');
var _ = require('underscore');
var debug = require('./debug.js').debug;
log4js.configure('log4js.json', { reloadSecs: 10 });
var DELAY = 3; //how many milliseconds between trading sessions
var TIMEOUT = 60000; //num ms after which incomplete SOs and POs should be removed
var engine = new t.TradingEngine(DELAY, TIMEOUT, function(t,d){
event(null, t, d);
});
engine.start();
logger.info('---started trading');
var buyers = new Map();
var sellers = new Map();
var knownProducts = new Set();
var numTimeouts = 0;
process.on('message', function(model) {
logger.debug('received command: "' + model.command + '"');
if(model.command == t.EventType.PURCHASE){
var buyer = buyers.get(model.who);
if(!buyer){
logger.debug('buyer named ' + model.who + ' doesnt exist -> adding a new one');
buyer = new m.Buyer(model.who);
engine.market.buyers.push(buyer);
buyers.set(buyer.name, buyer);
}
knownProducts.add(model.what.productId);
var po = new m.PurchaseOrder(model.what.productId, model.what.quantity, model.what.maxPrice, model.id);
buyer.event = function(type, data){ event(model.id, type, data); };
buyer.addPurchaseOrder(po);
}else if(model.command == t.EventType.SALE){
var seller = sellers.get(model.who);
if(!seller){
logger.debug('seller named ' + model.who + ' doesnt exist -> adding a new one');
seller = new m.Seller(model.who);
engine.market.sellers.push(seller);
sellers.set(seller.name, seller);
}
knownProducts.add(model.what.productId);
var so = new m.SalesOrder(model.what.price, model.what.productId, model.what.quantity, model.id);
seller.event = function(type, data){ event(model.id, type, data); };
seller.addSalesOrder(so);
}else{
var msg = 'Unknown command ' + model.command;
logger.warn(msg);
process.send({id: model.id, err: msg});
}
});
function event(id, type, data){
switch(type){
case t.EventType.SALE:
if(data.so.remainingQuantity === 0){
var msg = 'COMPLETED sales order';
process.send({id: id,
msg: msg,
data: {productId: data.productId,
quantity: data.quantity,
price: data.price,
buyer: data.buyer.name,
seller: data.seller.name,
id: data.id
}
});
logger.info('\n' + id + ') ' + msg + ' ' + data);
}else{
logger.info('\n' + id + ') PARTIAL sales order ' + data);
}
break;
case t.EventType.PURCHASE:
if(data.po.remainingQuantity === 0){
var msg = 'COMPLETED purchase order';
process.send({id: id,
msg: msg,
data: {productId: data.productId,
quantity: data.quantity,
price: data.price,
buyer: data.buyer.name,
seller: data.seller.name,
id: data.id
}
});
logger.info('\n' + id + ') ' + msg + ' ' + data);
}else{
logger.info('\n' + id + ') PARTIAL purchase order ' + data);
}
break;
case t.EventType.TIMEOUT_SALESORDER:
var msg = 'TIMEOUT sales order';
process.send({id: id,
msg: msg,
data: {productId: data.productId,
quantity: data.remainingQuantity,
price: data.price,
numTimeouts: numTimeouts++
}
});
logger.info('\n' + id + ') ' + msg + ' ' + data);
break;
case t.EventType.TIMEOUT_PURCHASEORDER:
var msg = 'TIMEOUT purchase order';
process.send({id: id,
msg: msg,
data: {productId: data.productId,
quantity: data.remainingQuantity,
price: data.price,
numTimeouts: numTimeouts++
}
});
logger.info('\n' + id + ') ' + msg + ' ' + data);
break;
case t.EventType.STATS:
var marketPrices = {};
var volumes = {};
_.each(knownProducts.values(), function(e){
marketPrices[e] = engine.getCurrentMarketPrice(e);
volumes[e] = engine.getCurrentVolume(e);
});
process.send({marketInfo: engine.market.marketInfo,
marketPrices: marketPrices,
volumes: volumes
});
break;
default:
break;
}
}
// /////////////////////////////////////////////////
// in this generation i go back to using children,
// but i no longer block connections.
// commands are taken, and put into the model
// and a reference is returned. the client can then
// poll for results.
//
// run: node lib/trading-engine-parent3.js
// /////////////////////////////////////////////////
var log4js = require('log4js');
var logger = log4js.getLogger(require('path').basename(__filename, '.js'));
var cp = require('child_process');
var t = require('./trading-engine.js');
var m = require('./market.js');
var _ = require('underscore');
require('es6-collections');
log4js.configure('log4js.json', { reloadSecs: 10 });
//TODO use config to decide how many child processes to start
var NUM_KIDS = 2;
var PRODUCT_IDS = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', //watch out, need to be Strings!!
'10', '11', '12', '13', '14', '15', '16', '17', '18', '19',
'20', '21', '22', '23', '24', '25', '26', '27', '28', '29',
'30', '31', '32', '33', '34', '35', '36', '37', '38', '39',
'40', '41', '42', '43', '44', '45', '46', '47', '48', '49',
'50', '51', '52', '53', '54', '55', '56', '57', '58', '59',
'60', '61', '62', '63', '64', '65', '66', '67', '68', '69',
'70', '71', '72', '73', '74', '75', '76', '77', '78', '79',
'80', '81', '82', '83', '84', '85', '86', '87', '88', '89',
'90', '91', '92', '93', '94', '95', '96', '97', '98', '99'
];
var chunk = PRODUCT_IDS.length / NUM_KIDS;
var stats = { marketInfo: {pos: {}, sos: {}}, volumes: {}, marketPrices: {}, numTimeouts: 0 };
var kids = new Map();
for (var i=0, j=PRODUCT_IDS.length; i<j; i+=chunk) {
var n = cp.fork('./lib/trading-engine-child.js');
n.on('message', messageFromChild);
var temparray = PRODUCT_IDS.slice(i,i+chunk);
logger.info('created child process for products ' + temparray);
_.each(temparray, function(e){
logger.debug('mapping productId "' + e + '" to child process ' + n.pid);
kids.set(e, n);
});
}
// //////////////// RESULTS ////////////////////////////////
// keep results in memory, just in case a client wants to know what happened...
// see comment below
var results = new Map();
function messageFromChild(model) {
if(model.id){
//TODO improve model - it should contain say an action or command, rather than implicitly having the id be the case for a response and otherwise it being stats!
model.completed = new Date();
results.set(model.id, model);
if(model.data && model.data.numTimeouts){
stats.numTimeouts += model.data.numTimeouts;
}
}else{
if(model.marketInfo){
for(var productId in model.marketInfo.pos){
stats.marketInfo.pos[productId] = model.marketInfo.pos[productId];
}
for(var productId in model.marketInfo.sos){
stats.marketInfo.sos[productId] = model.marketInfo.sos[productId];
}
}
var numSalesInLastMinute = 0;
for(var productId in model.volumes){
stats.volumes[productId] = model.volumes[productId];
numSalesInLastMinute += stats.volumes[productId].count;
}
for(var productId in model.marketPrices){
stats.marketPrices[productId] = model.marketPrices[productId];
}
stats.totalSalesPerMinute = numSalesInLastMinute * 6; //since stats are recored for the last 10 secs
}
};
// remove results older than a minute, every 5 seconds.
// in a real system you wouldnt necessarily cache results like
// we are doing - the sales are actually persisted by the
// trading engine - so clients could go look there!
// TODO can we use functional style with ES6 collections?
setInterval(function(){
logger.error('cleaning results... sales per minute: ' + stats.totalSalesPerMinute + ', num timedout orders=' + stats.numTimeouts);
var now = new Date().getTime();
var toRemove = [];
results.forEach(function(context, key, value, mapObject){
if(value.completed && now - value.completed.getTime() > 60000){
toRemove.push(key);
}
});
_.each(toRemove, function(e){
results.delete(e);
});
logger.info('completed cleaning results in ' + (new Date().getTime()-now) + 'ms');
}, 5000);
// ////////////////////// UI /////////////////////////////////
logger.info('setting up HTTP server for receiving commands');
var express = require('express')
var app = express()
var id = 0;
app.get('/buy', function (req, res) {
logger.info(id + ') buying "' + req.query.quantity + '" of "' + req.query.productId + '"');
var kid = kids.get(req.query.productId);
if(kid){
var cmd = {command: t.EventType.PURCHASE,
id: id,
who: req.query.userId,
what: {productId: req.query.productId, quantity: req.query.quantity, maxPrice: 2000.0}
};
kid.send(cmd);
res.json(cmd);
id++;
}else{
res.send('Unknown product ' + req.query.productId + ' - why is the client using that ID??');
}
});
app.get('/sell', function (req, res) {
logger.info(id + ') selling "' + req.query.quantity + '" of "' + req.query.productId + '" at price "' + req.query.price + '"');
var kid = kids.get(req.query.productId);
if(kid){
var cmd = {command: t.EventType.SALE,
id: id,
who: req.query.userId,
what: {productId: req.query.productId, quantity: req.query.quantity, price: req.query.price}
};
kid.send(cmd);
res.json(cmd);
id++;
}else{
res.send('Unknown product ' + req.query.productId + ' - why is the client using that ID??');
}
});
app.get('/marketStats', function (req, res) {
res.json(stats);
});
app.get('/result', function (req, res) {
var key = parseInt(req.query.id);
var r = results.get(key);
if(r){
results.delete(key);
res.json(r);
}else{
res.json({msg: 'UNKNOWN OR PENDING'});
}
});
var server = app.listen(3000, function () {
var host = server.address().address;
var port = server.address().port;
logger.warn('Trading engine listening at http://%s:%s', host, port)
});
// /////////////////////////////////////////////////
// this file contains all classes related to a trading
// engine which uses a market to simulate a trading platform.
// /////////////////////////////////////////////////
var log4js = require('log4js');
var logger = log4js.getLogger(require('path').basename(__filename, '.js'));
var m = require('./market.js');
var Market = m.Market;
var Seller = m.Seller;
var Buyer = m.Buyer;
var Sale = m.Sale;
var PurchaseOrder = m.PurchaseOrder;
var debug = require('./debug.js').debug;
var resources = require('./service-locator.js').locate('../lib/resources.js');
var _ = require('underscore');
require('es6-collections');
///////////// constants ///////////////
var SWEETIES = 'sweeties';
var SALES = 'sales';
exports.EventType = {};
exports.EventType.SALE = 'SALE';
exports.EventType.PURCHASE = 'PURCHASE';
exports.EventType.TIMEOUT_SALESORDER = 'TIMEOUT_SALESORDER';
exports.EventType.TIMEOUT_PURCHASEORDER = 'TIMEOUT_PURCHASEORDER';
exports.EventType.STATS = 'STATS';
/**
* basically a buyer goes into the market at a time where they are happy to pay the market price.
* they take it from the cheapest seller (ie the market price).
* depending on who is left, the market price goes up or down
*
* a trading engine has one market place and it controls the frequency of trades.
* between trades:
* - sellers and buyers may enter and exit
* - all sales are persisted
* @constructor
* @author Ant
* @parameter delay number of milliseconds between trades
* @parameter timeout the number of milliseconds after which incomplete sales or purchase orders should be removed and their buyer/seller informed of the (partial) failure.
*/
function TradingEngine(delay, timeout, listener){
logger.debug('market is opening for trading!');
this.market = new Market();
this.marketPrices = new Map(); //productId:marketPrice
this.volumeRecords = new Map(); //productId:Array[VolumeRecord]
this.running = false;
/**
* starts the trading asynchronously i.e. when the event loop is next free.
* if already running, has no effect.
* @method
*/
this.start = function(){
if(!this.running){
this.running = true;
setTimeout(loop, 0); //return immediately
}
};
/**
* gracefully stops the engine after the current trading session completes
* @method
* @parameter stopCallback an optional callback (taking no parameters) which will be called when
* the engine actually stops.
*/
this.stop = function(stopCallback){
this.running = false;
this.stopCallback = stopCallback;
};
var self = this;
function loop(){
logger.debug('\n\n------------------------------- trading...-------------------------');
var start = new Date().getTime();
prepareMarket(self.market, timeout);
var sales = self.market.trade();
logger.info('trading completed');
noteMarketPricesAndVolumes(self.marketPrices, self.volumeRecords, sales);
persistSale(sales, function(err){
if(err) logger.warn(err);
else {
logger.info('persisting completed, notifying involved parties...');
_.each(sales, function(sale){
if(sale.buyer.event) sale.buyer.event(exports.EventType.PURCHASE, sale);
if(sale.seller.event) sale.seller.event(exports.EventType.SALE, sale);
});
if(sales.length > 0){
logger.warn('trading of ' + sales.length + ' sales completed and persisted in ' + (new Date().getTime()-start) + 'ms');
}else{
logger.info('no trades...');
}
//debug(self.market, 10, false);
}
if(self.running){
if(listener) listener(exports.EventType.STATS, self.market.marketInfo);
setTimeout(loop, 0 + delay); //let the process handle other stuff too
}else{
logger.warn('Stopping trading!');
if(self.stopCallback) self.stopCallback();
}
});
}
/** @method
* @return {Object} the newly created {@link Buyer} which has been added to the market */
this.addBuyer = function(name){
var buyer = new Buyer(name);
this.market.buyers.push(buyer);
return buyer;
};
/** @method
* @return {Object} the newly created {@link Seller} which has been added to the market */
this.addSeller = function(name){
var seller = new Seller(name);
this.market.sellers.push(seller);
return seller;
};
/** @method @return a VolumeRecord, just with no timestamp. properties are total in last minute. */
this.getCurrentVolume = function(productId){
var vrs = this.volumeRecords.get(productId);
if(vrs){
var now = new Date().getTime();
vrs = _.filter(vrs, function(vr){ return now - vr.timestamp.getTime() < 1000*10;}); //remove old
this.volumeRecords.set(productId, vrs); //ensure records contains most up to date
return _.reduce(vrs, function(acc, e){
return new VolumeRecord(productId, acc.numberOfSales + e.numberOfSales, acc.turnover + e.turnover, null, acc.count + e.count);
}, new VolumeRecord(productId, 0, 0, null, 0));
}else{
return new VolumeRecord(productId, 0, 0, null, 0);
}
};
/** @method @return the last known price */
this.getCurrentMarketPrice = function(productId){
return this.marketPrices.get(productId);
};
}
exports.TradingEngine = TradingEngine;
//handles timed out orders
function prepareMarket(market, timeout){
//handle timeouted sales orders
_.each(market.sellers, function(seller){
var incompleteSOs = seller.removeOutdatedSalesOrders(timeout);
_.each(incompleteSOs, function(so){
if(so.seller.event) so.seller.event(exports.EventType.TIMEOUT_SALESORDER, so);
else logger.debug('incomplete SO: ' + so);
});
});
//handle timeouted purchase orders
_.each(market.buyers, function(buyer){
var incompletePOs = buyer.removeOutdatedPurchaseOrders(timeout);
_.each(incompletePOs, function(po){
if(po.buyer.event) po.buyer.event(exports.EventType.TIMEOUT_PURCHASEORDER, po);
else logger.debug('incomplete PO: ' + po);
});
});
}
function persistSale(sales, callback){
if(sales.length === 0) {
callback();
}else{
resources.dbConnection(function(err, connection) {
if(err) callback(err); else {
saveOne(sales, 0, connection, callback);
}
});
}
}
function saveOne(sales, idx, connection, callback){
if(idx < sales.length){
logger.info('preparing to persist ' + sales.length + ' sales');
var sale = sales[idx];
connection.query("INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) " +
"values (?, ?, ?, ?, ?, ?, ?)",
[sale.buyer.name, sale.seller.name, sale.productId, sale.price, sale.quantity, sale.po.id, sale.so.id],
function(err, rows, fields) {
if(err) callback(err); else {
sale.id = rows.insertId;
saveOne(sales, idx+1, connection, callback);
}
});
}else{
logger.info('persisted ' + idx + ' sales');
connection.release();
callback();
}
}
function noteMarketPricesAndVolumes(marketPrices, volumeRecords, sales){
_.each(sales, function(sale){
updateMarketPrice(sale, marketPrices);
updateMarketVolume(sale, volumeRecords);
});
}
/**
* @constructor
*/
function MarketPrice(productId, price, timestamp){
this.productId = productId;
this.price = price;
this.timestamp = timestamp;
}
function updateMarketPrice(sale, marketPrices){
var mp = marketPrices.get(sale.productId);
if(!mp || (mp && mp.timestamp.getTime() < sale.timestamp.getTime())){
//set price if none is known, or replace price if its older than current price
marketPrices.set(sale.productId, new MarketPrice(sale.productId, sale.price, sale.timestamp));
}
}
/**
* @constructor
*/
function VolumeRecord(productId, numberOfSales, turnover, timestamp, count){
this.productId = productId;
this.numberOfSales = numberOfSales;
this.turnover = turnover;
this.timestamp = timestamp;
this.count = count;
}
function updateMarketVolume(sale, volumeRecords){
var vrs = volumeRecords.get(sale.productId);
if(vrs){
var now = new Date().getTime();
vrs = _.filter(vrs, function(vr){ return now - vr.timestamp.getTime() < 1000*10;}); //remove older than 10 secs
}else{
vrs = [];
}
vrs.push(new VolumeRecord(sale.productId, sale.quantity, sale.quantity * sale.price, sale.timestamp, 1)); //scale up to "per minute"
volumeRecords.set(sale.productId, vrs); //replace with old one
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.