Ian Downard iandow
/* Copyright (c) 2009 & onwards. MapR Tech, Inc., All rights reserved */
package com.mapr.demo.finserv;
import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.mapr.db.MapRDB;
import com.mapr.db.Table;
import org.ojai.Document;

package com.mapr.demo.finserv;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;

package com.mapr.test;
/******************************************************************************
 * PURPOSE:
 * This Kafka consumer is designed to measure how fast we can consume
 * messages from a topic and persist them to MapR-DB. It outputs throughput
 * stats to stdout.
 *
 * This Kafka consumer reads NYSE Tick data from a MapR Stream topic and
 * persists each message in a MapR-DB table as a JSON document, which can
 * then be queried.
 ******************************************************************************/

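The preview above is cut off, but the pattern it describes (consume messages from a MapR Streams topic and persist each one to MapR-DB as a JSON document) can be sketched roughly as follows. The stream, topic, and table paths are hypothetical and the _id scheme is invented for illustration; this is not the project's actual consumer code.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.mapr.db.MapRDB;
import com.mapr.db.Table;
import org.ojai.Document;

public class PersisterSketch {
    public static void main(String[] args) {
        String topic = "/user/mapr/taq:trades";     // hypothetical /stream:topic path
        String tablePath = "/user/mapr/ticktable";  // hypothetical MapR-DB JSON table

        Properties props = new Properties();
        props.put("group.id", "tick-persister");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Table table = MapRDB.getTable(tablePath);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // Poll for a batch of tick messages, each assumed to be a JSON string
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Build an OJAI document and give it a unique _id before persisting
                    Document doc = MapRDB.newDocument(record.value())
                            .set("_id", record.partition() + "_" + record.offset());
                    table.insertOrReplace(doc);
                }
                table.flush();  // push buffered writes to MapR-DB
            }
        }
    }
}

Note that bootstrap.servers is omitted here because the MapR Kafka client addresses stream topics by their /stream:topic path; against an Apache Kafka broker that property would be required.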
BY USING THIS SOFTWARE, YOU EXPRESSLY ACCEPT AND AGREE TO THE TERMS OF THE AGREEMENT CONTAINED IN THIS GITHUB REPOSITORY. See the file EULA.md for details.

An Example Application for Processing Stock Market Trade Data on the MapR Converged Data Platform

This project provides a processing engine for ingesting real-time streams of trades, bids, and asks into MapR Streams at a high rate (a minimal producer sketch follows the component list below). The application consists of the following components:

  • A Producer microservice that streams trades, bids and asks using the NYSE TAQ format. The data source is the Daily Trades dataset described here. The schema for our data is detailed in Table 6, "Daily Trades File Data Fields", on page 26 of Daily TAQ Client Specification (from December 1st, 2013).
  • A multi-threaded Consumer microservice that indexes the trades by receiver and sender.
  • Example Spark code for querying the indexed streams at interactive speeds, enabling Spark SQL queries.
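As referenced above, here is a minimal sketch of the producer pattern: publish one TAQ record to a MapR Streams topic through the standard Kafka producer API. The topic path, message key, and sample record are invented for illustration and are not the project's actual values.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TaqProducerSketch {
    public static void main(String[] args) {
        String topic = "/user/mapr/taq:trades";   // hypothetical /stream:topic path

        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // One made-up fixed-width TAQ line, keyed here by an arbitrary sender id
            String taqLine = "080845201DAA T 00000001870000000000002496850 NT";
            producer.send(new ProducerRecord<>(topic, "SENDER01", taqLine.getBytes()));
            producer.flush();   // block until buffered records are acknowledged
        }
    }
}

As in the consumer sketch earlier, bootstrap.servers is omitted because MapR stream topics are addressed by path; against an Apache Kafka broker you would add that property and use a plain topic name.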
MESSAGES=1000000
MSGSIZE=100
maprcli stream delete -path /user/mapr/iantest >& /dev/null
maprcli stream create -path /user/mapr/iantest -produceperm p -consumeperm p -topicperm p
mapr perfproducer -path /user/mapr/iantest | tail -n 12 | grep ":" | awk -F ":" '{printf "%s, ",$1}' >> partitions.csv
echo "numPartitions, numMsgs, sizeMsg, eth0 TX kB/s, eth0 RX kB/s" >> partitions.csv
maprcli stream delete -path /user/mapr/iantest >& /dev/null
for NUMPARTITIONS in 1 2 3 4 5 6 7 8 9 10 15 20 25 30 35 40 45 50 60 70 80 90 100 150 200 250 300 350 400; do
maprcli stream create -path /user/mapr/iantest -produceperm p -consumeperm p -topicperm p -defaultpartitions $NUMPARTITIONS
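# The loop below (a separate snippet) watches the Kafka server log for ZkTimeout errors;
# when one shows up it stops Kafka on all three nodes and wipes their Kafka log and
# ZooKeeper data directories.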
while true; do
tail -n 40 /opt/kafka_2.11-0.10.0.1/logs/server.log | grep ZkTimeout;
if [ $? -eq 0 ]; then
sudo service kafka stop
ssh kafkanodeb sudo service kafka stop
ssh kafkanodec sudo service kafka stop
sudo rm -rf /tmp/kafka-logs
ssh kafkanodeb sudo rm -rf /tmp/kafka-logs
ssh kafkanodec sudo rm -rf /tmp/kafka-logs
sudo rm -rf /tmp/zookeeper/version-2
# Run these commands to measure consumer throughput in Kafka
MESSAGES=1000000
echo "numMsgs, sizeMsg, start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec" >> consumer.csv
for MSGSIZE in 10 100 1000 2000 4000 6000 8000 10000; do
for i in `seq 1 20`; do
TOPIC='iantest-'$MSGSIZE
echo $TOPIC
/opt/kafka_2.11-0.10.0.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic $TOPIC --config compression.type=uncompressed
/opt/kafka_2.11-0.10.0.1/bin/kafka-producer-perf-test.sh --topic $TOPIC --num-records $MESSAGES --record-size $MSGSIZE --throughput -1 --producer-props bootstrap.servers=localhost:9092 buffer.memory=67108864 batch.size=8196 acks=all >& /dev/null
echo -n "$MESSAGES, $MSGSIZE, " >> consumer.csv
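# The rest of this snippet is truncated. Presumably the next step runs the consumer perf test
# and appends its one-line result, which is what the consumer.csv header above describes.
# An assumed reconstruction of that step (not the original command):
/opt/kafka_2.11-0.10.0.1/bin/kafka-consumer-perf-test.sh --zookeeper localhost:2181 --topic $TOPIC --messages $MESSAGES | tail -n 1 >> consumer.csv

# The fragment below is another separate snippet; it appears to be part of a MapR 5.2
# installer configuration file: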
environment:
mapr_core_version: 5.2.0
config:
admin_id: mapr
cluster_admin_create: false
cluster_admin_gid: 5000
cluster_admin_group: mapr
cluster_admin_id: mapr
cluster_admin_uid: 5000
cluster_id: '6486878629867426246'
import os, sys
import tensorflow as tf
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# change this as you see fit
image_path = sys.argv[1]
# Read in the image_data
iandow / gist:97502c6959bdaa1eb0e5ed030f9e82a0
Last active July 24, 2017 23:03
maprdb_pyspark_error.txt
mapr@nodea107:~/python-bindings$ PYSPARK_PYTHON=/usr/bin/python3 /opt/mapr/spark/spark-2.1.0/bin/pyspark --py-files dist/maprdb-0.0.1-py3.4.egg
Python 3.4.3 (default, Nov 17 2016, 01:08:31)
[GCC 4.8.4] on linux
Type "help", "copyright", "credits" or "license" for more information.
17/07/24 22:45:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/07/24 22:45:40 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
17/07/24 22:45:40 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
Welcome to
____ __
/ __/__ ___ _____/ /__