Skip to content

Instantly share code, notes, and snippets.

@maxant
Created December 24, 2014 22:47
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save maxant/3d4c5cfdd21c62647d3f to your computer and use it in GitHub Desktop.
Save maxant/3d4c5cfdd21c62647d3f to your computer and use it in GitHub Desktop.
Code for the Java trading engine solution
package ch.maxant.tradingengine.model;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import ch.maxant.tradingengine.model.TradingEngine.Listener;
public class Buyer {
private static final Logger LOGGER = LogManager.getLogger("buyer");
private String name;
private List<PurchaseOrder> purchaseOrders = new ArrayList<>();
public Listener listener;
public Buyer(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void addPurchaseOrder(PurchaseOrder purchaseOrder) {
LOGGER.debug(name + " adding " + purchaseOrder);
purchaseOrder.setBuyer(this);
this.purchaseOrders.add(purchaseOrder);
}
/**
* @return {Array} all the {@link PurchaseOrder}s for the given product,
* where the maximum acceptable price is more than the given price
*/
public List<PurchaseOrder> getRelevantPurchaseOrders(String productId,
double price) {
return this.purchaseOrders
.stream()
.filter(po -> {
return po.getProductId().equals(productId)
&& po.getMaximumAcceptedPrice() >= price;
}).collect(Collectors.toList());
}
public void removePurchaseOrder(PurchaseOrder purchaseOrder) {
this.purchaseOrders.remove(purchaseOrder);
}
public List<PurchaseOrder> removeOutdatedPurchaseOrders(long ageInMs) {
long now = System.currentTimeMillis();
List<PurchaseOrder> filter = this.purchaseOrders.stream()
.filter(po -> {
return now - po.getCreated().getTime() > ageInMs;
}).collect(Collectors.toList());
this.purchaseOrders.removeAll(filter);
return filter;
}
public List<PurchaseOrder> getPurchaseOrders() {
return purchaseOrders;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Buyer other = (Buyer) obj;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
return true;
}
@Override
public String toString() {
return "Buyer [name=" + name + "]";
}
}
package ch.maxant.tradingengine.web;
import java.util.concurrent.atomic.AtomicInteger;
public final class Constants {
// TODO use config to decide how engines to start
public static final int NUM_KIDS = 4;
public static final AtomicInteger ID = new AtomicInteger();
public static final String[] PRODUCT_IDS = { "0", "1", "2", "3", "4", "5",
"6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17",
"18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28",
"29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39",
"40", "41", "42", "43", "44", "45", "46", "47", "48", "49", "50",
"51", "52", "53", "54", "55", "56", "57", "58", "59", "60", "61",
"62", "63", "64", "65", "66", "67", "68", "69", "70", "71", "72",
"73", "74", "75", "76", "77", "78", "79", "80", "81", "82", "83",
"84", "85", "86", "87", "88", "89", "90", "91", "92", "93", "94",
"95", "96", "97", "98", "99" };
public static final long DELAY = 3; // how many milliseconds between
// trading
// sessions
public static final long TIMEOUT = 60000; // num ms after which incomplete
// SOs and POs should be removed
}
package ch.maxant.tradingengine.model;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* A market contains 0..n sellers. A seller has 0..i sales orders each of which
* contains a quantity of a product at a certain price. The seller is prepared
* to sell their product for that price. The market also contains 0..m buyers.
* Each buyer has 0..j purchase orders. A purchase order is for a given product
* and quantity. The purchase price may not exceed the given price. The market
* works by continuously looping through trade sittings. See the {@link #trade}
* method.
*/
public class Market {
private static final Logger LOGGER = LogManager.getLogger("market");
private List<Seller> sellers = new ArrayList<>();
private List<Buyer> buyers = new ArrayList<>();
private MarketInfo marketInfo;
public void addSeller(Seller seller) {
this.sellers.add(seller);
}
public void addBuyer(Buyer buyer) {
this.buyers.add(buyer);
}
/**
* At a single trade sitting, the following happens: 1) find all products
* available (on offer by sellers) 2) for each product: 2a) for each buyer
* interested in that product: 2ai) find the seller with the cheapest price
* for the current product 2aii) if such a seller exists, create a sale,
* otherwise nobody is selling the product anymore, so skip to next product.
*
* The point is that a buyer always goes to the cheapest seller, even if
* that seller doesnt have enough quantity. A buyer who wants more has to
* wait until the next trading session to find the next most suitable
* seller.
*
* @return {Array} array of {@link Sale}s in this trade
*/
public List<Sale> trade() {
List<Sale> sales = new ArrayList<>();
Set<String> productsInMarket = getProductsInMarket();
this.collectMarketInfo();
// trade each product in succession
productsInMarket
.stream()
.forEach(
productId -> {
MutableBoolean soldOutOfProduct = new MutableBoolean(
false);
LOGGER.debug("trading product " + productId);
List<Buyer> buyersInterestedInProduct = getBuyersInterestedInProduct(productId);
if (buyersInterestedInProduct.size() == 0) {
LOGGER.info("no buyers interested in product "
+ productId);
} else {
buyersInterestedInProduct.forEach(buyer -> {
if (soldOutOfProduct.isFalse()) {
LOGGER.debug(" buyer "
+ buyer.getName()
+ " is searching for product "
+ productId);
// select the cheapest seller
Optional<Seller> cheapestSeller = sellers
.stream()
.filter(seller -> {
return seller
.hasProduct(productId);
})
.sorted((s1, s2) -> Double
.compare(
s1.getCheapestSalesOrder(
productId)
.getPrice(),
s2.getCheapestSalesOrder(
productId)
.getPrice()))
.findFirst();
if (cheapestSeller.isPresent()) {
LOGGER.debug(" cheapest seller is "
+ cheapestSeller.get()
.getName());
List<Sale> newSales = createSale(
buyer,
cheapestSeller.get(),
productId);
sales.addAll(newSales);
LOGGER.debug(" sales completed");
} else {
LOGGER.warn(" market sold out of product "
+ productId);
soldOutOfProduct.setTrue();
}
}
});
}
});
return sales;
};
public void collectMarketInfo() {
this.marketInfo = new MarketInfo();
this.marketInfo.pos = buyers.stream().map(buyer -> {
return buyer.getPurchaseOrders();
}).flatMap(l -> l.stream()).collect(Collectors.groupingBy(po -> {
return po.getProductId();
}));
this.marketInfo.sos = sellers.stream().map(seller -> {
return seller.getSalesOrders();
}).flatMap(l -> l.stream()).collect(Collectors.groupingBy(so -> {
return so.getProductId();
}));
/*
* for(String productId : this.marketInfo.pos.keySet()){
* this.marketInfo.pos.put(productId) =
* this.marketInfo.pos[productId].length; } for(var productId :
* this.marketInfo.sos){ this.marketInfo.sos[productId] =
* this.marketInfo.sos[productId].length; }
*/
};
/**
* creates a sale if the prices is within the buyers budget. iterates all of
* the buyers purchase wishes for the given product so long as the seller
* still has the product.
*
* @return {Array} array of new {@link Sale}s, after having removed a
* quantity of the product from the seller/buyer.
*/
public List<Sale> createSale(Buyer buyer, Seller seller, String productId) {
SalesOrder cheapestSalesOrder = seller.getCheapestSalesOrder(productId);
LOGGER.debug("cheapest sales order " + cheapestSalesOrder);
// find the buyers purchase orders, where the po.price =>
// cheapestSalesOrder.price
// create a sale for each buyer's purchase order
// until either the seller has no more stock at this price
// or the buyer has bought all they want
List<PurchaseOrder> purchaseOrders = buyer.getRelevantPurchaseOrders(
productId, cheapestSalesOrder.getPrice());
LOGGER.debug("relevant purchase orders: " + purchaseOrders);
List<Sale> sales = new ArrayList<>();
purchaseOrders.stream().forEach(
purchaseOrder -> {
int quantity = Math.min(
cheapestSalesOrder.getRemainingQuantity(),
purchaseOrder.getRemainingQuantity());
LOGGER.debug("quantity " + quantity + " for PO: "
+ purchaseOrder);
if (quantity > 0) {
Sale sale = new Sale(buyer, seller, productId,
cheapestSalesOrder.getPrice(), quantity);
// add PO and SO for events
sale.setPurchaseOrder(purchaseOrder);
sale.setSalesOrder(cheapestSalesOrder);
sales.add(sale);
LOGGER.debug("created sale: " + sale);
// adjust quantities purchaseOrder.remainingQuantity -=
// quantity;
cheapestSalesOrder.reduceRemainingQuantity(quantity);
// remove completed purchase wishes
if (purchaseOrder.getRemainingQuantity() == 0) {
LOGGER.debug("PO complete: " + sale);
buyer.removePurchaseOrder(purchaseOrder);
}
}
});
// remove completed sales orders
if (cheapestSalesOrder.getRemainingQuantity() == 0) {
LOGGER.debug("SO complete: " + cheapestSalesOrder);
seller.removeSalesOrder(cheapestSalesOrder);
}
return sales;
}
/**
* @return all buyers in the market who have a purchase order for the given
* product
*/
public List<Buyer> getBuyersInterestedInProduct(final String productId) {
return this.buyers.stream().filter(buyer -> {
return buyer.getPurchaseOrders().stream().anyMatch(po -> {
return po.getProductId().equals(productId);
});
}).collect(Collectors.toList());
}
/** @return all product IDs that are for sale in the market */
public Set<String> getProductsInMarket() {
// TODO use flatmap, but also in js too!
Set<String> productsInMarket = new HashSet<>();
sellers.forEach(seller -> {
seller.getSalesOrders().stream().forEach(salesOrder -> {
productsInMarket.add(salesOrder.getProductId());
});
});
return productsInMarket;
}
public static class MarketInfo {
public Map<String, List<PurchaseOrder>> pos;
public Map<String, List<SalesOrder>> sos;
}
public MarketInfo getMarketInfo() {
return marketInfo;
}
public List<Seller> getSellers() {
return sellers;
}
public List<Buyer> getBuyers() {
return buyers;
}
}
package ch.maxant.tradingengine.model;
import java.util.Collection;
import java.util.Collections;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
public abstract class Model {
@Override
public boolean equals(Object o) {
return EqualsBuilder.reflectionEquals(this, o, getIgnoredFields());
}
protected Collection<String> getIgnoredFields() {
return Collections.emptySet();
}
@Override
public int hashCode() {
return HashCodeBuilder.reflectionHashCode(this, getIgnoredFields());
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this,
ToStringStyle.SHORT_PREFIX_STYLE);
}
}
package ch.maxant.tradingengine.model;
public class IdModel extends Model {
private int id;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
}
package ch.maxant.tradingengine.model;
import java.util.Date;
public class PurchaseOrder extends IdModel {
private String productId;
private int remainingQuantity;
private int originalQuantity;
private double maximumAcceptedPrice;
private Date created;
private Buyer buyer;
public PurchaseOrder(String productId, int quantity,
double maximumAcceptedPrice, int id) {
this.productId = productId;
this.remainingQuantity = quantity;
this.originalQuantity = quantity;
this.maximumAcceptedPrice = maximumAcceptedPrice;
this.created = new Date();
setId(id);
}
public void setBuyer(Buyer buyer) {
this.buyer = buyer;
}
public String getProductId() {
return productId;
}
public Buyer getBuyer() {
return buyer;
}
public Date getCreated() {
return created;
}
public double getMaximumAcceptedPrice() {
return maximumAcceptedPrice;
}
public int getOriginalQuantity() {
return originalQuantity;
}
public int getRemainingQuantity() {
return remainingQuantity;
}
}
package ch.maxant.tradingengine.model;
import java.util.Date;
public class Sale extends IdModel {
private Date timestamp;
private Buyer buyer;
private Seller seller;
private String productId;
private double price;
private int quantity;
private SalesOrder salesOrder;
private PurchaseOrder purchaseOrder;
/**
* a sale from a seller to a buyer for the given product and price and
* quantity.
*/
public Sale(Buyer buyer, Seller seller, String productId, double price,
int quantity) {
this.buyer = buyer;
this.seller = seller;
this.productId = productId;
this.price = price;
this.quantity = quantity;
this.timestamp = new Date();
}
public Buyer getBuyer() {
return buyer;
}
public double getPrice() {
return price;
}
public String getProductId() {
return productId;
}
public int getQuantity() {
return quantity;
}
public Seller getSeller() {
return seller;
}
public Date getTimestamp() {
return timestamp;
}
public void setSalesOrder(SalesOrder salesOrder) {
this.salesOrder = salesOrder;
}
public SalesOrder getSalesOrder() {
return salesOrder;
}
public void setPurchaseOrder(PurchaseOrder purchaseOrder) {
this.purchaseOrder = purchaseOrder;
}
public PurchaseOrder getPurchaseOrder() {
return purchaseOrder;
}
}
package ch.maxant.tradingengine.model;
import java.util.Date;
public class SalesOrder extends IdModel {
private double price;
private String productId;
private int remainingQuantity;
private int originalQuantity;
private Date created;
private Seller seller;
/**
* an order to sell a given quantity of a product at a given price
*/
public SalesOrder(double price, String productId, int quantity, int id) {
this.price = price;
this.productId = productId;
this.remainingQuantity = quantity;
this.originalQuantity = quantity;
this.created = new Date();
setId(id);
}
public Date getCreated() {
return created;
}
public int getRemainingQuantity() {
return remainingQuantity;
}
public String getProductId() {
return productId;
}
public double getPrice() {
return price;
}
public int getOriginalQuantity() {
return originalQuantity;
}
public Seller getSeller() {
return seller;
}
public void setSeller(Seller seller) {
this.seller = seller;
}
public void reduceRemainingQuantity(double quantity) {
this.remainingQuantity -= quantity;
}
}
package ch.maxant.tradingengine.model;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import ch.maxant.tradingengine.model.TradingEngine.Listener;
public class Seller {
private static final Logger LOGGER = LogManager.getLogger("seller");
private List<SalesOrder> salesOrders = new ArrayList<>();
private String name;
public Listener listener;
public Seller(String name) {
this.name = name;
}
public void addSalesOrder(SalesOrder salesOrder) {
LOGGER.debug(name + " adding " + salesOrder);
salesOrder.setSeller(this);
this.salesOrders.add(salesOrder);
}
public boolean hasProduct(String productId) {
return this.salesOrders.stream().anyMatch(so -> {
return so.getProductId().equals(productId);
});
}
/**
* @return {SalesOrder} the sales order for the given product that has the
* lowest price
*/
public SalesOrder getCheapestSalesOrder(String productId) {
return this.salesOrders.stream().filter(so -> {
return so.getProductId().equals(productId);
}).sorted((o1, o2) -> Double.compare(o1.getPrice(), o2.getPrice()))
.findFirst().get();
}
public void removeSalesOrder(SalesOrder salesOrder) {
this.salesOrders = this.salesOrders.stream().filter(so -> {
return !salesOrder.equals(so);
}).collect(Collectors.toList());
}
/** @return the out of date ones */
public List<SalesOrder> removeOutdatedSalesOrders(long ageInMs) {
long now = System.currentTimeMillis();
Map<Boolean, List<SalesOrder>> partitioned = salesOrders.stream()
.collect(Collectors.groupingBy(so -> {
return now - so.getCreated().getTime() > ageInMs;
}));
this.salesOrders = ObjectUtils.defaultIfNull(
partitioned.get(Boolean.FALSE),
Collections.synchronizedList(new ArrayList<SalesOrder>()));
return ObjectUtils.defaultIfNull(partitioned.get(Boolean.TRUE),
Collections.synchronizedList(new ArrayList<SalesOrder>()));
}
public List<SalesOrder> getSalesOrders() {
return salesOrders;
}
public String getName() {
return name;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Seller other = (Seller) obj;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
return true;
}
@Override
public String toString() {
return "Seller [name=" + name + "]";
}
}
package ch.maxant.tradingengine.model;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
// /////////////////////////////////////////////////
// this file contains all classes related to a trading
// engine which uses a market to simulate a trading platform.
// /////////////////////////////////////////////////
public class TradingEngine {
private static final Logger LOGGER = LogManager.getLogger("tradingEngine");
public static interface Listener {
public void onEvent(EventType type, Object data);
}
public static enum EventType {
SALE, PURCHASE, TIMEOUT_SALESORDER, TIMEOUT_PURCHASEORDER, STATS, STOPPED
}
private Market market = new Market();
private Map<String, MarketPrice> marketPrices = new HashMap<>();
private Map<String, List<VolumeRecord>> volumeRecords = new HashMap<>();
private InitialContext ctx = new InitialContext();
private Map<Seller, List<SalesOrder>> newSalesOrders = new HashMap<>();
private Map<Buyer, List<PurchaseOrder>> newPurchaseOrders = new HashMap<>();
private long delay;
private long timeout;
private Listener listener;
private boolean running = true;
/**
* if false, then runs in an infinite loop until {@link #stop()} is called.
* if true, then just runs once, and notifies listener that its stopped when
* that one run is done, so that the listener can decide when to restart the
* engine for another trading session.
*/
private boolean runInActorMode;
/**
* basically a buyer goes into the market at a time where they are happy to
* pay the market price. they take it from the cheapest seller (ie the
* market price). depending on who is left, the market price goes up or down
*
* a trading engine has one market place and it controls the frequency of
* trades. between trades: - sellers and buyers may enter and exit - all
* sales are persisted
*
* @param delay
* number of milliseconds between trades
* @param timeout
* the number of milliseconds after which incomplete sales or
* purchase orders should be removed and their buyer/seller
* informed of the (partial) failure.
* @throws NamingException
*/
public TradingEngine(long delay, long timeout, Listener listener)
throws NamingException {
this(delay, timeout, listener, false);
}
public TradingEngine(long delay, long timeout, Listener listener,
boolean runInActorMode) throws NamingException {
this.delay = delay;
this.timeout = timeout;
this.listener = listener;
this.runInActorMode = runInActorMode;
LOGGER.debug("market is opening for trading!");
}
public void run() {
while (running) {
LOGGER.debug("\n\n------------------------------- trading...-------------------------");
long start = System.currentTimeMillis();
prepareMarket();
List<Sale> sales = market.trade();
LOGGER.info("trading completed");
noteMarketPricesAndVolumes(sales);
try {
persistSale(sales);
} catch (Exception e) {
LOGGER.error("failed to persist sales: " + sales, e);
}
LOGGER.info("persisting completed, notifying involved parties...");
sales.stream().forEach(sale -> {
if (sale.getBuyer().listener != null)
sale.getBuyer().listener.onEvent(EventType.PURCHASE, sale);
if (sale.getSeller().listener != null)
sale.getSeller().listener.onEvent(EventType.SALE, sale);
});
if (!sales.isEmpty()) {
LOGGER.warn("trading of " + sales.size()
+ " sales completed and persisted in "
+ (System.currentTimeMillis() - start) + "ms");
} else {
LOGGER.info("no trades...");
}
// debug(self.market, 10, false);
if (listener != null)
this.updateMarketVolume(null); // removes outdated data
listener.onEvent(EventType.STATS,
new Object[] { market.getMarketInfo(), this.marketPrices,
this.volumeRecords });
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (runInActorMode) {
break;
}
}
listener.onEvent(EventType.STOPPED, null);
}
public void stop() {
this.running = false;
}
/**
* @method @return a VolumeRecord, just with no timestamp. properties are
* total in last minute.
*/
public VolumeRecord getCurrentVolume(String productId) {
List<VolumeRecord> vrs = this.volumeRecords.get(productId);
if (vrs != null) {
long now = System.currentTimeMillis();
vrs = vrs.stream().filter(vr -> {
return now - vr.timestamp.getTime() < 1000 * 10;
}).collect(Collectors.toList()); // remove old
this.volumeRecords.put(productId, vrs); // ensure records contains
// most up to date
// aggregate
VolumeRecord vr = new VolumeRecord(productId, 0, 0, null, 0);
vr = vrs.stream().reduce(vr, VolumeRecord::add);
return vr;
} else {
return new VolumeRecord(productId, 0, 0, null, 0);
}
}
/** @method @return the last known price */
public MarketPrice getCurrentMarketPrice(String productId) {
return this.marketPrices.get(productId);
}
// handles timed out orders
private void prepareMarket() {
// handle timeouted sales orders
market.getSellers().forEach(
seller -> {
List<SalesOrder> incompleteSOs = seller
.removeOutdatedSalesOrders(timeout);
incompleteSOs.forEach(so -> {
if (so.getSeller().listener != null)
so.getSeller().listener.onEvent(
EventType.TIMEOUT_SALESORDER, so);
else
LOGGER.debug("incomplete SO: " + so);
});
});
// handle timeouted purchase orders
market.getBuyers().forEach(
buyer -> {
List<PurchaseOrder> incompletePOs = buyer
.removeOutdatedPurchaseOrders(timeout);
incompletePOs.forEach(po -> {
if (po.getBuyer().listener != null)
po.getBuyer().listener.onEvent(
EventType.TIMEOUT_PURCHASEORDER, po);
else
LOGGER.debug("incomplete PO: " + po);
});
});
if (!runInActorMode) {
// add new SOs and POs
synchronized (newSalesOrders) {
newSalesOrders.forEach((seller, sos) -> {
if (!this.market.getSellers().contains(seller)) {
LOGGER.debug("seller named " + seller.getName()
+ " doesnt exist -> adding a new one");
this.market.addSeller(seller);
seller.listener = listener;
} else {
// swap temp seller with the actual one in the market
seller = this.market.getSellers().get(
this.market.getSellers().indexOf(seller));
}
final Seller fSeller = seller;
sos.forEach(so -> {
fSeller.addSalesOrder(so);
});
});
newSalesOrders.clear();
}
synchronized (newPurchaseOrders) {
newPurchaseOrders.forEach((buyer, pos) -> {
if (!this.market.getBuyers().contains(buyer)) {
LOGGER.debug("buyer named " + buyer.getName()
+ " doesnt exist -> adding a new one");
this.market.addBuyer(buyer);
buyer.listener = listener;
} else {
// swap temp buyer with the actual one in the market
buyer = this.market.getBuyers().get(
this.market.getBuyers().indexOf(buyer));
}
final Buyer fBuyer = buyer;
pos.forEach(po -> {
fBuyer.addPurchaseOrder(po);
});
});
newPurchaseOrders.clear();
}
}
}
private void persistSale(List<Sale> sales) throws Exception {
if (!sales.isEmpty()) {
LOGGER.info("preparing to persist sales");
DataSource ds = (DataSource) ctx.lookup("java:comp/env/jdbc/mysql");
try (Connection c = ds.getConnection()) {
PreparedStatement stmt = c
.prepareStatement("INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) "
+ "values (?, ?, ?, ?, ?, ?, ?)");
sales.forEach(sale -> {
try {
int i = 1;
stmt.setString(i++, sale.getBuyer().getName());
stmt.setString(i++, sale.getSeller().getName());
stmt.setInt(i++, Integer.parseInt(sale.getProductId()));
stmt.setDouble(i++, sale.getPrice());
stmt.setInt(i++, sale.getQuantity());
stmt.setInt(i++, sale.getPurchaseOrder().getId());
stmt.setInt(i++, sale.getSalesOrder().getId());
if (stmt.execute()) {
ResultSet rs = stmt.getGeneratedKeys();
rs.next();
sale.setId(rs.getInt(1));
rs.close();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
}
}
}
private void noteMarketPricesAndVolumes(List<Sale> sales) {
sales.forEach(sale -> {
updateMarketPrice(sale);
updateMarketVolume(sale);
});
}
public static class MarketPrice {
private String productId;
private double price;
private Date timestamp;
public MarketPrice(String productId, double price, Date timestamp) {
this.productId = productId;
this.price = price;
this.timestamp = timestamp;
}
public double getPrice() {
return price;
}
public String getProductId() {
return productId;
}
public Date getTimestamp() {
return timestamp;
}
}
private void updateMarketPrice(Sale sale) {
MarketPrice mp = marketPrices.get(sale.getProductId());
if (mp == null
|| (mp != null && mp.getTimestamp().getTime() < sale
.getTimestamp().getTime())) {
// set price if none is known, or replace price if its older than
// current price
marketPrices.put(
sale.getProductId(),
new MarketPrice(sale.getProductId(), sale.getPrice(), sale
.getTimestamp()));
}
}
public static class VolumeRecord {
public static final VolumeRecord EMPTY = new VolumeRecord(null, 0, 0,
null, 0);
public String productId;
public int numberOfSales;
public double turnover;
public Date timestamp;
public int count;
public VolumeRecord(String productId, int numberOfSales,
double turnover, Date timestamp, int count) {
this.productId = productId;
this.numberOfSales = numberOfSales;
this.turnover = turnover;
this.timestamp = timestamp;
this.count = count;
}
public static VolumeRecord add(VolumeRecord a, VolumeRecord b) {
return new VolumeRecord(b.productId, a.numberOfSales
+ b.numberOfSales, a.turnover + b.turnover, null, a.count
+ b.count);
}
public static VolumeRecord aggregate(List<VolumeRecord> vrs) {
VolumeRecord vr = EMPTY;
vr = vrs.stream().reduce(vr, VolumeRecord::add);
return vr;
}
}
private void updateMarketVolume(Sale sale) {
// //////////////
// remove old ones
// //////////////
Map<String, List<VolumeRecord>> newVolumeRecords = new HashMap<>();
long now = System.currentTimeMillis();
volumeRecords.forEach((k, v) -> {
List<VolumeRecord> vrs = v.stream().filter(vr -> {
return now - vr.timestamp.getTime() < 1000 * 10;
}).collect(Collectors.toList()); // remove older than 10 secs
newVolumeRecords.put(k, vrs);
});
volumeRecords = newVolumeRecords; // replace the old ones
// //////////////
// add new data
// //////////////
if (sale != null) {
List<VolumeRecord> vrs = volumeRecords.get(sale.getProductId());
if (vrs == null) {
vrs = new ArrayList<>();
}
vrs.add(new VolumeRecord(sale.getProductId(), sale.getQuantity(),
sale.getQuantity() * sale.getPrice(), sale.getTimestamp(),
1)); // scale up to "per minute"
volumeRecords.put(sale.getProductId(), vrs); // replace with old one
}
}
public PurchaseOrder addPurchaseOrder(String who, String productId,
int quantity, int id) {
if (runInActorMode) {
Buyer buyer = new Buyer(who);
if (!this.market.getBuyers().contains(buyer)) {
LOGGER.debug("buyer named " + who
+ " doesnt exist -> adding a new one");
this.market.addBuyer(buyer);
buyer.listener = listener;
} else {
// swap temp buyer with the actual one in the market
buyer = this.market.getBuyers().get(
this.market.getBuyers().indexOf(buyer));
}
PurchaseOrder po = new PurchaseOrder(productId, quantity, 9999.9,
id);
buyer.addPurchaseOrder(po);
return po;
} else {
synchronized (newPurchaseOrders) {
Buyer b = new Buyer(who);
List<PurchaseOrder> pos = newPurchaseOrders.get(b);
if (pos == null) {
pos = new ArrayList<>();
newPurchaseOrders.put(b, pos);
}
PurchaseOrder po = new PurchaseOrder(productId, quantity,
9999.9, id);
pos.add(po);
return po;
}
}
}
public SalesOrder addSalesOrder(String who, String productId, int quantity,
double price, int id) {
if (runInActorMode) {
Seller seller = new Seller(who);
if (!this.market.getSellers().contains(seller)) {
LOGGER.debug("seller named " + who
+ " doesnt exist -> adding a new one");
this.market.addSeller(seller);
seller.listener = listener;
} else {
// swap temp seller with the actual one in the market
seller = this.market.getSellers().get(
this.market.getSellers().indexOf(seller));
}
SalesOrder so = new SalesOrder(price, productId, quantity, id);
seller.addSalesOrder(so);
return so;
} else {
synchronized (newSalesOrders) {
Seller s = new Seller(who);
List<SalesOrder> sos = newSalesOrders.get(s);
if (sos == null) {
sos = new ArrayList<>();
newSalesOrders.put(s, sos);
}
SalesOrder so = new SalesOrder(price, productId, quantity, id);
sos.add(so);
return so;
}
}
}
}
package ch.maxant.tradingengine.web;
import static ch.maxant.tradingengine.web.Constants.DELAY;
import static ch.maxant.tradingengine.web.Constants.ID;
import static ch.maxant.tradingengine.web.Constants.NUM_KIDS;
import static ch.maxant.tradingengine.web.Constants.PRODUCT_IDS;
import static ch.maxant.tradingengine.web.Constants.TIMEOUT;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.NamingException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import ch.maxant.tradingengine.model.PurchaseOrder;
import ch.maxant.tradingengine.model.Sale;
import ch.maxant.tradingengine.model.SalesOrder;
import ch.maxant.tradingengine.model.TradingEngine.EventType;
import ch.maxant.tradingengine.model.TradingEngine.VolumeRecord;
import ch.maxant.tradingengine.model.TradingEngineThread;
@WebServlet(urlPatterns = { "/sell", "/buy", "/result" })
public class TradingEngineServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LogManager
.getLogger("tradingEngineServlet");
private static Stats stats = new Stats();
private static final Map<String, TradingEngineThread> kids = new HashMap<>();
private static final Map<String, Result> results = new ConcurrentHashMap<>();
private static final Set<String> knownProducts = Collections
.synchronizedSet(new HashSet<>());
private static final AtomicInteger timedoutSales = new AtomicInteger(0);
static {
try {
int chunk = PRODUCT_IDS.length / NUM_KIDS;
for (int i = 0, j = PRODUCT_IDS.length; i < j; i += chunk) {
String[] temparray = Arrays.copyOfRange(PRODUCT_IDS, i, i
+ chunk);
LOGGER.info("created engine for products " + temparray);
TradingEngineThread engineThread = new TradingEngineThread(
DELAY, TIMEOUT, (type, data) -> event(type, data));
for (int k = 0; k < temparray.length; k++) {
LOGGER.debug("mapping productId '" + temparray[k]
+ "' to engine " + i);
kids.put(temparray[k], engineThread);
}
LOGGER.info("---started trading");
engineThread.start();
}
} catch (NamingException e) {
LOGGER.error("failed to start engine", e);
}
// remove results older than a minute, every 5 seconds.
// in a real system you wouldnt necessarily cache results like
// we are doing - the sales are actually persisted by the
// trading engine - so clients could go look there!
new Timer("cleaner", true).scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
LOGGER.error("cleaning results... sales per minute: "
+ stats.totalSalesPerMinute + ", "
+ timedoutSales.get() + " timedout orders");
long now = System.currentTimeMillis();
List<String> toRemove = new ArrayList<>();
results.forEach((k, v) -> {
if (now - v.created > 60000) {
toRemove.add(k);
}
});
toRemove.forEach(k -> results.remove(k));
LOGGER.info("completed cleaning results in "
+ (System.currentTimeMillis() - now) + "ms");
}
}, 5000L, 5000L);
}
public static synchronized void event(final EventType type,
final Object data) {
switch (type) {
case SALE: {
Sale sale = (Sale) data;
int id = sale.getSalesOrder().getId();
results.put(String.valueOf(id), new Result(String.valueOf(data)));
if (sale.getSalesOrder().getRemainingQuantity() == 0) {
String msg = "COMPLETED sales order";
LOGGER.info("\n" + id + ") " + msg + " " + data);
} else {
LOGGER.info("\n" + id + ") PARTIAL sales order " + data);
}
break;
}
case PURCHASE: {
Sale sale = (Sale) data;
int id = sale.getPurchaseOrder().getId();
results.put(String.valueOf(id), new Result(String.valueOf(data)));
if (sale.getPurchaseOrder().getRemainingQuantity() == 0) {
String msg = "COMPLETED purchase order";
LOGGER.info("\n" + id + ") " + msg + " " + data);
} else {
LOGGER.info("\n" + id + ") PARTIAL purchase order " + data);
}
break;
}
case TIMEOUT_SALESORDER: {
timedoutSales.incrementAndGet();
SalesOrder so = (SalesOrder) data;
String msg = "TIMEOUT sales order";
LOGGER.info("\n" + so.getId() + ") " + msg + " " + data);
break;
}
case TIMEOUT_PURCHASEORDER: {
timedoutSales.incrementAndGet();
PurchaseOrder po = (PurchaseOrder) data;
String msg = "TIMEOUT purchase order";
LOGGER.info("\n" + po.getId() + ") " + msg + " " + data);
break;
}
case STATS: {
synchronized (knownProducts) {
Map<String, List<VolumeRecord>> mapOfVolumeRecords = (Map<String, List<VolumeRecord>>) ((Object[]) data)[2];
stats.totalSalesPerMinute = knownProducts
.stream()
.map(productId -> {
return VolumeRecord.aggregate(mapOfVolumeRecords
.getOrDefault(productId,
Collections.emptyList())).count;
}).reduce(Integer::sum).orElse(0) * 6; // since stats
// are
// recorded
// for the last
// 10 secs
}
break;
}
default:
break;
}
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
String path = req.getServletPath();
LOGGER.debug("received command: '" + path + "'");
String who = req.getParameter("userId");
String productId = req.getParameter("productId");
int quantity = Integer.parseInt(req.getParameter("quantity"));
TradingEngineThread engine = kids.get(productId);
knownProducts.add(productId);
int id = ID.getAndIncrement();
// /buy?productId=1&quantity=10&userId=ant
if (path.equals("/buy")) {
PurchaseOrder po = engine.addPurchaseOrder(who, productId,
quantity, id);
resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(po));
} else if (path.equals("/sell")) {
double price = Double.parseDouble(req.getParameter("price"));
SalesOrder so = engine.addSalesOrder(who, productId, quantity,
price, id);
resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(so));
} else if (path.equals("/result")) {
String key = req.getParameter("id");
Result r = results.get(key);
if (r != null) {
results.remove(key);
resp.getWriter().write(r.data);
} else {
resp.getWriter().write("UNKNOWN OR PENDING");
}
} else {
String msg = "Unknown command " + path;
LOGGER.warn(msg);
}
}
private static class Stats {
int totalSalesPerMinute;
}
private static class Result {
String data;
long created;
Result(String data) {
this.data = data;
this.created = System.currentTimeMillis();
}
}
}
package ch.maxant.tradingengine.web;
import static ch.maxant.tradingengine.web.Constants.DELAY;
import static ch.maxant.tradingengine.web.Constants.ID;
import static ch.maxant.tradingengine.web.Constants.NUM_KIDS;
import static ch.maxant.tradingengine.web.Constants.PRODUCT_IDS;
import static ch.maxant.tradingengine.web.Constants.TIMEOUT;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.NamingException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import ch.maxant.tradingengine.model.Buyer;
import ch.maxant.tradingengine.model.PurchaseOrder;
import ch.maxant.tradingengine.model.Sale;
import ch.maxant.tradingengine.model.SalesOrder;
import ch.maxant.tradingengine.model.Seller;
import ch.maxant.tradingengine.model.TradingEngine;
import ch.maxant.tradingengine.model.TradingEngine.EventType;
import ch.maxant.tradingengine.model.TradingEngine.VolumeRecord;
@WebServlet(urlPatterns = { "/sell2", "/buy2", "/result2" })
public class TradingEngineServletWithActors extends HttpServlet {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LogManager
.getLogger("tradingEngineServletWithActors");
private static final ActorSystem teSystem = ActorSystem
.create("TradingEngines");
private static final Stats stats = new Stats();
private static final Map<String, ActorRef> kids = new HashMap<>();
private static final Map<String, Result> results = new ConcurrentHashMap<>();
private static final Set<String> knownProducts = Collections
.synchronizedSet(new HashSet<>());
private static final AtomicInteger timedoutSales = new AtomicInteger(0);
static {
int chunk = PRODUCT_IDS.length / NUM_KIDS;
for (int i = 0, j = PRODUCT_IDS.length; i < j; i += chunk) {
String[] temparray = Arrays.copyOfRange(PRODUCT_IDS, i, i + chunk);
LOGGER.info("created engine for products " + temparray);
ActorRef actor = teSystem.actorOf(
Props.create(TradingEngineActor.class), "engine-" + i);
for (int k = 0; k < temparray.length; k++) {
LOGGER.debug("mapping productId '" + temparray[k]
+ "' to engine " + i);
kids.put(temparray[k], actor);
}
LOGGER.info("---started trading");
actor.tell(TradingEngineActor.RUN, ActorRef.noSender());
}
// remove results older than a minute, every 5 seconds.
// in a real system you wouldnt necessarily cache results like
// we are doing - the sales are actually persisted by the
// trading engine - so clients could go look there!
new Timer("cleaner", true).scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
LOGGER.error("cleaning results... sales per minute: "
+ stats.totalSalesPerMinute + ", "
+ timedoutSales.get() + " timedout orders");
long now = System.currentTimeMillis();
List<String> toRemove = new ArrayList<>();
results.forEach((k, v) -> {
if (now - v.created > 60000) {
toRemove.add(k);
}
});
toRemove.forEach(k -> results.remove(k));
LOGGER.info("completed cleaning results in "
+ (System.currentTimeMillis() - now) + "ms");
}
}, 5000L, 5000L);
}
public static synchronized void event(final EventType type,
final Object data) {
switch (type) {
case SALE: {
Sale sale = (Sale) data;
int id = sale.getSalesOrder().getId();
results.put(String.valueOf(id), new Result(String.valueOf(data)));
if (sale.getSalesOrder().getRemainingQuantity() == 0) {
String msg = "COMPLETED sales order";
LOGGER.info("\n" + id + ") " + msg + " " + data);
} else {
LOGGER.info("\n" + id + ") PARTIAL sales order " + data);
}
break;
}
case PURCHASE: {
Sale sale = (Sale) data;
int id = sale.getPurchaseOrder().getId();
results.put(String.valueOf(id), new Result(String.valueOf(data)));
if (sale.getPurchaseOrder().getRemainingQuantity() == 0) {
String msg = "COMPLETED purchase order";
LOGGER.info("\n" + id + ") " + msg + " " + data);
} else {
LOGGER.info("\n" + id + ") PARTIAL purchase order " + data);
}
break;
}
case TIMEOUT_SALESORDER: {
timedoutSales.incrementAndGet();
SalesOrder so = (SalesOrder) data;
String msg = "TIMEOUT sales order";
LOGGER.info("\n" + so.getId() + ") " + msg + " " + data);
break;
}
case TIMEOUT_PURCHASEORDER: {
timedoutSales.incrementAndGet();
PurchaseOrder po = (PurchaseOrder) data;
String msg = "TIMEOUT purchase order";
LOGGER.info("\n" + po.getId() + ") " + msg + " " + data);
break;
}
case STATS: {
// an alternative approach for dealing with concurrency problems
// like ConcurrentModificationException is to copy the data, but it
// only works when handling read only data
Set<String> knownProductsCopy = new HashSet<>(knownProducts);
Map<String, List<VolumeRecord>> mapOfVolumeRecords = new HashMap(
(Map<String, List<VolumeRecord>>) ((Object[]) data)[2]);
stats.totalSalesPerMinute = knownProductsCopy
.stream()
.map(productId -> {
return VolumeRecord.aggregate(new ArrayList(
mapOfVolumeRecords.getOrDefault(productId,
Collections.emptyList()))).count;
}).reduce(Integer::sum).orElse(0) * 6;
// times 6 since stats are recorded for only the last ten secs,
// and we want them per minute
break;
}
default:
break;
}
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
String path = req.getServletPath();
LOGGER.debug("received command: '" + path + "'");
String who = req.getParameter("userId");
String productId = req.getParameter("productId");
int quantity = Integer.parseInt(req.getParameter("quantity"));
ActorRef engine = kids.get(productId);
knownProducts.add(productId);
int id = ID.getAndIncrement();
// /buy?productId=1&quantity=10&userId=ant
if (path.equals("/buy2")) {
PurchaseOrder po = new PurchaseOrder(productId, quantity, 9999.9,
id);
po.setBuyer(new Buyer(who));
engine.tell(po, ActorRef.noSender());
resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(po));
} else if (path.equals("/sell2")) {
double price = Double.parseDouble(req.getParameter("price"));
SalesOrder so = new SalesOrder(price, productId, quantity, id);
so.setSeller(new Seller(who));
engine.tell(so, ActorRef.noSender());
resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(so));
} else if (path.equals("/result2")) {
String key = req.getParameter("id");
Result r = results.get(key);
if (r != null) {
results.remove(key);
resp.getWriter().write(r.data);
} else {
resp.getWriter().write("UNKNOWN OR PENDING");
}
} else {
String msg = "Unknown command " + path;
LOGGER.warn(msg);
}
}
private static class Stats {
int totalSalesPerMinute;
}
private static class Result {
String data;
long created;
Result(String data) {
this.data = data;
this.created = System.currentTimeMillis();
}
}
/**
* using actors, we guarantee that only ever one thread accesses our trading
* engine at any one time, and so we avoid having to synchronize!
*/
private static class TradingEngineActor extends AbstractActor {
private static final String RUN = "RUN";
// STATE
private TradingEngine engine = new TradingEngine(DELAY, TIMEOUT, (type,
data) -> handle(type, data), true);
public TradingEngineActor() throws NamingException {
// INBOX
receive(ReceiveBuilder
.match(SalesOrder.class,
so -> {
// BEHAVIOUR (delegated to engine)
engine.addSalesOrder(so.getSeller().getName(),
so.getProductId(),
so.getRemainingQuantity(),
so.getPrice(), so.getId());
})
.match(PurchaseOrder.class,
po -> {
// BEHAVIOUR (delegated to engine)
engine.addPurchaseOrder(
po.getBuyer().getName(),
po.getProductId(),
po.getRemainingQuantity(), po.getId());
})
.match(String.class, s -> RUN.equals(s), command -> {
engine.run();
})
.matchAny(
o -> System.err
.println("received unknown message: " + o))
.build());
}
private void handle(EventType type, Object data) {
event(type, data);
if (type.equals(EventType.STOPPED)) {
self().tell(RUN, ActorRef.noSender()); // start another trading
// engine!
}
}
}
}
package ch.maxant.tradingengine.model;
import javax.naming.NamingException;
import ch.maxant.tradingengine.model.TradingEngine.Listener;
import ch.maxant.tradingengine.model.TradingEngine.MarketPrice;
import ch.maxant.tradingengine.model.TradingEngine.VolumeRecord;
/**
* a simple delegate which caches buyers and sellers, just like the node.js
* child processes do.
*/
public class TradingEngineThread extends Thread {
private static int ID = 0;
private final TradingEngine engine;
public TradingEngineThread(long delay, long timeout, Listener listener)
throws NamingException {
super("engine-" + ID++);
engine = new TradingEngine(delay, timeout, listener);
}
@Override
public void run() {
engine.run();
}
public PurchaseOrder addPurchaseOrder(String who, String productId,
int quantity, int id) {
return engine.addPurchaseOrder(who, productId, quantity, id);
}
public SalesOrder addSalesOrder(String who, String productId, int quantity,
double price, int id) {
return engine.addSalesOrder(who, productId, quantity, price, id);
}
public VolumeRecord getCurrentVolume(String productId) {
return engine.getCurrentVolume(productId);
}
public MarketPrice getMarketPrice(String productId) {
return engine.getCurrentMarketPrice(productId);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment