Rohit Dholakia (rohitdholakia)

from confluent_kafka import admin
from confluent_kafka.admin import ConfigResource
config = {'bootstrap.servers': 'broker:9092'}
a = admin.AdminClient(config)
fs = a.describe_configs([ConfigResource(ConfigResource.Type.TOPIC, 'topic_name')])
for f, item in fs.items():
    print(item.result())  # this works great
# this line causes it to die: the type and name are passed bare,
# not wrapped in a ConfigResource as in the working call above
fs = a.describe_configs([ConfigResource.Type.GROUP, 'group_name'])
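The likely fix, assuming the same AdminClient API as the working topic call above: wrap the group in a ConfigResource as well.

# hedged sketch: mirror the working TOPIC call for the GROUP resource
fs = a.describe_configs([ConfigResource(ConfigResource.Type.GROUP, 'group_name')])
for f, item in fs.items():
    print(item.result())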
@rohitdholakia
rohitdholakia / FileSinkOperator.md
Last active February 1, 2016 23:08
FileSinkOperator documentation

What is a sink operator?

Hive runs a SQL query as a DAG of jobs (before Hive 2.0, these could also be MapReduce jobs). The final result of this job, or set of jobs, is always stored in HDFS. Why? Your cluster might be rebooted between the end of the job and the delivery of the output to the client, or something else might go wrong. To protect against this, Hive writes the final output to HDFS, and when it gets a request for that output (typically over Thrift), it sends the file across.

To write output to HDFS, a FileSinkOperator instance is used. FileSinkOperator is a TerminalOperator, meaning that it is called at the end of a job. If there are multiple tasks running, each of them has its own instance of FileSink (FSO). Whenever you first look at an operator, start with 3 functions (a toy sketch follows the list):

  • initializeOp()
  • process()
  • closeOp()
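To make the lifecycle concrete, here is a toy Python sketch of how the three hooks fit together. The real FileSinkOperator is Java and writes to HDFS; this local-file stand-in and all of its names are illustrative only.

import os
import shutil
import tempfile

class SinkOperatorSketch(object):
    '''Toy stand-in for a sink operator; writes locally, not to HDFS.'''

    def __init__(self, final_path):
        self.final_path = final_path

    def initializeOp(self):
        # open a writer to a temporary location, mirroring the
        # write-to-temp-then-publish pattern described above
        self.tmp = tempfile.NamedTemporaryFile('w', delete=False)

    def process(self, row):
        # called once per row handed to the operator
        self.tmp.write('\t'.join(str(c) for c in row) + '\n')

    def closeOp(self, abort=False):
        # flush, then publish the output or discard it on abort
        self.tmp.close()
        if abort:
            os.remove(self.tmp.name)
        else:
            shutil.move(self.tmp.name, self.final_path)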
@rohitdholakia
rohitdholakia / Handson.md
Last active December 1, 2015 00:27
Hands-on assignments - Spark on EMR

# Preamble

If you are reading this, congratulations! You have already set up your first cluster on EMR with Spark installed. Today, we will get hands-on with several features of Spark, focusing mainly on pipelining transformations and actions (there is a sketch of this after the first exercise).

## Shakespeare

  • There is a text file on S3, Shakespeare_all.txt, in the bucket datateched. I want you to copy it from S3 to your local system. To do this, you will need the AWS CLI, which should already be set up on your machine since you all installed ipython and other libraries (yay!)

  • aws s3 cp s3://datateched/Shakespeare_all.txt .
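With the file in place, here is a minimal sketch of the pipelining theme, assuming a SparkContext named sc (as provided by the EMR Spark setup) and the file uploaded somewhere Spark can read it:

# chain transformations (flatMap, map, reduceByKey), then trigger the
# whole pipeline with a single action (takeOrdered)
lines = sc.textFile('Shakespeare_all.txt')
counts = (lines
          .flatMap(lambda line: line.split())    # words
          .map(lambda word: (word.lower(), 1))   # (word, 1) pairs
          .reduceByKey(lambda a, b: a + b))      # per-word counts
# nothing has executed yet; this action runs the pipeline
print(counts.takeOrdered(10, key=lambda kv: -kv[1]))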

import random

def three_way(a, lo, hi):
    ''' dutch national flag problem for 3-way partitioning in quicksort'''
    key = a[random.randint(lo, hi)]
    less, equal, greater = lo, lo, hi
    while equal <= greater:
        if a[equal] < key:
            a[less], a[equal] = a[equal], a[less]
            less += 1
            equal += 1
        elif a[equal] > key:
            a[equal], a[greater] = a[greater], a[equal]
            greater -= 1
        else:
            equal += 1
    return less, greater
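A hypothetical driver (not part of the original gist) showing how the two returned bounds plug into quicksort: recurse only on the strictly-smaller and strictly-larger segments, since the run of equal elements is already in place.

def quicksort3(a, lo=0, hi=None):
    # sort a[lo..hi] in place using three_way partitioning
    if hi is None:
        hi = len(a) - 1
    if lo >= hi:
        return
    less, greater = three_way(a, lo, hi)
    quicksort3(a, lo, less - 1)
    quicksort3(a, greater + 1, hi)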
def insertion_sort(a):
    ''' implement insertion sort'''
    for i in xrange(1, len(a)):
        key = a[i]
        j = i - 1
        # bubble key left until the prefix a[0..i] is sorted
        while j >= 0 and a[j] > key:
            a[j], a[j + 1] = a[j + 1], a[j]
            j = j - 1
    return a
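Quick sanity check:

print(insertion_sort([5, 2, 4, 6, 1, 3]))  # [1, 2, 3, 4, 5, 6]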
import random
import copy

# three identical datasets: one for vanilla_quicksort, one for the midpoint
# pivot, and one for random pivot selection, so each variant sorts the same input
rand_data = [random.randint(0, 10000) for i in xrange(1000000)]
rand_mid = copy.deepcopy(rand_data)
rand_random = copy.deepcopy(rand_data)
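A small harness for the comparison these copies set up. The quicksort variants themselves are not in this gist, so the built-in sorted is used as a runnable stand-in:

import time

def time_sort(sort_fn, data):
    # each variant gets its own copy above, so all three see identical input
    start = time.time()
    sort_fn(data)
    return time.time() - start

print(time_sort(sorted, rand_data))  # baseline; swap in each quicksort variant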
@rohitdholakia
rohitdholakia / partition.py
Last active August 29, 2015 13:57
Partition algorithm
def partition(a, lo, hi):
    # Hoare-style partition of a[lo..hi] around the pivot a[lo];
    # returns the pivot's final index
    pivot = a[lo]
    left = lo + 1
    right = hi
    while left <= right:
        while left <= right and a[left] < pivot:
            left += 1
        while left <= right and a[right] > pivot:
            right -= 1
        if left <= right:
            a[left], a[right] = a[right], a[left]
            left += 1
            right -= 1
    # place the pivot between the two halves
    a[lo], a[right] = a[right], a[lo]
    return right
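A hypothetical driver (not shown in the gist) that turns this partition step into a full quicksort:

def quicksort(a, lo=0, hi=None):
    # recursive quicksort built on partition(); sorts a in place
    if hi is None:
        hi = len(a) - 1
    if lo < hi:
        p = partition(a, lo, hi)
        quicksort(a, lo, p - 1)
        quicksort(a, p + 1, hi)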
@rohitdholakia
rohitdholakia / BloomFilter12.py
Created February 21, 2014 05:58
Bloom Filter - 2** 12
import mmh3
import sys

MAX = 2 ** 12  # filter size: 4096 bits
bitarray = [0 for i in xrange(MAX)]

with open(sys.argv[1]) as usernames:
    for line in usernames:
        username = line.rstrip()
        # the gist is truncated here; setting a few seeded murmur-hash bit
        # positions per username is the natural continuation (3 hash
        # functions is an illustrative choice, not from the original)
        for seed in xrange(3):
            bitarray[mmh3.hash(username, seed) % MAX] = 1
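Under the same assumptions, membership testing probes the same seeded positions: if any bit is unset the username is definitely absent, and if all are set it is only possibly present.

def maybe_contains(username, bitarray, num_hashes=3):
    # false positives are possible, false negatives are not
    return all(bitarray[mmh3.hash(username, seed) % len(bitarray)]
               for seed in xrange(num_hashes))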
@rohitdholakia
rohitdholakia / GettingUsernames.py
Created February 21, 2014 05:51
Reading usernames from the Twitter file
import sys
import os

PREFIX = 'http://twitter.com/'

with os.popen('zcat ' + sys.argv[1]) as tweet_file, open(sys.argv[2], 'w') as output:
    # skip all lines whose record type is not 'U' (user)
    for line in tweet_file:
        parts = line.rstrip().split('\t')
        if 'U' not in parts[0]:
            continue
        url = parts[1]
        # lstrip() removes a set of characters, not a prefix, so it would
        # also eat leading letters of the username; slice the prefix instead
        if url.startswith(PREFIX):
            url = url[len(PREFIX):]
        output.write(url + '\n')
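Hypothetical invocation, with both file names assumed: python GettingUsernames.py tweets.txt.gz usernames.txt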