Skip to content

Instantly share code, notes, and snippets.

View maxant's full-sized avatar

Ant Kutschera maxant

View GitHub Profile
@maxant
maxant / TradingEngineServlet-doGet.java
Created December 25, 2014 20:40
Servlet handling requests
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
String path = req.getServletPath();
LOGGER.debug("received command: '" + path + "'");
String who = req.getParameter("userId");
String productId = req.getParameter("productId");
TradingEngineThread engine = kids.get(productId);
int quantity = Integer.parseInt(req.getParameter("quantity"));
int id = ID.getAndIncrement();
@maxant
maxant / TradingEngineServletWithActors-creatingActors.java
Created December 25, 2014 21:31
Trading engine servlet based on actors
@WebServlet(urlPatterns = { "/sell2", "/buy2", "/result2" })
public class TradingEngineServletWithActors extends HttpServlet {
private static final ActorSystem teSystem = ActorSystem.create("TradingEngines");
private static final Map<String, ActorRef> kids = new HashMap<>();
static {
int chunk = PRODUCT_IDS.length / NUM_KIDS;
for (int i = 0, j = PRODUCT_IDS.length; i < j; i += chunk) {
String[] temparray = Arrays.copyOfRange(PRODUCT_IDS, i, i + chunk);
@maxant
maxant / TradingEngineActor-relevantParts.java
Created December 25, 2014 21:39
The trading engine actor class.
private static class TradingEngineActor extends AbstractActor {
// STATE
private TradingEngine engine = new TradingEngine(DELAY, TIMEOUT, (type, data) -> handle(type, data), true);
public TradingEngineActor() throws NamingException {
// INBOX
receive(ReceiveBuilder
.match(SalesOrder.class, so -> {
@maxant
maxant / TradingEngineTestClient.java
Created December 25, 2014 22:02
Trading Engine Test Client
import java.util.Date;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@maxant
maxant / createdb.sql
Created December 26, 2014 09:52
Database creation script
-- Database creation script (MySQL syntax: AUTO_INCREMENT, ENGINE = INNODB).
-- Makes TRADER the default database for the statements that follow.
USE TRADER;
-- Drop existing tables first so the script can be re-run; IF EXISTS avoids an
-- error when a table is absent.
-- NOTE(review): SALES is dropped before PRODUCTS, presumably because SALES
-- references PRODUCTS; the SALES definition is not visible here, so confirm.
DROP TABLE IF EXISTS SALES;
DROP TABLE IF EXISTS PRODUCTS;
-- PRODUCTS: one row per product, with a generated numeric key and a mandatory name.
CREATE TABLE PRODUCTS (
-- Surrogate key; the value is generated by AUTO_INCREMENT.
ID BIGINT NOT NULL AUTO_INCREMENT,
-- Product name; required, at most 255 characters.
NAME VARCHAR(255) NOT NULL,
PRIMARY KEY (ID)
) ENGINE = INNODB;
@maxant
maxant / Main1.java
Last active August 29, 2015 14:12
Main program, setting up Spray and Akka
public static final ActorSystem system = ActorSystem.create("system");
public static void main(String[] args) {
...
ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor");
InetSocketAddress endpoint = new InetSocketAddress(3000);
int backlog = 100;
List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList();
Option<ServerSettings> settings = scala.Option.empty();
@maxant
maxant / Main2.java
Created December 30, 2014 16:25
HttpActor for handling requests and routing them to the actors encapsulating the trading engines
private static class HttpActor extends AbstractActor {
private static final HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();
public HttpActor() {
final Router router = partitionAndCreateRouter();
receive(ReceiveBuilder
.match(HttpRequest.class, r -> {
int id = Constants.ID.getAndIncrement();
@maxant
maxant / Main3.java
Created December 30, 2014 20:36
Partitioning the market and creating the router
private Router partitionAndCreateRouter() {
Map<String, ActorRef> kids = new HashMap<>();
java.util.List<Routee> routees = new ArrayList<Routee>();
int chunk = Constants.PRODUCT_IDS.length / NUM_KIDS;
for (int i = 0, j = Constants.PRODUCT_IDS.length; i < j; i += chunk) {
String[] temparray = Arrays.copyOfRange(Constants.PRODUCT_IDS, i, i + chunk);
LOGGER.info("created engine for products " + temparray);
ActorRef actor = getContext().actorOf(Props.create(EngineActor.class));
getContext().watch(actor);
routees.add(new ActorRefRoutee(actor));
@maxant
maxant / Main4.java
Created December 30, 2014 20:46
The routing logic used to decide which actor will handle a sales or purchase order.
public static class PartitioningRoutingLogic implements RoutingLogic {
private Map<String, ActorRef> kids;
/**
 * Creates the routing logic backed by the given lookup of actor references.
 *
 * <p>The map is stored by reference and is not copied, so later changes made
 * to it by the caller are visible to this instance.
 *
 * @param kids actor references keyed by a String
 *             (NOTE(review): presumably the product ID; confirm against the caller)
 */
public PartitioningRoutingLogic(Map<String, ActorRef> kids) {
this.kids = kids;
}
@Override
public Routee select(Object message, IndexedSeq<Routee> routees) {
@maxant
maxant / Main5.java
Created December 30, 2014 20:57
Asynchronous persistence
private void persistSales(List<Sale> sales, final PersistenceComplete f) {
if (!sales.isEmpty()) {
LOGGER.info("preparing to persist sales");
final AtomicInteger count = new AtomicInteger(sales.size());
sales.forEach(sale -> {
List values = Arrays.asList(sale.getBuyer().getName(),
sale.getSeller().getName(),
sale.getProductId(),
sale.getPrice(),