This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Override | |
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { | |
String path = req.getServletPath(); | |
LOGGER.debug("received command: '" + path + "'"); | |
String who = req.getParameter("userId"); | |
String productId = req.getParameter("productId"); | |
TradingEngineThread engine = kids.get(productId); | |
int quantity = Integer.parseInt(req.getParameter("quantity")); | |
int id = ID.getAndIncrement(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@WebServlet(urlPatterns = { "/sell2", "/buy2", "/result2" }) | |
public class TradingEngineServletWithActors extends HttpServlet { | |
private static final ActorSystem teSystem = ActorSystem.create("TradingEngines"); | |
private static final Map<String, ActorRef> kids = new HashMap<>(); | |
static { | |
int chunk = PRODUCT_IDS.length / NUM_KIDS; | |
for (int i = 0, j = PRODUCT_IDS.length; i < j; i += chunk) { | |
String[] temparray = Arrays.copyOfRange(PRODUCT_IDS, i, i + chunk); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static class TradingEngineActor extends AbstractActor { | |
// STATE | |
private TradingEngine engine = new TradingEngine(DELAY, TIMEOUT, (type, data) -> handle(type, data), true); | |
public TradingEngineActor() throws NamingException { | |
// INBOX | |
receive(ReceiveBuilder | |
.match(SalesOrder.class, so -> { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Date; | |
import java.io.BufferedReader; | |
import java.io.ByteArrayOutputStream; | |
import java.io.InputStreamReader; | |
import java.net.URL; | |
import java.net.URLConnection; | |
import java.util.Random; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicInteger; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Schema setup script (MySQL dialect: uses AUTO_INCREMENT and an ENGINE clause).
-- Selects the TRADER database, drops the SALES and PRODUCTS tables if they exist,
-- then recreates PRODUCTS. Any existing rows in the dropped tables are discarded.
USE TRADER; | |
-- SALES is dropped before PRODUCTS; presumably SALES references PRODUCTS
-- (foreign key) so it has to go first -- TODO confirm against the SALES definition.
DROP TABLE IF EXISTS SALES; | |
DROP TABLE IF EXISTS PRODUCTS; | |
-- PRODUCTS: a generated numeric ID as primary key plus a mandatory NAME
-- of up to 255 characters, stored with the InnoDB engine.
CREATE TABLE PRODUCTS ( | |
ID BIGINT NOT NULL AUTO_INCREMENT, | |
NAME VARCHAR(255) NOT NULL, | |
PRIMARY KEY (ID) | |
) ENGINE = INNODB; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static final ActorSystem system = ActorSystem.create("system"); | |
public static void main(String[] args) { | |
... | |
ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor"); | |
InetSocketAddress endpoint = new InetSocketAddress(3000); | |
int backlog = 100; | |
List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList(); | |
Option<ServerSettings> settings = scala.Option.empty(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static class HttpActor extends AbstractActor { | |
private static final HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1(); | |
public HttpActor() { | |
final Router router = partitionAndCreateRouter(); | |
receive(ReceiveBuilder | |
.match(HttpRequest.class, r -> { | |
int id = Constants.ID.getAndIncrement(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private Router partitionAndCreateRouter() { | |
Map<String, ActorRef> kids = new HashMap<>(); | |
java.util.List<Routee> routees = new ArrayList<Routee>(); | |
int chunk = Constants.PRODUCT_IDS.length / NUM_KIDS; | |
for (int i = 0, j = Constants.PRODUCT_IDS.length; i < j; i += chunk) { | |
String[] temparray = Arrays.copyOfRange(Constants.PRODUCT_IDS, i, i + chunk); | |
LOGGER.info("created engine for products " + temparray); | |
ActorRef actor = getContext().actorOf(Props.create(EngineActor.class)); | |
getContext().watch(actor); | |
routees.add(new ActorRefRoutee(actor)); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class PartitioningRoutingLogic implements RoutingLogic { | |
private Map<String, ActorRef> kids; | |
/**
 * Creates the routing logic around an existing lookup table of actor references.
 * The map is stored by reference; no copy is made.
 *
 * @param kids lookup table from a string key to an actor reference; presumably keyed
 *             by product ID -- TODO confirm against the code that builds the map
 */
public PartitioningRoutingLogic(Map<String, ActorRef> kids) { | |
this.kids = kids; | |
} | |
@Override | |
public Routee select(Object message, IndexedSeq<Routee> routees) { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private void persistSales(List<Sale> sales, final PersistenceComplete f) { | |
if (!sales.isEmpty()) { | |
LOGGER.info("preparing to persist sales"); | |
final AtomicInteger count = new AtomicInteger(sales.size()); | |
sales.forEach(sale -> { | |
List values = Arrays.asList(sale.getBuyer().getName(), | |
sale.getSeller().getName(), | |
sale.getProductId(), | |
sale.getPrice(), |