Producer
Setup
bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test-rep-one --partitions 6 --replication-factor 1
bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test --partitions 6 --replication-factor 3
Single thread, no replication
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test7 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196
// Set up the input DStream to read from Kafka (in parallel), one receiver per topic partition.
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
val kafkaStream = {
  val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
  val kafkaParams = Map(
    "zookeeper.connect" -> "zookeeper1:2181",
    "group.id" -> "spark-streaming-test",
    "zookeeper.connection.timeout.ms" -> "1000")
  val inputTopic = "input-topic"
  val numPartitionsOfInputTopic = 5
  val streams = (1 to numPartitionsOfInputTopic) map { _ =>
    // One receiver-based stream per partition; ssc (the StreamingContext) is assumed to be in scope.
    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)
  }
  // Merge the per-partition streams into a single DStream of message payloads.
  ssc.union(streams)
}
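A minimal way to exercise the stream once it is built (a sketch, not part of the gist):

// Print how many Kafka messages arrived in each batch, then run the streaming job.
kafkaStream.count().print()
ssc.start()
ssc.awaitTermination()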
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

// Small test harness for Jedis publish/subscribe against a single Redis host.
public class RedisPubSubtest {
    private static final String JEDIS_SERVER = "somewhere";
samklr / musisngs.scala
Created February 25, 2015 01:28
Algebird Musings
import com.twitter.algebird._
import HyperLogLog._
import com.twitter.algebird.Monoid
import com.twitter.algebird.DecayedValue
import com.twitter.algebird.Operators._
// HyperLogLog monoid with 4 bits of addressing (2^4 = 16 registers), so cardinality estimates are coarse.
val hll = new HyperLogLogMonoid(4)
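A rough usage sketch (an addition, not in the gist): hash a few items into HLLs, combine them with the monoid, and read back an approximate distinct count.

val ids = Seq("user-1", "user-2", "user-2", "user-3")
// Each item becomes a tiny HLL; the monoid's sum merges them into one sketch.
val combined = hll.sum(ids.map(id => hll.create(id.getBytes("UTF-8"))))
println(combined.estimatedSize)   // approximate number of distinct ids (coarse at this precision)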
samklr / MaxHashTags.scala
Created March 1, 2015 13:10
Popular Hashtags
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
/**
 * Calculates popular hashtags (topics) over sliding 10- and 60-second windows from a Twitter
 * stream. The stream is instantiated with credentials and, optionally, filters supplied by the
 * command-line arguments.
 */
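The preview stops at the header comment; the core of such a job is a windowed reduce over hashtags, roughly along these lines (the stream setup and the ssc/filters names are illustrative, not taken from the gist):

val tweets = TwitterUtils.createStream(ssc, None, filters)
val hashTags = tweets.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

// Count each hashtag over a sliding 60-second window and surface the most frequent ones.
val topCounts60 = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(60))
  .map { case (tag, count) => (count, tag) }
  .transform(_.sortByKey(ascending = false))

topCounts60.foreachRDD { rdd =>
  println("Popular hashtags in the last 60 seconds: " + rdd.take(10).mkString(", "))
}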
samklr / DistrinctAggregateCount.scala
Created March 1, 2015 14:13
Distinct Aggregator Counting Events
// Operator that aggregates a distinct count of events for the given key/value fields over an optional window.
class DistinctAggregateCount(
    val key: String,
    val value: String,
    val hierarchy: Option[String] = Some("/"),
    val window: (Option[Long], Option[Long]) = (None, None))
  extends AbstractOperator with Serializable {

  override def process(stream: DStream[Event]) = {
    val windowedStream = windowStream(stream, window)
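The preview ends inside process; one plausible continuation using only standard DStream operations (the Event accessor below is an assumption, not the gist's API):

import org.apache.spark.streaming.StreamingContext._   // brings in pair-DStream operations (Spark 1.x)

windowedStream
  .map(e => (e.get(key), e.get(value)))   // extract the configured key/value fields (accessor assumed)
  .transform(_.distinct())                // keep one record per distinct (key, value) pair
  .map { case (k, _) => (k, 1L) }
  .reduceByKey(_ + _)                     // distinct-value count per key
  .print()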
/**
* From a file that contains
* doc_id w1 w2 w3 ... lines, separated by tabs
* return an inverted index Map of w -> Set(doc_id)
*
* @return Map[String,Set[String]]
*/
import scala.collection.immutable.Map
def invertedIndex(): Map[String, Set[String]] = {
  // One plausible implementation; "docs.tsv" is a placeholder for the actual input path.
  scala.io.Source.fromFile("docs.tsv").getLines()
    .flatMap { line =>
      val fields = line.split("\t")
      fields.tail.map(word => (word, fields.head))   // pair each word with its doc_id
    }
    .toList.groupBy(_._1)
    .mapValues(_.map(_._2).toSet)
}
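For instance (hypothetical input, not from the gist), a file with the lines "d1<TAB>apple<TAB>banana" and "d2<TAB>banana" would produce:

invertedIndex()   // Map("apple" -> Set("d1"), "banana" -> Set("d1", "d2"))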
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.io.{DataInputBuffer, DataOutputBuffer, Writable}
import scala.reflect.ClassTag

// Kryo serializer that delegates to a Hadoop Writable's own write/readFields methods.
class WritableKryoSerializer[V <% Writable, W <: Writable <% V : ClassTag] extends Serializer[V] {
  def write(kryo: Kryo, out: Output, v: V) = {
    val dob = new DataOutputBuffer()
    v.write(dob)
    // Completion sketch: length-prefix the Writable's bytes so a matching read() can recover them.
    out.writeInt(dob.getLength)
    out.writeBytes(dob.getData, 0, dob.getLength)
  }
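  // A matching read() is sketched below (an addition; the ClassTag-based instantiation is one
  // plausible way to rebuild the concrete Writable, not necessarily the gist's approach).
  def read(kryo: Kryo, in: Input, cls: Class[V]): V = {
    val bytes = in.readBytes(in.readInt())
    val dib = new DataInputBuffer()
    dib.reset(bytes, bytes.length)
    // Instantiate the concrete Writable type W and let it populate itself from the buffer.
    val w = implicitly[ClassTag[W]].runtimeClass.newInstance().asInstanceOf[W]
    w.readFields(dib)
    w   // implicitly converted to V via the W <% V view bound
  }
}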
import sys
from scikits.learn.feature_extraction.text import CountVectorizer
from scikits.learn.feature_extraction.text import TfidfTransformer
from scikits.learn.feature_extraction.text import CharNGramAnalyzer
from scikits.learn.svm.sparse import LinearSVC
from scikits.learn.pipeline import Pipeline
from scikits.learn.datasets import load_files
from scikits.learn import metrics
samklr / spark-standalone.sh
Last active August 29, 2015 14:16
Spark Standalone Cluster
-------- Spark Standalone --------
Prerequisites:
* Set the JAVA_HOME environment variable.
* Configure SSH so the master and workers can talk without a password:
  - $ ssh-keygen      (accept the defaults; this creates .ssh/id_rsa and .ssh/id_rsa.pub)
  - Copy the SSH public key (id_rsa.pub) to the root account on each target host.
  - Add the SSH public key to the authorized_keys file on each target host:
      $ cat id_rsa.pub >> authorized_keys