Mike Sukmanowsky (msukmanowsky)
@msukmanowsky
msukmanowsky / custom_code_bolt.py
Last active August 29, 2015 14:05
A custom code execution bolt, not yet tested.
import logging
from streamparse.bolt import Bolt
log = logging.getLogger("custom_code_bolt")
class CustomCodeBolt(Bolt):
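    # --- The preview cuts off here; below is a hedged sketch of what the
    # --- process() body might look like, not the original gist code.
    def process(self, tup):
        # Assumes each tuple carries a string of Python source to execute.
        code = tup.values[0]
        scope = {}
        try:
            exec(code, scope)                    # run the user-supplied code
            self.emit([scope.get("result")])     # emit whatever it produced
        except Exception:
            log.exception("Failed to execute custom code")
            self.fail(tup)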
@msukmanowsky
msukmanowsky / storm_version.py
Last active August 29, 2015 14:07
Parse Apache Storm versions in Python and do easy comparisons on them. You could probably even import something from https://github.com/pypa/pip/blob/19e29fc2e8e57a671e584726655bbb42c6e15eee/pip/_vendor/distlib/version.py and it'd work just fine, but I haven't tested that.
import re
class InvalidVersionException(Exception): pass
class StormVersion(object):
VERSION_RE = re.compile(r"(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)"
"(?P<older_patch>\.\d+)?(?P<other>.*)")
RC_RE = re.compile(r"-rc(?P<release_candidate>\d+)", re.IGNORECASE)
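    # --- The preview cuts off here; the rest is a hedged sketch of how
    # --- parsing and comparison might work, not the original gist code.
    def __init__(self, version_string):
        match = self.VERSION_RE.match(version_string)
        if not match:
            raise InvalidVersionException(version_string)
        self.major = int(match.group("major"))
        self.minor = int(match.group("minor"))
        self.patch = int(match.group("patch"))

    def __lt__(self, other):
        # Plain tuple comparison gives the usual major/minor/patch ordering.
        return ((self.major, self.minor, self.patch) <
                (other.major, other.minor, other.patch))

# e.g. StormVersion("0.9.2") < StormVersion("0.10.0")  ->  True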
@msukmanowsky
msukmanowsky / CassandraConverters.scala
Last active August 29, 2015 14:08
Custom version of CassandraConverters.scala from spark/examples/src/main/scala/org/apache/spark/examples/pythonconverters/. Provides better (though not perfect) serialization of keys and values for CqlOutputFormat.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
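The preview only shows the license header. For context, converters like these are what a PySpark job names when writing to Cassandra through CqlOutputFormat; a rough sketch of that calling side is below (the Hadoop conf is abbreviated and the converter class names are assumed to follow the stock Spark example package):

from pyspark import SparkContext

sc = SparkContext(appName="cassandra_write")

# The real conf also needs the Cassandra host/port, keyspace and output CQL;
# only the output format class is spelled out here.
conf = {"mapreduce.outputformat.class":
        "org.apache.cassandra.hadoop.cql3.CqlOutputFormat"}

# CqlOutputFormat expects (key map, bound-values list) pairs.
rows = sc.parallelize([({"user_id": 1}, ["Mike"])])
rows.saveAsNewAPIHadoopDataset(
    conf=conf,
    keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
    valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")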
@msukmanowsky
msukmanowsky / pyspark_cassandra.py
Last active August 29, 2015 14:08
Work in progress ideas for a PySpark binding to the DataStax Cassandra-Spark Connector.
from pyspark.context import SparkContext
from pyspark.serializers import BatchedSerializer, PickleSerializer
from pyspark.rdd import RDD
from py4j.java_gateway import java_import
class CassandraSparkContext(SparkContext):
    def _do_init(self, *args, **kwargs):
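        # --- The preview cuts off here; the rest is a hedged sketch of the
        # --- work-in-progress idea, not the original gist code.
        super(CassandraSparkContext, self)._do_init(*args, **kwargs)
        # Make the DataStax connector's classes visible to Py4J.
        java_import(self._jvm, "com.datastax.spark.connector.*")

    def cassandraTable(self, keyspace, table):
        # Hypothetical helper: the method name and the Java-side call are
        # illustrative; the connector's Scala API would be reached via Py4J.
        jrdd = self._jsc.cassandraTable(keyspace, table)
        return RDD(jrdd, self, BatchedSerializer(PickleSerializer()))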
@msukmanowsky
msukmanowsky / spark_gzip.py
Created November 14, 2014 01:32
Example of how to save Spark RDDs to disk using GZip compression in response to https://twitter.com/rjurney/status/533061960128929793.
from pyspark import SparkContext
def main():
sc = SparkContext(appName="Test Compression")
# RDD has to be key, value pairs
data = sc.parallelize([
("key1", "value1"),
("key2", "value2"),
("key3", "value3"),
install.packages("jsonlite", dependencies = TRUE)
install.packages("RCurl", dependencies = TRUE)
library("jsonlite")
library("RCurl")
base_url <- "https://api.parsely.com/v2"
apikey <- "computerworld.com"
api_secret <- "YOUR SECRET KEY"
#!/usr/bin/env bash
# Hitting CTRL-C kills the Django server as well as all tunnels that were created
TUNNEL_PIDS=()
function kill_tunnels() {
    for tunnel_pid in "${TUNNEL_PIDS[@]}"
    do
        kill $tunnel_pid
    done
}
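# Hedged sketch of how the rest of the script likely fits together: trap
# CTRL-C so the tunnels get cleaned up, then run the dev server in the
# foreground (host and ports below are placeholders, not from the gist).
trap 'kill_tunnels; exit' INT TERM

ssh -N -L 5432:localhost:5432 user@remote-host &
TUNNEL_PIDS+=("$!")

python manage.py runserver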

Basics

Sort the output of a command

By 3rd column (1-indexed) in reverse order

sort -k3 -r
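
For example, to list processes by CPU usage (column 3 of ps aux), adding -n so the comparison is numeric rather than lexicographic:

ps aux | sort -k3 -rn | head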

Spark / PySpark aggregateByKey Example

The existing examples for this are good, but they miss a pretty critical observation: the number of partitions and how it affects the result.

Assume we have the following script, aggregate_by_key.py:

import pprint
from pyspark.context import SparkContext
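
The preview stops after the imports, so here is a hedged sketch of the kind of script being described: the same aggregateByKey call run against different partition counts, which is where the zero value's once-per-partition behaviour shows up (the data, zero value, and partition counts are illustrative):

def seq_op(acc, value):
    # Runs inside a partition; acc starts from the zero value once per partition.
    return acc + value

def comb_op(left, right):
    # Merges the per-partition accumulators.
    return left + right

sc = SparkContext(appName="aggregate_by_key")
pairs = [("a", 1), ("a", 2), ("b", 3)]

for num_partitions in (1, 2, 4):
    rdd = sc.parallelize(pairs, num_partitions)
    # With a non-identity zero value (10) the result changes with the number
    # of partitions, because the zero value is folded in once per partition.
    result = rdd.aggregateByKey(10, seq_op, comb_op).collectAsMap()
    pprint.pprint((num_partitions, result))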
import datetime as dt
import pprint
import pytz
print(pytz.__version__)
# '2015.4'
timezone = pytz.timezone('Europe/London')
tmsp = dt.datetime(2015, 3, 29, 1, tzinfo=pytz.utc)
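
The snippet stops right at the interesting moment: 2015-03-29 01:00 UTC is exactly when Europe/London switches to BST. A hedged guess at how it continues:

# Converting the UTC timestamp into Europe/London crosses the DST switch:
# 01:00 UTC on that date is 02:00 BST (+01:00).
print(tmsp.astimezone(timezone))
# 2015-03-29 02:00:00+01:00

# For local (non-UTC) times, pytz expects localize() rather than tzinfo=...,
# which would silently attach the zone's historic LMT offset.
print(timezone.localize(dt.datetime(2015, 3, 29, 3)))
# 2015-03-29 03:00:00+01:00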