
@lurenx
lurenx / install-azkaban.md
Created May 14, 2016 08:17 — forked from greenqy/install-azkaban.md
install-azkaban.md
lurenx / HdfsReader.java
Created May 3, 2016 05:49
Use the Hadoop FileSystem API to read a file when the NameNode runs in HA mode (QJM)
package com.hadoop.hdfs.reader;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
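The preview stops at the imports. The point of the HA case is client configuration: the client addresses a logical nameservice rather than a single NameNode host, and a failover proxy provider discovers which NameNode is active. A minimal sketch of the relevant settings, as a Python dict for illustration (the nameservice name `mycluster`, the `nn1`/`nn2` ids, and the hostnames are hypothetical placeholders, not from the gist):

```python
# Client-side settings for reading through an HA (QJM) nameservice.
# "mycluster", "nn1"/"nn2", and the hostnames below are made-up placeholders.
ha_conf = {
    # Point at the logical nameservice, not at one NameNode host.
    "fs.defaultFS": "hdfs://mycluster",
    "dfs.nameservices": "mycluster",
    "dfs.ha.namenodes.mycluster": "nn1,nn2",
    "dfs.namenode.rpc-address.mycluster.nn1": "namenode1.example.com:8020",
    "dfs.namenode.rpc-address.mycluster.nn2": "namenode2.example.com:8020",
    # Lets the client work out which NameNode is currently active.
    "dfs.client.failover.proxy.provider.mycluster":
        "org.apache.hadoop.hdfs.server.namenode.ha."
        "ConfiguredFailoverProxyProvider",
}

# In the Java gist, each pair would be applied with conf.set(key, value)
# on the Configuration object before calling FileSystem.get(conf).
for key, value in ha_conf.items():
    print(f"{key} = {value}")
```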
lurenx / producer.py
Created March 24, 2016 10:20
kafka python producer
from pykafka import KafkaClient
import avro.schema
import io, random
from avro.io import DatumWriter
schema_path="user.avsc"
schema = avro.schema.parse(open(schema_path).read())
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['avro']
lurenx / Consumer.py
Created March 24, 2016 10:19
kafka Consumer
from kafka import KafkaConsumer
import avro.schema
import avro.io
import io
# To consume messages
consumer = KafkaConsumer('my-topic',
                         group_id='my_group',
                         bootstrap_servers=['localhost:9092'])
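The preview ends before the consume loop; each message value would be Avro binary data that a DatumReader decodes against the schema. As an illustration of what that decoding does for the User schema (fields `name`, `favorite_number`, `favorite_color`, as in the avro schema gist below), here is a hand-rolled decoder using only the standard library; the sample bytes are a hypothetical encoded record, not something captured from Kafka:

```python
import io

def read_long(buf):
    """Read one Avro int/long: a zigzag-encoded variable-length integer."""
    b = buf.read(1)[0]
    n = b & 0x7F
    shift = 7
    while b & 0x80:                  # high bit set -> more bytes follow
        b = buf.read(1)[0]
        n |= (b & 0x7F) << shift
        shift += 7
    return (n >> 1) ^ -(n & 1)       # undo zigzag encoding

def read_string(buf):
    """Read one Avro string: a length prefix followed by UTF-8 bytes."""
    return buf.read(read_long(buf)).decode("utf-8")

def decode_user(data):
    """Decode one binary-encoded record of the User schema by hand."""
    buf = io.BytesIO(data)
    user = {"name": read_string(buf)}
    # Unions are encoded as the branch index (a long), then the value.
    idx = read_long(buf)             # 0 = int branch, 1 = null branch
    user["favorite_number"] = read_long(buf) if idx == 0 else None
    idx = read_long(buf)             # 0 = string branch, 1 = null branch
    user["favorite_color"] = read_string(buf) if idx == 0 else None
    return user

print(decode_user(b"\x0cAlyssa\x00\x80\x04\x02"))
# {'name': 'Alyssa', 'favorite_number': 256, 'favorite_color': None}
```

In the real consumer loop this job is done by `avro.io.DatumReader` reading each `message.value`; the sketch only shows what the wire format contains.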
lurenx / user.avsc
Created March 24, 2016 10:19
avro schema
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
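To make the schema concrete, here is what the Avro binary encoding of one User record looks like, built by hand with only the standard library. This mirrors what the `DatumWriter`/`BinaryEncoder` pair produces; the sample record itself is made up:

```python
def write_long(n):
    """Encode one Avro int/long: zigzag, then variable-length bytes."""
    n = (n << 1) ^ (n >> 63)          # zigzag: small magnitudes -> few bytes
    out = bytearray()
    while n > 0x7F:
        out.append((n & 0x7F) | 0x80) # high bit set -> more bytes follow
        n >>= 7
    out.append(n)
    return bytes(out)

def write_string(s):
    """Encode one Avro string: length prefix, then UTF-8 bytes."""
    data = s.encode("utf-8")
    return write_long(len(data)) + data

def encode_user(user):
    """Encode one record of the User schema by hand."""
    out = write_string(user["name"])
    # Unions are encoded as the branch index (a long), then the value.
    if user["favorite_number"] is None:
        out += write_long(1)                      # branch 1: null
    else:
        out += write_long(0) + write_long(user["favorite_number"])
    if user["favorite_color"] is None:
        out += write_long(1)                      # branch 1: null
    else:
        out += write_long(0) + write_string(user["favorite_color"])
    return out

encoded = encode_user({"name": "Alyssa", "favorite_number": 256,
                       "favorite_color": None})
print(encoded)  # b'\x0cAlyssa\x00\x80\x04\x02'
```

Note that the record's field names never appear on the wire; both sides rely on the schema to know the field order and types, which is why producer and consumer must agree on it.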
lurenx / kafka.md
Created March 24, 2016 10:08 — forked from ashrithr/kafka.md
kafka introduction

Introduction to Kafka

Kafka acts as a kind of write-ahead log (WAL) that records messages to a persistent store (disk) and allows subscribers to read and apply these changes to their own stores, each in a time-frame appropriate to its own system.

Terminology:

  • Producers send messages to brokers
  • Consumers read messages from brokers
  • Messages are sent to a topic
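The log-centric model in the bullets above can be sketched as a toy in-memory broker: a topic is an append-only list of messages, producers append to it, and each consumer tracks its own read offset and catches up at its own pace. The names here are illustrative and not any real Kafka client API:

```python
class ToyBroker:
    """Toy model of a broker: append-only per-topic logs (the 'WAL')."""

    def __init__(self):
        self.topics = {}  # topic name -> list of messages (the log)

    def produce(self, topic, message):
        """Append a message to a topic's log; return its offset."""
        log = self.topics.setdefault(topic, [])
        log.append(message)
        return len(log) - 1

    def consume(self, topic, offset):
        """Return (messages, next_offset) from the given offset onward.

        Consuming never removes anything: each consumer just remembers
        where it is in the log, which is how slow readers catch up later.
        """
        log = self.topics.get(topic, [])
        return log[offset:], len(log)

broker = ToyBroker()
broker.produce("avro", b"msg-1")
broker.produce("avro", b"msg-2")

# Two independent consumers: one from the start, one already at offset 1.
msgs_a, next_a = broker.consume("avro", 0)
msgs_b, next_b = broker.consume("avro", 1)
print(msgs_a)  # [b'msg-1', b'msg-2']
print(msgs_b)  # [b'msg-2']
```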
lurenx / avro_kafka.rb
Created March 24, 2016 09:58
ruby avro kafka
require 'poseidon'
require 'avro'
require 'json'
schema = Avro::Schema.parse(File.open("item.avsc", "rb").read)
dw = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
lurenx / items.avsc
Created March 24, 2016 09:47
avro schema
{
  "type": "record",
  "name": "Item",
  "namespace": "example.avro",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "description", "type": ["string", "null"]},
    {"name": "price", "type": ["double", "null"]}
  ]
}
lurenx / deserialise_avro.rb
Created March 24, 2016 09:33
deserialise avro
require 'rubygems'
require 'avro'
# Open items.avro file in read mode
file = File.open('items.avro', 'rb')
# Create an instance of DatumReader
reader = Avro::IO::DatumReader.new()
# Equivalent to DataFileReader instance creation in Java
lurenx / serialise_avro.rb
Created March 24, 2016 09:28
serialise_avro.rb
require 'rubygems'
require 'avro'
# Creates items.avro if it does not exist; otherwise opens it in write mode, truncating it
file = File.open('items.avro', 'wb')
# Opens item.avsc in read mode and parses the schema.
schema = Avro::Schema.parse(File.open("item.avsc", "rb").read)
# Creates DatumWriter instance with required schema.