Code for the Java trading engine solution
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.model; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.stream.Collectors; | |
import org.apache.logging.log4j.LogManager; | |
import org.apache.logging.log4j.Logger; | |
import ch.maxant.tradingengine.model.TradingEngine.Listener; | |
public class Buyer { | |
private static final Logger LOGGER = LogManager.getLogger("buyer"); | |
private String name; | |
private List<PurchaseOrder> purchaseOrders = new ArrayList<>(); | |
public Listener listener; | |
public Buyer(String name) { | |
this.name = name; | |
} | |
public String getName() { | |
return name; | |
} | |
public void addPurchaseOrder(PurchaseOrder purchaseOrder) { | |
LOGGER.debug(name + " adding " + purchaseOrder); | |
purchaseOrder.setBuyer(this); | |
this.purchaseOrders.add(purchaseOrder); | |
} | |
/** | |
* @return {Array} all the {@link PurchaseOrder}s for the given product, | |
* where the maximum acceptable price is more than the given price | |
*/ | |
public List<PurchaseOrder> getRelevantPurchaseOrders(String productId, | |
double price) { | |
return this.purchaseOrders | |
.stream() | |
.filter(po -> { | |
return po.getProductId().equals(productId) | |
&& po.getMaximumAcceptedPrice() >= price; | |
}).collect(Collectors.toList()); | |
} | |
public void removePurchaseOrder(PurchaseOrder purchaseOrder) { | |
this.purchaseOrders.remove(purchaseOrder); | |
} | |
public List<PurchaseOrder> removeOutdatedPurchaseOrders(long ageInMs) { | |
long now = System.currentTimeMillis(); | |
List<PurchaseOrder> filter = this.purchaseOrders.stream() | |
.filter(po -> { | |
return now - po.getCreated().getTime() > ageInMs; | |
}).collect(Collectors.toList()); | |
this.purchaseOrders.removeAll(filter); | |
return filter; | |
} | |
public List<PurchaseOrder> getPurchaseOrders() { | |
return purchaseOrders; | |
} | |
@Override | |
public int hashCode() { | |
final int prime = 31; | |
int result = 1; | |
result = prime * result + ((name == null) ? 0 : name.hashCode()); | |
return result; | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) | |
return true; | |
if (obj == null) | |
return false; | |
if (getClass() != obj.getClass()) | |
return false; | |
Buyer other = (Buyer) obj; | |
if (name == null) { | |
if (other.name != null) | |
return false; | |
} else if (!name.equals(other.name)) | |
return false; | |
return true; | |
} | |
@Override | |
public String toString() { | |
return "Buyer [name=" + name + "]"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.web; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** Shared settings for the trading engine web layer. */
public final class Constants {

    // TODO use config to decide how many engines to start
    public static final int NUM_KIDS = 4;

    public static final AtomicInteger ID = new AtomicInteger();

    /** The product IDs "0" through "99", in ascending numeric order. */
    public static final String[] PRODUCT_IDS = productIds(100);

    /** How many milliseconds between trading sessions. */
    public static final long DELAY = 3;

    /** Number of ms after which incomplete SOs and POs should be removed. */
    public static final long TIMEOUT = 60000;

    /** Builds the decimal strings "0" .. "count - 1". */
    private static String[] productIds(int count) {
        String[] ids = new String[count];
        for (int i = 0; i < count; i++) {
            ids[i] = Integer.toString(i);
        }
        return ids;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.model; | |
import java.util.ArrayList; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Optional; | |
import java.util.Set; | |
import java.util.stream.Collectors; | |
import org.apache.commons.lang3.mutable.MutableBoolean; | |
import org.apache.logging.log4j.LogManager; | |
import org.apache.logging.log4j.Logger; | |
/** | |
* A market contains 0..n sellers. A seller has 0..i sales orders each of which | |
* contains a quantity of a product at a certain price. The seller is prepared | |
* to sell their product for that price. The market also contains 0..m buyers. | |
* Each buyer has 0..j purchase orders. A purchase order is for a given product | |
* and quantity. The purchase price may not exceed the given price. The market | |
* works by continuously looping through trade sittings. See the {@link #trade} | |
* method. | |
*/ | |
public class Market { | |
private static final Logger LOGGER = LogManager.getLogger("market"); | |
private List<Seller> sellers = new ArrayList<>(); | |
private List<Buyer> buyers = new ArrayList<>(); | |
private MarketInfo marketInfo; | |
public void addSeller(Seller seller) { | |
this.sellers.add(seller); | |
} | |
public void addBuyer(Buyer buyer) { | |
this.buyers.add(buyer); | |
} | |
/** | |
* At a single trade sitting, the following happens: 1) find all products | |
* available (on offer by sellers) 2) for each product: 2a) for each buyer | |
* interested in that product: 2ai) find the seller with the cheapest price | |
* for the current product 2aii) if such a seller exists, create a sale, | |
* otherwise nobody is selling the product anymore, so skip to next product. | |
* | |
* The point is that a buyer always goes to the cheapest seller, even if | |
* that seller doesnt have enough quantity. A buyer who wants more has to | |
* wait until the next trading session to find the next most suitable | |
* seller. | |
* | |
* @return {Array} array of {@link Sale}s in this trade | |
*/ | |
public List<Sale> trade() { | |
List<Sale> sales = new ArrayList<>(); | |
Set<String> productsInMarket = getProductsInMarket(); | |
this.collectMarketInfo(); | |
// trade each product in succession | |
productsInMarket | |
.stream() | |
.forEach( | |
productId -> { | |
MutableBoolean soldOutOfProduct = new MutableBoolean( | |
false); | |
LOGGER.debug("trading product " + productId); | |
List<Buyer> buyersInterestedInProduct = getBuyersInterestedInProduct(productId); | |
if (buyersInterestedInProduct.size() == 0) { | |
LOGGER.info("no buyers interested in product " | |
+ productId); | |
} else { | |
buyersInterestedInProduct.forEach(buyer -> { | |
if (soldOutOfProduct.isFalse()) { | |
LOGGER.debug(" buyer " | |
+ buyer.getName() | |
+ " is searching for product " | |
+ productId); | |
// select the cheapest seller | |
Optional<Seller> cheapestSeller = sellers | |
.stream() | |
.filter(seller -> { | |
return seller | |
.hasProduct(productId); | |
}) | |
.sorted((s1, s2) -> Double | |
.compare( | |
s1.getCheapestSalesOrder( | |
productId) | |
.getPrice(), | |
s2.getCheapestSalesOrder( | |
productId) | |
.getPrice())) | |
.findFirst(); | |
if (cheapestSeller.isPresent()) { | |
LOGGER.debug(" cheapest seller is " | |
+ cheapestSeller.get() | |
.getName()); | |
List<Sale> newSales = createSale( | |
buyer, | |
cheapestSeller.get(), | |
productId); | |
sales.addAll(newSales); | |
LOGGER.debug(" sales completed"); | |
} else { | |
LOGGER.warn(" market sold out of product " | |
+ productId); | |
soldOutOfProduct.setTrue(); | |
} | |
} | |
}); | |
} | |
}); | |
return sales; | |
}; | |
public void collectMarketInfo() { | |
this.marketInfo = new MarketInfo(); | |
this.marketInfo.pos = buyers.stream().map(buyer -> { | |
return buyer.getPurchaseOrders(); | |
}).flatMap(l -> l.stream()).collect(Collectors.groupingBy(po -> { | |
return po.getProductId(); | |
})); | |
this.marketInfo.sos = sellers.stream().map(seller -> { | |
return seller.getSalesOrders(); | |
}).flatMap(l -> l.stream()).collect(Collectors.groupingBy(so -> { | |
return so.getProductId(); | |
})); | |
/* | |
* for(String productId : this.marketInfo.pos.keySet()){ | |
* this.marketInfo.pos.put(productId) = | |
* this.marketInfo.pos[productId].length; } for(var productId : | |
* this.marketInfo.sos){ this.marketInfo.sos[productId] = | |
* this.marketInfo.sos[productId].length; } | |
*/ | |
}; | |
/** | |
* creates a sale if the prices is within the buyers budget. iterates all of | |
* the buyers purchase wishes for the given product so long as the seller | |
* still has the product. | |
* | |
* @return {Array} array of new {@link Sale}s, after having removed a | |
* quantity of the product from the seller/buyer. | |
*/ | |
public List<Sale> createSale(Buyer buyer, Seller seller, String productId) { | |
SalesOrder cheapestSalesOrder = seller.getCheapestSalesOrder(productId); | |
LOGGER.debug("cheapest sales order " + cheapestSalesOrder); | |
// find the buyers purchase orders, where the po.price => | |
// cheapestSalesOrder.price | |
// create a sale for each buyer's purchase order | |
// until either the seller has no more stock at this price | |
// or the buyer has bought all they want | |
List<PurchaseOrder> purchaseOrders = buyer.getRelevantPurchaseOrders( | |
productId, cheapestSalesOrder.getPrice()); | |
LOGGER.debug("relevant purchase orders: " + purchaseOrders); | |
List<Sale> sales = new ArrayList<>(); | |
purchaseOrders.stream().forEach( | |
purchaseOrder -> { | |
int quantity = Math.min( | |
cheapestSalesOrder.getRemainingQuantity(), | |
purchaseOrder.getRemainingQuantity()); | |
LOGGER.debug("quantity " + quantity + " for PO: " | |
+ purchaseOrder); | |
if (quantity > 0) { | |
Sale sale = new Sale(buyer, seller, productId, | |
cheapestSalesOrder.getPrice(), quantity); | |
// add PO and SO for events | |
sale.setPurchaseOrder(purchaseOrder); | |
sale.setSalesOrder(cheapestSalesOrder); | |
sales.add(sale); | |
LOGGER.debug("created sale: " + sale); | |
// adjust quantities purchaseOrder.remainingQuantity -= | |
// quantity; | |
cheapestSalesOrder.reduceRemainingQuantity(quantity); | |
// remove completed purchase wishes | |
if (purchaseOrder.getRemainingQuantity() == 0) { | |
LOGGER.debug("PO complete: " + sale); | |
buyer.removePurchaseOrder(purchaseOrder); | |
} | |
} | |
}); | |
// remove completed sales orders | |
if (cheapestSalesOrder.getRemainingQuantity() == 0) { | |
LOGGER.debug("SO complete: " + cheapestSalesOrder); | |
seller.removeSalesOrder(cheapestSalesOrder); | |
} | |
return sales; | |
} | |
/** | |
* @return all buyers in the market who have a purchase order for the given | |
* product | |
*/ | |
public List<Buyer> getBuyersInterestedInProduct(final String productId) { | |
return this.buyers.stream().filter(buyer -> { | |
return buyer.getPurchaseOrders().stream().anyMatch(po -> { | |
return po.getProductId().equals(productId); | |
}); | |
}).collect(Collectors.toList()); | |
} | |
/** @return all product IDs that are for sale in the market */ | |
public Set<String> getProductsInMarket() { | |
// TODO use flatmap, but also in js too! | |
Set<String> productsInMarket = new HashSet<>(); | |
sellers.forEach(seller -> { | |
seller.getSalesOrders().stream().forEach(salesOrder -> { | |
productsInMarket.add(salesOrder.getProductId()); | |
}); | |
}); | |
return productsInMarket; | |
} | |
public static class MarketInfo { | |
public Map<String, List<PurchaseOrder>> pos; | |
public Map<String, List<SalesOrder>> sos; | |
} | |
public MarketInfo getMarketInfo() { | |
return marketInfo; | |
} | |
public List<Seller> getSellers() { | |
return sellers; | |
} | |
public List<Buyer> getBuyers() { | |
return buyers; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.model; | |
import java.util.Collection; | |
import java.util.Collections; | |
import org.apache.commons.lang3.builder.EqualsBuilder; | |
import org.apache.commons.lang3.builder.HashCodeBuilder; | |
import org.apache.commons.lang3.builder.ToStringBuilder; | |
import org.apache.commons.lang3.builder.ToStringStyle; | |
public abstract class Model { | |
@Override | |
public boolean equals(Object o) { | |
return EqualsBuilder.reflectionEquals(this, o, getIgnoredFields()); | |
} | |
protected Collection<String> getIgnoredFields() { | |
return Collections.emptySet(); | |
} | |
@Override | |
public int hashCode() { | |
return HashCodeBuilder.reflectionHashCode(this, getIgnoredFields()); | |
} | |
@Override | |
public String toString() { | |
return ToStringBuilder.reflectionToString(this, | |
ToStringStyle.SHORT_PREFIX_STYLE); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.model; | |
public class IdModel extends Model { | |
private int id; | |
public int getId() { | |
return id; | |
} | |
public void setId(int id) { | |
this.id = id; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.model; | |
import java.util.Date; | |
public class PurchaseOrder extends IdModel { | |
private String productId; | |
private int remainingQuantity; | |
private int originalQuantity; | |
private double maximumAcceptedPrice; | |
private Date created; | |
private Buyer buyer; | |
public PurchaseOrder(String productId, int quantity, | |
double maximumAcceptedPrice, int id) { | |
this.productId = productId; | |
this.remainingQuantity = quantity; | |
this.originalQuantity = quantity; | |
this.maximumAcceptedPrice = maximumAcceptedPrice; | |
this.created = new Date(); | |
setId(id); | |
} | |
public void setBuyer(Buyer buyer) { | |
this.buyer = buyer; | |
} | |
public String getProductId() { | |
return productId; | |
} | |
public Buyer getBuyer() { | |
return buyer; | |
} | |
public Date getCreated() { | |
return created; | |
} | |
public double getMaximumAcceptedPrice() { | |
return maximumAcceptedPrice; | |
} | |
public int getOriginalQuantity() { | |
return originalQuantity; | |
} | |
public int getRemainingQuantity() { | |
return remainingQuantity; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.model; | |
import java.util.Date; | |
public class Sale extends IdModel { | |
private Date timestamp; | |
private Buyer buyer; | |
private Seller seller; | |
private String productId; | |
private double price; | |
private int quantity; | |
private SalesOrder salesOrder; | |
private PurchaseOrder purchaseOrder; | |
/** | |
* a sale from a seller to a buyer for the given product and price and | |
* quantity. | |
*/ | |
public Sale(Buyer buyer, Seller seller, String productId, double price, | |
int quantity) { | |
this.buyer = buyer; | |
this.seller = seller; | |
this.productId = productId; | |
this.price = price; | |
this.quantity = quantity; | |
this.timestamp = new Date(); | |
} | |
public Buyer getBuyer() { | |
return buyer; | |
} | |
public double getPrice() { | |
return price; | |
} | |
public String getProductId() { | |
return productId; | |
} | |
public int getQuantity() { | |
return quantity; | |
} | |
public Seller getSeller() { | |
return seller; | |
} | |
public Date getTimestamp() { | |
return timestamp; | |
} | |
public void setSalesOrder(SalesOrder salesOrder) { | |
this.salesOrder = salesOrder; | |
} | |
public SalesOrder getSalesOrder() { | |
return salesOrder; | |
} | |
public void setPurchaseOrder(PurchaseOrder purchaseOrder) { | |
this.purchaseOrder = purchaseOrder; | |
} | |
public PurchaseOrder getPurchaseOrder() { | |
return purchaseOrder; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.model; | |
import java.util.Date; | |
public class SalesOrder extends IdModel { | |
private double price; | |
private String productId; | |
private int remainingQuantity; | |
private int originalQuantity; | |
private Date created; | |
private Seller seller; | |
/** | |
* an order to sell a given quantity of a product at a given price | |
*/ | |
public SalesOrder(double price, String productId, int quantity, int id) { | |
this.price = price; | |
this.productId = productId; | |
this.remainingQuantity = quantity; | |
this.originalQuantity = quantity; | |
this.created = new Date(); | |
setId(id); | |
} | |
public Date getCreated() { | |
return created; | |
} | |
public int getRemainingQuantity() { | |
return remainingQuantity; | |
} | |
public String getProductId() { | |
return productId; | |
} | |
public double getPrice() { | |
return price; | |
} | |
public int getOriginalQuantity() { | |
return originalQuantity; | |
} | |
public Seller getSeller() { | |
return seller; | |
} | |
public void setSeller(Seller seller) { | |
this.seller = seller; | |
} | |
public void reduceRemainingQuantity(double quantity) { | |
this.remainingQuantity -= quantity; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.model; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.stream.Collectors; | |
import org.apache.commons.lang3.ObjectUtils; | |
import org.apache.logging.log4j.LogManager; | |
import org.apache.logging.log4j.Logger; | |
import ch.maxant.tradingengine.model.TradingEngine.Listener; | |
public class Seller { | |
private static final Logger LOGGER = LogManager.getLogger("seller"); | |
private List<SalesOrder> salesOrders = new ArrayList<>(); | |
private String name; | |
public Listener listener; | |
public Seller(String name) { | |
this.name = name; | |
} | |
public void addSalesOrder(SalesOrder salesOrder) { | |
LOGGER.debug(name + " adding " + salesOrder); | |
salesOrder.setSeller(this); | |
this.salesOrders.add(salesOrder); | |
} | |
public boolean hasProduct(String productId) { | |
return this.salesOrders.stream().anyMatch(so -> { | |
return so.getProductId().equals(productId); | |
}); | |
} | |
/** | |
* @return {SalesOrder} the sales order for the given product that has the | |
* lowest price | |
*/ | |
public SalesOrder getCheapestSalesOrder(String productId) { | |
return this.salesOrders.stream().filter(so -> { | |
return so.getProductId().equals(productId); | |
}).sorted((o1, o2) -> Double.compare(o1.getPrice(), o2.getPrice())) | |
.findFirst().get(); | |
} | |
public void removeSalesOrder(SalesOrder salesOrder) { | |
this.salesOrders = this.salesOrders.stream().filter(so -> { | |
return !salesOrder.equals(so); | |
}).collect(Collectors.toList()); | |
} | |
/** @return the out of date ones */ | |
public List<SalesOrder> removeOutdatedSalesOrders(long ageInMs) { | |
long now = System.currentTimeMillis(); | |
Map<Boolean, List<SalesOrder>> partitioned = salesOrders.stream() | |
.collect(Collectors.groupingBy(so -> { | |
return now - so.getCreated().getTime() > ageInMs; | |
})); | |
this.salesOrders = ObjectUtils.defaultIfNull( | |
partitioned.get(Boolean.FALSE), | |
Collections.synchronizedList(new ArrayList<SalesOrder>())); | |
return ObjectUtils.defaultIfNull(partitioned.get(Boolean.TRUE), | |
Collections.synchronizedList(new ArrayList<SalesOrder>())); | |
} | |
public List<SalesOrder> getSalesOrders() { | |
return salesOrders; | |
} | |
public String getName() { | |
return name; | |
} | |
@Override | |
public int hashCode() { | |
final int prime = 31; | |
int result = 1; | |
result = prime * result + ((name == null) ? 0 : name.hashCode()); | |
return result; | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) | |
return true; | |
if (obj == null) | |
return false; | |
if (getClass() != obj.getClass()) | |
return false; | |
Seller other = (Seller) obj; | |
if (name == null) { | |
if (other.name != null) | |
return false; | |
} else if (!name.equals(other.name)) | |
return false; | |
return true; | |
} | |
@Override | |
public String toString() { | |
return "Seller [name=" + name + "]"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.model; | |
import java.sql.Connection; | |
import java.sql.PreparedStatement; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.util.ArrayList; | |
import java.util.Date; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.stream.Collectors; | |
import javax.naming.InitialContext; | |
import javax.naming.NamingException; | |
import javax.sql.DataSource; | |
import org.apache.logging.log4j.LogManager; | |
import org.apache.logging.log4j.Logger; | |
// ///////////////////////////////////////////////// | |
// this file contains all classes related to a trading | |
// engine which uses a market to simulate a trading platform. | |
// ///////////////////////////////////////////////// | |
public class TradingEngine { | |
private static final Logger LOGGER = LogManager.getLogger("tradingEngine"); | |
public static interface Listener { | |
public void onEvent(EventType type, Object data); | |
} | |
/** The kinds of event passed to a {@code Listener}. */
public enum EventType {
    SALE,
    PURCHASE,
    TIMEOUT_SALESORDER,
    TIMEOUT_PURCHASEORDER,
    STATS,
    STOPPED
}
private Market market = new Market(); | |
private Map<String, MarketPrice> marketPrices = new HashMap<>(); | |
private Map<String, List<VolumeRecord>> volumeRecords = new HashMap<>(); | |
private InitialContext ctx = new InitialContext(); | |
private Map<Seller, List<SalesOrder>> newSalesOrders = new HashMap<>(); | |
private Map<Buyer, List<PurchaseOrder>> newPurchaseOrders = new HashMap<>(); | |
private long delay; | |
private long timeout; | |
private Listener listener; | |
private boolean running = true; | |
/** | |
* if false, then runs in an infinite loop until {@link #stop()} is called. | |
* if true, then just runs once, and notifies listener that its stopped when | |
* that one run is done, so that the listener can decide when to restart the | |
* engine for another trading session. | |
*/ | |
private boolean runInActorMode; | |
/**
 * Creates an engine that does not run in actor mode.
 *
 * Basically a buyer goes into the market at a time when they are happy to
 * pay the market price, and takes the product from the cheapest seller
 * (i.e. at the market price). Depending on who is left, the market price
 * goes up or down.
 *
 * A trading engine has one market place and controls the frequency of
 * trades. Between trades, sellers and buyers may enter and exit, and all
 * sales are persisted.
 *
 * @param delay
 *            number of milliseconds between trades
 * @param timeout
 *            the number of milliseconds after which incomplete sales or
 *            purchase orders should be removed and their buyer/seller
 *            informed of the (partial) failure.
 * @param listener
 *            receives the events raised by the engine
 * @throws NamingException
 *             declared because the field initializer that creates the
 *             {@code InitialContext} can throw it
 */
public TradingEngine(long delay, long timeout, Listener listener)
        throws NamingException {
    this(delay, timeout, listener, false);
}
/**
 * Creates an engine.
 *
 * @param delay
 *            number of milliseconds between trades
 * @param timeout
 *            number of milliseconds after which incomplete orders are
 *            removed
 * @param listener
 *            receives the events raised by the engine
 * @param runInActorMode
 *            if true, a call to {@code run()} performs a single trading
 *            session instead of looping
 * @throws NamingException
 *             declared because the field initializer that creates the
 *             {@code InitialContext} can throw it
 */
public TradingEngine(long delay, long timeout, Listener listener,
        boolean runInActorMode) throws NamingException {
    this.runInActorMode = runInActorMode;
    this.listener = listener;
    this.timeout = timeout;
    this.delay = delay;
    LOGGER.debug("market is opening for trading!");
}
public void run() { | |
while (running) { | |
LOGGER.debug("\n\n------------------------------- trading...-------------------------"); | |
long start = System.currentTimeMillis(); | |
prepareMarket(); | |
List<Sale> sales = market.trade(); | |
LOGGER.info("trading completed"); | |
noteMarketPricesAndVolumes(sales); | |
try { | |
persistSale(sales); | |
} catch (Exception e) { | |
LOGGER.error("failed to persist sales: " + sales, e); | |
} | |
LOGGER.info("persisting completed, notifying involved parties..."); | |
sales.stream().forEach(sale -> { | |
if (sale.getBuyer().listener != null) | |
sale.getBuyer().listener.onEvent(EventType.PURCHASE, sale); | |
if (sale.getSeller().listener != null) | |
sale.getSeller().listener.onEvent(EventType.SALE, sale); | |
}); | |
if (!sales.isEmpty()) { | |
LOGGER.warn("trading of " + sales.size() | |
+ " sales completed and persisted in " | |
+ (System.currentTimeMillis() - start) + "ms"); | |
} else { | |
LOGGER.info("no trades..."); | |
} | |
// debug(self.market, 10, false); | |
if (listener != null) | |
this.updateMarketVolume(null); // removes outdated data | |
listener.onEvent(EventType.STATS, | |
new Object[] { market.getMarketInfo(), this.marketPrices, | |
this.volumeRecords }); | |
try { | |
Thread.sleep(delay); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
if (runInActorMode) { | |
break; | |
} | |
} | |
listener.onEvent(EventType.STOPPED, null); | |
} | |
public void stop() { | |
this.running = false; | |
} | |
/** | |
* @method @return a VolumeRecord, just with no timestamp. properties are | |
* total in last minute. | |
*/ | |
public VolumeRecord getCurrentVolume(String productId) { | |
List<VolumeRecord> vrs = this.volumeRecords.get(productId); | |
if (vrs != null) { | |
long now = System.currentTimeMillis(); | |
vrs = vrs.stream().filter(vr -> { | |
return now - vr.timestamp.getTime() < 1000 * 10; | |
}).collect(Collectors.toList()); // remove old | |
this.volumeRecords.put(productId, vrs); // ensure records contains | |
// most up to date | |
// aggregate | |
VolumeRecord vr = new VolumeRecord(productId, 0, 0, null, 0); | |
vr = vrs.stream().reduce(vr, VolumeRecord::add); | |
return vr; | |
} else { | |
return new VolumeRecord(productId, 0, 0, null, 0); | |
} | |
} | |
/** @method @return the last known price */ | |
public MarketPrice getCurrentMarketPrice(String productId) { | |
return this.marketPrices.get(productId); | |
} | |
// handles timed out orders | |
private void prepareMarket() { | |
// handle timeouted sales orders | |
market.getSellers().forEach( | |
seller -> { | |
List<SalesOrder> incompleteSOs = seller | |
.removeOutdatedSalesOrders(timeout); | |
incompleteSOs.forEach(so -> { | |
if (so.getSeller().listener != null) | |
so.getSeller().listener.onEvent( | |
EventType.TIMEOUT_SALESORDER, so); | |
else | |
LOGGER.debug("incomplete SO: " + so); | |
}); | |
}); | |
// handle timeouted purchase orders | |
market.getBuyers().forEach( | |
buyer -> { | |
List<PurchaseOrder> incompletePOs = buyer | |
.removeOutdatedPurchaseOrders(timeout); | |
incompletePOs.forEach(po -> { | |
if (po.getBuyer().listener != null) | |
po.getBuyer().listener.onEvent( | |
EventType.TIMEOUT_PURCHASEORDER, po); | |
else | |
LOGGER.debug("incomplete PO: " + po); | |
}); | |
}); | |
if (!runInActorMode) { | |
// add new SOs and POs | |
synchronized (newSalesOrders) { | |
newSalesOrders.forEach((seller, sos) -> { | |
if (!this.market.getSellers().contains(seller)) { | |
LOGGER.debug("seller named " + seller.getName() | |
+ " doesnt exist -> adding a new one"); | |
this.market.addSeller(seller); | |
seller.listener = listener; | |
} else { | |
// swap temp seller with the actual one in the market | |
seller = this.market.getSellers().get( | |
this.market.getSellers().indexOf(seller)); | |
} | |
final Seller fSeller = seller; | |
sos.forEach(so -> { | |
fSeller.addSalesOrder(so); | |
}); | |
}); | |
newSalesOrders.clear(); | |
} | |
synchronized (newPurchaseOrders) { | |
newPurchaseOrders.forEach((buyer, pos) -> { | |
if (!this.market.getBuyers().contains(buyer)) { | |
LOGGER.debug("buyer named " + buyer.getName() | |
+ " doesnt exist -> adding a new one"); | |
this.market.addBuyer(buyer); | |
buyer.listener = listener; | |
} else { | |
// swap temp buyer with the actual one in the market | |
buyer = this.market.getBuyers().get( | |
this.market.getBuyers().indexOf(buyer)); | |
} | |
final Buyer fBuyer = buyer; | |
pos.forEach(po -> { | |
fBuyer.addPurchaseOrder(po); | |
}); | |
}); | |
newPurchaseOrders.clear(); | |
} | |
} | |
} | |
private void persistSale(List<Sale> sales) throws Exception { | |
if (!sales.isEmpty()) { | |
LOGGER.info("preparing to persist sales"); | |
DataSource ds = (DataSource) ctx.lookup("java:comp/env/jdbc/mysql"); | |
try (Connection c = ds.getConnection()) { | |
PreparedStatement stmt = c | |
.prepareStatement("INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) " | |
+ "values (?, ?, ?, ?, ?, ?, ?)"); | |
sales.forEach(sale -> { | |
try { | |
int i = 1; | |
stmt.setString(i++, sale.getBuyer().getName()); | |
stmt.setString(i++, sale.getSeller().getName()); | |
stmt.setInt(i++, Integer.parseInt(sale.getProductId())); | |
stmt.setDouble(i++, sale.getPrice()); | |
stmt.setInt(i++, sale.getQuantity()); | |
stmt.setInt(i++, sale.getPurchaseOrder().getId()); | |
stmt.setInt(i++, sale.getSalesOrder().getId()); | |
if (stmt.execute()) { | |
ResultSet rs = stmt.getGeneratedKeys(); | |
rs.next(); | |
sale.setId(rs.getInt(1)); | |
rs.close(); | |
} | |
} catch (SQLException e) { | |
throw new RuntimeException(e); | |
} | |
}); | |
} | |
} | |
} | |
private void noteMarketPricesAndVolumes(List<Sale> sales) { | |
sales.forEach(sale -> { | |
updateMarketPrice(sale); | |
updateMarketVolume(sale); | |
}); | |
} | |
public static class MarketPrice { | |
private String productId; | |
private double price; | |
private Date timestamp; | |
public MarketPrice(String productId, double price, Date timestamp) { | |
this.productId = productId; | |
this.price = price; | |
this.timestamp = timestamp; | |
} | |
public double getPrice() { | |
return price; | |
} | |
public String getProductId() { | |
return productId; | |
} | |
public Date getTimestamp() { | |
return timestamp; | |
} | |
} | |
private void updateMarketPrice(Sale sale) { | |
MarketPrice mp = marketPrices.get(sale.getProductId()); | |
if (mp == null | |
|| (mp != null && mp.getTimestamp().getTime() < sale | |
.getTimestamp().getTime())) { | |
// set price if none is known, or replace price if its older than | |
// current price | |
marketPrices.put( | |
sale.getProductId(), | |
new MarketPrice(sale.getProductId(), sale.getPrice(), sale | |
.getTimestamp())); | |
} | |
} | |
public static class VolumeRecord { | |
public static final VolumeRecord EMPTY = new VolumeRecord(null, 0, 0, | |
null, 0); | |
public String productId; | |
public int numberOfSales; | |
public double turnover; | |
public Date timestamp; | |
public int count; | |
public VolumeRecord(String productId, int numberOfSales, | |
double turnover, Date timestamp, int count) { | |
this.productId = productId; | |
this.numberOfSales = numberOfSales; | |
this.turnover = turnover; | |
this.timestamp = timestamp; | |
this.count = count; | |
} | |
public static VolumeRecord add(VolumeRecord a, VolumeRecord b) { | |
return new VolumeRecord(b.productId, a.numberOfSales | |
+ b.numberOfSales, a.turnover + b.turnover, null, a.count | |
+ b.count); | |
} | |
public static VolumeRecord aggregate(List<VolumeRecord> vrs) { | |
VolumeRecord vr = EMPTY; | |
vr = vrs.stream().reduce(vr, VolumeRecord::add); | |
return vr; | |
} | |
} | |
private void updateMarketVolume(Sale sale) { | |
// ////////////// | |
// remove old ones | |
// ////////////// | |
Map<String, List<VolumeRecord>> newVolumeRecords = new HashMap<>(); | |
long now = System.currentTimeMillis(); | |
volumeRecords.forEach((k, v) -> { | |
List<VolumeRecord> vrs = v.stream().filter(vr -> { | |
return now - vr.timestamp.getTime() < 1000 * 10; | |
}).collect(Collectors.toList()); // remove older than 10 secs | |
newVolumeRecords.put(k, vrs); | |
}); | |
volumeRecords = newVolumeRecords; // replace the old ones | |
// ////////////// | |
// add new data | |
// ////////////// | |
if (sale != null) { | |
List<VolumeRecord> vrs = volumeRecords.get(sale.getProductId()); | |
if (vrs == null) { | |
vrs = new ArrayList<>(); | |
} | |
vrs.add(new VolumeRecord(sale.getProductId(), sale.getQuantity(), | |
sale.getQuantity() * sale.getPrice(), sale.getTimestamp(), | |
1)); // scale up to "per minute" | |
volumeRecords.put(sale.getProductId(), vrs); // replace with old one | |
} | |
} | |
public PurchaseOrder addPurchaseOrder(String who, String productId, | |
int quantity, int id) { | |
if (runInActorMode) { | |
Buyer buyer = new Buyer(who); | |
if (!this.market.getBuyers().contains(buyer)) { | |
LOGGER.debug("buyer named " + who | |
+ " doesnt exist -> adding a new one"); | |
this.market.addBuyer(buyer); | |
buyer.listener = listener; | |
} else { | |
// swap temp buyer with the actual one in the market | |
buyer = this.market.getBuyers().get( | |
this.market.getBuyers().indexOf(buyer)); | |
} | |
PurchaseOrder po = new PurchaseOrder(productId, quantity, 9999.9, | |
id); | |
buyer.addPurchaseOrder(po); | |
return po; | |
} else { | |
synchronized (newPurchaseOrders) { | |
Buyer b = new Buyer(who); | |
List<PurchaseOrder> pos = newPurchaseOrders.get(b); | |
if (pos == null) { | |
pos = new ArrayList<>(); | |
newPurchaseOrders.put(b, pos); | |
} | |
PurchaseOrder po = new PurchaseOrder(productId, quantity, | |
9999.9, id); | |
pos.add(po); | |
return po; | |
} | |
} | |
} | |
public SalesOrder addSalesOrder(String who, String productId, int quantity, | |
double price, int id) { | |
if (runInActorMode) { | |
Seller seller = new Seller(who); | |
if (!this.market.getSellers().contains(seller)) { | |
LOGGER.debug("seller named " + who | |
+ " doesnt exist -> adding a new one"); | |
this.market.addSeller(seller); | |
seller.listener = listener; | |
} else { | |
// swap temp seller with the actual one in the market | |
seller = this.market.getSellers().get( | |
this.market.getSellers().indexOf(seller)); | |
} | |
SalesOrder so = new SalesOrder(price, productId, quantity, id); | |
seller.addSalesOrder(so); | |
return so; | |
} else { | |
synchronized (newSalesOrders) { | |
Seller s = new Seller(who); | |
List<SalesOrder> sos = newSalesOrders.get(s); | |
if (sos == null) { | |
sos = new ArrayList<>(); | |
newSalesOrders.put(s, sos); | |
} | |
SalesOrder so = new SalesOrder(price, productId, quantity, id); | |
sos.add(so); | |
return so; | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.web; | |
import static ch.maxant.tradingengine.web.Constants.DELAY; | |
import static ch.maxant.tradingengine.web.Constants.ID; | |
import static ch.maxant.tradingengine.web.Constants.NUM_KIDS; | |
import static ch.maxant.tradingengine.web.Constants.PRODUCT_IDS; | |
import static ch.maxant.tradingengine.web.Constants.TIMEOUT; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Set; | |
import java.util.Timer; | |
import java.util.TimerTask; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import javax.naming.NamingException; | |
import javax.servlet.ServletException; | |
import javax.servlet.annotation.WebServlet; | |
import javax.servlet.http.HttpServlet; | |
import javax.servlet.http.HttpServletRequest; | |
import javax.servlet.http.HttpServletResponse; | |
import org.apache.logging.log4j.LogManager; | |
import org.apache.logging.log4j.Logger; | |
import ch.maxant.tradingengine.model.PurchaseOrder; | |
import ch.maxant.tradingengine.model.Sale; | |
import ch.maxant.tradingengine.model.SalesOrder; | |
import ch.maxant.tradingengine.model.TradingEngine.EventType; | |
import ch.maxant.tradingengine.model.TradingEngine.VolumeRecord; | |
import ch.maxant.tradingengine.model.TradingEngineThread; | |
@WebServlet(urlPatterns = { "/sell", "/buy", "/result" }) | |
public class TradingEngineServlet extends HttpServlet { | |
private static final long serialVersionUID = 1L; | |
private static final Logger LOGGER = LogManager | |
.getLogger("tradingEngineServlet"); | |
private static Stats stats = new Stats(); | |
private static final Map<String, TradingEngineThread> kids = new HashMap<>(); | |
private static final Map<String, Result> results = new ConcurrentHashMap<>(); | |
private static final Set<String> knownProducts = Collections | |
.synchronizedSet(new HashSet<>()); | |
private static final AtomicInteger timedoutSales = new AtomicInteger(0); | |
static { | |
try { | |
int chunk = PRODUCT_IDS.length / NUM_KIDS; | |
for (int i = 0, j = PRODUCT_IDS.length; i < j; i += chunk) { | |
String[] temparray = Arrays.copyOfRange(PRODUCT_IDS, i, i | |
+ chunk); | |
LOGGER.info("created engine for products " + temparray); | |
TradingEngineThread engineThread = new TradingEngineThread( | |
DELAY, TIMEOUT, (type, data) -> event(type, data)); | |
for (int k = 0; k < temparray.length; k++) { | |
LOGGER.debug("mapping productId '" + temparray[k] | |
+ "' to engine " + i); | |
kids.put(temparray[k], engineThread); | |
} | |
LOGGER.info("---started trading"); | |
engineThread.start(); | |
} | |
} catch (NamingException e) { | |
LOGGER.error("failed to start engine", e); | |
} | |
// remove results older than a minute, every 5 seconds. | |
// in a real system you wouldnt necessarily cache results like | |
// we are doing - the sales are actually persisted by the | |
// trading engine - so clients could go look there! | |
new Timer("cleaner", true).scheduleAtFixedRate(new TimerTask() { | |
@Override | |
public void run() { | |
LOGGER.error("cleaning results... sales per minute: " | |
+ stats.totalSalesPerMinute + ", " | |
+ timedoutSales.get() + " timedout orders"); | |
long now = System.currentTimeMillis(); | |
List<String> toRemove = new ArrayList<>(); | |
results.forEach((k, v) -> { | |
if (now - v.created > 60000) { | |
toRemove.add(k); | |
} | |
}); | |
toRemove.forEach(k -> results.remove(k)); | |
LOGGER.info("completed cleaning results in " | |
+ (System.currentTimeMillis() - now) + "ms"); | |
} | |
}, 5000L, 5000L); | |
} | |
public static synchronized void event(final EventType type, | |
final Object data) { | |
switch (type) { | |
case SALE: { | |
Sale sale = (Sale) data; | |
int id = sale.getSalesOrder().getId(); | |
results.put(String.valueOf(id), new Result(String.valueOf(data))); | |
if (sale.getSalesOrder().getRemainingQuantity() == 0) { | |
String msg = "COMPLETED sales order"; | |
LOGGER.info("\n" + id + ") " + msg + " " + data); | |
} else { | |
LOGGER.info("\n" + id + ") PARTIAL sales order " + data); | |
} | |
break; | |
} | |
case PURCHASE: { | |
Sale sale = (Sale) data; | |
int id = sale.getPurchaseOrder().getId(); | |
results.put(String.valueOf(id), new Result(String.valueOf(data))); | |
if (sale.getPurchaseOrder().getRemainingQuantity() == 0) { | |
String msg = "COMPLETED purchase order"; | |
LOGGER.info("\n" + id + ") " + msg + " " + data); | |
} else { | |
LOGGER.info("\n" + id + ") PARTIAL purchase order " + data); | |
} | |
break; | |
} | |
case TIMEOUT_SALESORDER: { | |
timedoutSales.incrementAndGet(); | |
SalesOrder so = (SalesOrder) data; | |
String msg = "TIMEOUT sales order"; | |
LOGGER.info("\n" + so.getId() + ") " + msg + " " + data); | |
break; | |
} | |
case TIMEOUT_PURCHASEORDER: { | |
timedoutSales.incrementAndGet(); | |
PurchaseOrder po = (PurchaseOrder) data; | |
String msg = "TIMEOUT purchase order"; | |
LOGGER.info("\n" + po.getId() + ") " + msg + " " + data); | |
break; | |
} | |
case STATS: { | |
synchronized (knownProducts) { | |
Map<String, List<VolumeRecord>> mapOfVolumeRecords = (Map<String, List<VolumeRecord>>) ((Object[]) data)[2]; | |
stats.totalSalesPerMinute = knownProducts | |
.stream() | |
.map(productId -> { | |
return VolumeRecord.aggregate(mapOfVolumeRecords | |
.getOrDefault(productId, | |
Collections.emptyList())).count; | |
}).reduce(Integer::sum).orElse(0) * 6; // since stats | |
// are | |
// recorded | |
// for the last | |
// 10 secs | |
} | |
break; | |
} | |
default: | |
break; | |
} | |
} | |
@Override | |
protected void doGet(HttpServletRequest req, HttpServletResponse resp) | |
throws ServletException, IOException { | |
String path = req.getServletPath(); | |
LOGGER.debug("received command: '" + path + "'"); | |
String who = req.getParameter("userId"); | |
String productId = req.getParameter("productId"); | |
int quantity = Integer.parseInt(req.getParameter("quantity")); | |
TradingEngineThread engine = kids.get(productId); | |
knownProducts.add(productId); | |
int id = ID.getAndIncrement(); | |
// /buy?productId=1&quantity=10&userId=ant | |
if (path.equals("/buy")) { | |
PurchaseOrder po = engine.addPurchaseOrder(who, productId, | |
quantity, id); | |
resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(po)); | |
} else if (path.equals("/sell")) { | |
double price = Double.parseDouble(req.getParameter("price")); | |
SalesOrder so = engine.addSalesOrder(who, productId, quantity, | |
price, id); | |
resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(so)); | |
} else if (path.equals("/result")) { | |
String key = req.getParameter("id"); | |
Result r = results.get(key); | |
if (r != null) { | |
results.remove(key); | |
resp.getWriter().write(r.data); | |
} else { | |
resp.getWriter().write("UNKNOWN OR PENDING"); | |
} | |
} else { | |
String msg = "Unknown command " + path; | |
LOGGER.warn(msg); | |
} | |
} | |
private static class Stats { | |
int totalSalesPerMinute; | |
} | |
private static class Result { | |
String data; | |
long created; | |
Result(String data) { | |
this.data = data; | |
this.created = System.currentTimeMillis(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.web; | |
import static ch.maxant.tradingengine.web.Constants.DELAY; | |
import static ch.maxant.tradingengine.web.Constants.ID; | |
import static ch.maxant.tradingengine.web.Constants.NUM_KIDS; | |
import static ch.maxant.tradingengine.web.Constants.PRODUCT_IDS; | |
import static ch.maxant.tradingengine.web.Constants.TIMEOUT; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Set; | |
import java.util.Timer; | |
import java.util.TimerTask; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import javax.naming.NamingException; | |
import javax.servlet.ServletException; | |
import javax.servlet.annotation.WebServlet; | |
import javax.servlet.http.HttpServlet; | |
import javax.servlet.http.HttpServletRequest; | |
import javax.servlet.http.HttpServletResponse; | |
import org.apache.logging.log4j.LogManager; | |
import org.apache.logging.log4j.Logger; | |
import akka.actor.AbstractActor; | |
import akka.actor.ActorRef; | |
import akka.actor.ActorSystem; | |
import akka.actor.Props; | |
import akka.japi.pf.ReceiveBuilder; | |
import ch.maxant.tradingengine.model.Buyer; | |
import ch.maxant.tradingengine.model.PurchaseOrder; | |
import ch.maxant.tradingengine.model.Sale; | |
import ch.maxant.tradingengine.model.SalesOrder; | |
import ch.maxant.tradingengine.model.Seller; | |
import ch.maxant.tradingengine.model.TradingEngine; | |
import ch.maxant.tradingengine.model.TradingEngine.EventType; | |
import ch.maxant.tradingengine.model.TradingEngine.VolumeRecord; | |
@WebServlet(urlPatterns = { "/sell2", "/buy2", "/result2" }) | |
public class TradingEngineServletWithActors extends HttpServlet { | |
private static final long serialVersionUID = 1L; | |
private static final Logger LOGGER = LogManager | |
.getLogger("tradingEngineServletWithActors"); | |
private static final ActorSystem teSystem = ActorSystem | |
.create("TradingEngines"); | |
private static final Stats stats = new Stats(); | |
private static final Map<String, ActorRef> kids = new HashMap<>(); | |
private static final Map<String, Result> results = new ConcurrentHashMap<>(); | |
private static final Set<String> knownProducts = Collections | |
.synchronizedSet(new HashSet<>()); | |
private static final AtomicInteger timedoutSales = new AtomicInteger(0); | |
static { | |
int chunk = PRODUCT_IDS.length / NUM_KIDS; | |
for (int i = 0, j = PRODUCT_IDS.length; i < j; i += chunk) { | |
String[] temparray = Arrays.copyOfRange(PRODUCT_IDS, i, i + chunk); | |
LOGGER.info("created engine for products " + temparray); | |
ActorRef actor = teSystem.actorOf( | |
Props.create(TradingEngineActor.class), "engine-" + i); | |
for (int k = 0; k < temparray.length; k++) { | |
LOGGER.debug("mapping productId '" + temparray[k] | |
+ "' to engine " + i); | |
kids.put(temparray[k], actor); | |
} | |
LOGGER.info("---started trading"); | |
actor.tell(TradingEngineActor.RUN, ActorRef.noSender()); | |
} | |
// remove results older than a minute, every 5 seconds. | |
// in a real system you wouldnt necessarily cache results like | |
// we are doing - the sales are actually persisted by the | |
// trading engine - so clients could go look there! | |
new Timer("cleaner", true).scheduleAtFixedRate(new TimerTask() { | |
@Override | |
public void run() { | |
LOGGER.error("cleaning results... sales per minute: " | |
+ stats.totalSalesPerMinute + ", " | |
+ timedoutSales.get() + " timedout orders"); | |
long now = System.currentTimeMillis(); | |
List<String> toRemove = new ArrayList<>(); | |
results.forEach((k, v) -> { | |
if (now - v.created > 60000) { | |
toRemove.add(k); | |
} | |
}); | |
toRemove.forEach(k -> results.remove(k)); | |
LOGGER.info("completed cleaning results in " | |
+ (System.currentTimeMillis() - now) + "ms"); | |
} | |
}, 5000L, 5000L); | |
} | |
public static synchronized void event(final EventType type, | |
final Object data) { | |
switch (type) { | |
case SALE: { | |
Sale sale = (Sale) data; | |
int id = sale.getSalesOrder().getId(); | |
results.put(String.valueOf(id), new Result(String.valueOf(data))); | |
if (sale.getSalesOrder().getRemainingQuantity() == 0) { | |
String msg = "COMPLETED sales order"; | |
LOGGER.info("\n" + id + ") " + msg + " " + data); | |
} else { | |
LOGGER.info("\n" + id + ") PARTIAL sales order " + data); | |
} | |
break; | |
} | |
case PURCHASE: { | |
Sale sale = (Sale) data; | |
int id = sale.getPurchaseOrder().getId(); | |
results.put(String.valueOf(id), new Result(String.valueOf(data))); | |
if (sale.getPurchaseOrder().getRemainingQuantity() == 0) { | |
String msg = "COMPLETED purchase order"; | |
LOGGER.info("\n" + id + ") " + msg + " " + data); | |
} else { | |
LOGGER.info("\n" + id + ") PARTIAL purchase order " + data); | |
} | |
break; | |
} | |
case TIMEOUT_SALESORDER: { | |
timedoutSales.incrementAndGet(); | |
SalesOrder so = (SalesOrder) data; | |
String msg = "TIMEOUT sales order"; | |
LOGGER.info("\n" + so.getId() + ") " + msg + " " + data); | |
break; | |
} | |
case TIMEOUT_PURCHASEORDER: { | |
timedoutSales.incrementAndGet(); | |
PurchaseOrder po = (PurchaseOrder) data; | |
String msg = "TIMEOUT purchase order"; | |
LOGGER.info("\n" + po.getId() + ") " + msg + " " + data); | |
break; | |
} | |
case STATS: { | |
// an alternative approach for dealing with concurrency problems | |
// like ConcurrentModificationException is to copy the data, but it | |
// only works when handling read only data | |
Set<String> knownProductsCopy = new HashSet<>(knownProducts); | |
Map<String, List<VolumeRecord>> mapOfVolumeRecords = new HashMap( | |
(Map<String, List<VolumeRecord>>) ((Object[]) data)[2]); | |
stats.totalSalesPerMinute = knownProductsCopy | |
.stream() | |
.map(productId -> { | |
return VolumeRecord.aggregate(new ArrayList( | |
mapOfVolumeRecords.getOrDefault(productId, | |
Collections.emptyList()))).count; | |
}).reduce(Integer::sum).orElse(0) * 6; | |
// times 6 since stats are recorded for only the last ten secs, | |
// and we want them per minute | |
break; | |
} | |
default: | |
break; | |
} | |
} | |
@Override | |
protected void doGet(HttpServletRequest req, HttpServletResponse resp) | |
throws ServletException, IOException { | |
String path = req.getServletPath(); | |
LOGGER.debug("received command: '" + path + "'"); | |
String who = req.getParameter("userId"); | |
String productId = req.getParameter("productId"); | |
int quantity = Integer.parseInt(req.getParameter("quantity")); | |
ActorRef engine = kids.get(productId); | |
knownProducts.add(productId); | |
int id = ID.getAndIncrement(); | |
// /buy?productId=1&quantity=10&userId=ant | |
if (path.equals("/buy2")) { | |
PurchaseOrder po = new PurchaseOrder(productId, quantity, 9999.9, | |
id); | |
po.setBuyer(new Buyer(who)); | |
engine.tell(po, ActorRef.noSender()); | |
resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(po)); | |
} else if (path.equals("/sell2")) { | |
double price = Double.parseDouble(req.getParameter("price")); | |
SalesOrder so = new SalesOrder(price, productId, quantity, id); | |
so.setSeller(new Seller(who)); | |
engine.tell(so, ActorRef.noSender()); | |
resp.getWriter().write("\"id\":" + id + ", " + String.valueOf(so)); | |
} else if (path.equals("/result2")) { | |
String key = req.getParameter("id"); | |
Result r = results.get(key); | |
if (r != null) { | |
results.remove(key); | |
resp.getWriter().write(r.data); | |
} else { | |
resp.getWriter().write("UNKNOWN OR PENDING"); | |
} | |
} else { | |
String msg = "Unknown command " + path; | |
LOGGER.warn(msg); | |
} | |
} | |
private static class Stats { | |
int totalSalesPerMinute; | |
} | |
private static class Result { | |
String data; | |
long created; | |
Result(String data) { | |
this.data = data; | |
this.created = System.currentTimeMillis(); | |
} | |
} | |
/** | |
* using actors, we guarantee that only ever one thread accesses our trading | |
* engine at any one time, and so we avoid having to synchronize! | |
*/ | |
private static class TradingEngineActor extends AbstractActor { | |
private static final String RUN = "RUN"; | |
// STATE | |
private TradingEngine engine = new TradingEngine(DELAY, TIMEOUT, (type, | |
data) -> handle(type, data), true); | |
public TradingEngineActor() throws NamingException { | |
// INBOX | |
receive(ReceiveBuilder | |
.match(SalesOrder.class, | |
so -> { | |
// BEHAVIOUR (delegated to engine) | |
engine.addSalesOrder(so.getSeller().getName(), | |
so.getProductId(), | |
so.getRemainingQuantity(), | |
so.getPrice(), so.getId()); | |
}) | |
.match(PurchaseOrder.class, | |
po -> { | |
// BEHAVIOUR (delegated to engine) | |
engine.addPurchaseOrder( | |
po.getBuyer().getName(), | |
po.getProductId(), | |
po.getRemainingQuantity(), po.getId()); | |
}) | |
.match(String.class, s -> RUN.equals(s), command -> { | |
engine.run(); | |
}) | |
.matchAny( | |
o -> System.err | |
.println("received unknown message: " + o)) | |
.build()); | |
} | |
private void handle(EventType type, Object data) { | |
event(type, data); | |
if (type.equals(EventType.STOPPED)) { | |
self().tell(RUN, ActorRef.noSender()); // start another trading | |
// engine! | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.maxant.tradingengine.model; | |
import javax.naming.NamingException; | |
import ch.maxant.tradingengine.model.TradingEngine.Listener; | |
import ch.maxant.tradingengine.model.TradingEngine.MarketPrice; | |
import ch.maxant.tradingengine.model.TradingEngine.VolumeRecord; | |
/** | |
* a simple delegate which caches buyers and sellers, just like the node.js | |
* child processes do. | |
*/ | |
public class TradingEngineThread extends Thread { | |
private static int ID = 0; | |
private final TradingEngine engine; | |
public TradingEngineThread(long delay, long timeout, Listener listener) | |
throws NamingException { | |
super("engine-" + ID++); | |
engine = new TradingEngine(delay, timeout, listener); | |
} | |
@Override | |
public void run() { | |
engine.run(); | |
} | |
public PurchaseOrder addPurchaseOrder(String who, String productId, | |
int quantity, int id) { | |
return engine.addPurchaseOrder(who, productId, quantity, id); | |
} | |
public SalesOrder addSalesOrder(String who, String productId, int quantity, | |
double price, int id) { | |
return engine.addSalesOrder(who, productId, quantity, price, id); | |
} | |
public VolumeRecord getCurrentVolume(String productId) { | |
return engine.getCurrentVolume(productId); | |
} | |
public MarketPrice getMarketPrice(String productId) { | |
return engine.getCurrentMarketPrice(productId); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment