Skip to content

Instantly share code, notes, and snippets.

@iandow
Created October 5, 2016 17:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iandow/f6376264c2281d1c0e3a2485e86c9f23 to your computer and use it in GitHub Desktop.
Save iandow/f6376264c2281d1c0e3a2485e86c9f23 to your computer and use it in GitHub Desktop.
/* Copyright (c) 2009 & onwards. MapR Tech, Inc., All rights reserved */
package com.mapr.demo.finserv;
import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.mapr.db.MapRDB;
import com.mapr.db.Table;
import org.ojai.Document;
import org.ojai.DocumentStream;
import org.ojai.store.QueryCondition;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
/**
 * Reads processed tick messages from a stream topic and stores each one as a
 * document in a MapR-DB JSON table, periodically printing how many documents
 * the table holds for every symbol seen so far.
 *
 * <p>Usage: {@code Persister [topic]}. When exactly one argument is given it
 * replaces the default topic. The target table is deleted and recreated on
 * every start.
 */
public class Persister {
    // Consumer for the subscribed topic. Values are raw bytes, matching the
    // ByteArrayDeserializer configured in configureConsumer().
    private static KafkaConsumer<String, byte[]> consumer;

    // every N rows print a MapR-DB update
    private static final int U_INTERVAL = 100;

    // polling timeout handed to consumer.poll()
    private static final int TIMEOUT = 1000;

    /**
     * Subscribes to the topic, recreates the table, then polls forever,
     * persisting every record it receives.
     *
     * @param args optional single element: the topic to subscribe to
     * @throws IOException declared for checked I/O failures raised while
     *         persisting records
     */
    public static void main(String[] args) throws IOException {
        configureConsumer(args);

        // we will listen to everything in JSON format after it's
        // been processed
        String topic = "/user/mapr/taq:processed";
        String tableName = "/user/mapr/ticktable";
        if (args.length == 1) {
            topic = args[0];
        }

        try {
            List<String> topics = new ArrayList<String>();
            topics.add(topic);
            System.out.println("Subscribing to " + topic);
            consumer.subscribe(topics);

            // delete the old table if it's there
            if (MapRDB.tableExists(tableName)) {
                System.out.println("deleting old table " + tableName);
                MapRDB.deleteTable(tableName);
            }

            // make a new table with client-side write buffering enabled
            Table table = MapRDB.createTable(tableName);
            table.setOption(Table.TableOption.BUFFERWRITE, true);

            try {
                Set<String> syms = new HashSet<String>();
                long nmsgs = 0;
                // Total message count at which the next per-symbol report is due.
                long nextReport = U_INTERVAL;

                for (;;) {
                    ConsumerRecords<String, byte[]> msg = consumer.poll(TIMEOUT);
                    if (msg.count() == 0) {
                        System.out.println("No messages after 1 second wait.");
                        continue;
                    }

                    System.out.println("Read " + msg.count() + " messages");
                    nmsgs += msg.count();

                    // Convert every record into a document and save it,
                    // remembering which symbols have been seen.
                    for (ConsumerRecord<String, byte[]> record : msg) {
                        Tick tick = new Tick(record.value());
                        Document document = MapRDB.newDocument((Object) tick);
                        syms.add(document.getString("symbol"));
                        // save document into the table
                        table.insertOrReplace(tick.getTradeSequenceNumber(), document);
                    }

                    // Report whenever the running total crosses an interval
                    // boundary. Testing "nmsgs % U_INTERVAL == 0" would only
                    // fire when a batch happens to end exactly on a multiple.
                    if (nmsgs >= nextReport) {
                        // Writes are buffered (BUFFERWRITE above); flush so the
                        // counts below include the records just inserted.
                        table.flush();
                        printSymbolCounts(table, syms);
                        nextReport = (nmsgs / U_INTERVAL + 1) * U_INTERVAL;
                    }
                }
            } finally {
                // The loop only exits by exception; release the table handle.
                table.close();
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Prints, for each symbol, the number of documents in the table whose
     * "symbol" field equals it.
     *
     * @param table table to query
     * @param syms symbols to report on
     * @throws IOException declared so checked I/O failures reach main
     */
    private static void printSymbolCounts(Table table, Set<String> syms) throws IOException {
        System.out.println("Write update per-symbol:");
        System.out.println("------------------------");
        for (String s : syms) {
            QueryCondition cond = MapRDB.newCondition()
                    .is("symbol", QueryCondition.Op.EQUAL, s).build();
            int c = 0;
            // Close the stream after counting so it is not leaked on every report.
            try (DocumentStream results = table.find(cond)) {
                for (Document ignored : results) {
                    c++;
                }
            }
            System.out.println("\t" + s + ": " + c);
        }
    }

    /**
     * Creates the shared consumer with its configuration parameters.
     *
     * @param args command-line arguments; currently not consulted
     */
    private static void configureConsumer(String[] args) {
        Properties props = new Properties();
        // cause consumers to start at beginning of topic on first read
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // which class to use to deserialize the value of each message
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Type parameters must agree with the deserializers above.
        consumer = new KafkaConsumer<String, byte[]>(props);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment