spektom / consul-ephemeral-key.sh
Last active September 11, 2017 18:45
Ephemeral nodes in Consul
#!/bin/bash -x

if [ -z "$1" ]; then
  echo "USAGE: $0 <session ID>"
  exit 1
fi

read -d '' data <<EOF
{
}
EOF
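The script above ties a key to a Consul session, so the key disappears when the session does. The two HTTP calls involved can be sketched in Python; the helper names and the agent address are illustrative, while the endpoint paths and the `Behavior`/`acquire` fields follow Consul's documented session API:

```python
import json

CONSUL = "http://127.0.0.1:8500"  # assumed local agent address

def create_session_request(name, ttl="15s"):
    # PUT body for /v1/session/create; Behavior=delete removes every key
    # held by the session when it expires, which makes them ephemeral.
    return (f"{CONSUL}/v1/session/create",
            json.dumps({"Name": name, "TTL": ttl, "Behavior": "delete"}))

def acquire_key_request(key, session_id, value):
    # PUT to /v1/kv/<key>?acquire=<session> ties the key to the session.
    return (f"{CONSUL}/v1/kv/{key}?acquire={session_id}", value)

url, body = create_session_request("my-node")
print(url)
```

These builders only construct the requests; in the gist the actual calls are made with curl against the local agent.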
spektom / spark-submit-flamegraph
Created June 5, 2017 12:23
Profiling Spark applications in one click
#!/bin/bash

set -e
trap 'kill $(jobs -p) 2>/dev/null' EXIT

function find_unused_port() {
  for port in $(seq $1 65000); do
    # telnet exits 1 when nothing is listening on the port
    echo -ne "\035" | telnet 127.0.0.1 $port >/dev/null 2>&1
    if [ $? -eq 1 ]; then
      echo $port
      return
    fi
  done
}
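`find_unused_port` probes each candidate port with telnet; a Python sketch of the same idea attempts a local bind instead, which needs no external tools (illustrative, not part of the gist):

```python
import socket

def find_unused_port(start=4000, end=65000):
    """Return the first TCP port in [start, end) that accepts a local bind.

    A successful bind means nothing else is listening there -- the same
    check the bash function performs with a telnet probe.
    """
    for port in range(start, end):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("127.0.0.1", port))
                return port
            except OSError:
                continue  # port is in use; try the next one
    raise RuntimeError("no unused port found in range")

print(find_unused_port())
```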
/**
 * Protect the cardinality of a column in a data frame
 *
 * @param df          Input data frame
 * @param col         Column whose cardinality will be protected
 * @param metric      Metric to use when ranking column values
 * @param groupBy     Additional columns to group by when
 *                    calculating column value ranks
 * @param limit       Max number of distinct values in a column
 * @param replaceWith Static text to replace all values that
 *                    fall outside the limit
 */
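The idea documented above can be shown with a small pure-Python sketch (the Scala implementation operates on a Spark DataFrame; here ranking is by raw frequency, one possible choice for the `metric` parameter):

```python
from collections import Counter

def protect_cardinality(values, limit, replace_with="other"):
    """Keep only the `limit` most frequent values; map everything else
    to a static replacement, capping the number of distinct values."""
    top = {v for v, _ in Counter(values).most_common(limit)}
    return [v if v in top else replace_with for v in values]

data = ["us", "us", "uk", "us", "uk", "de", "fr"]
print(protect_cardinality(data, limit=2))
# -> ['us', 'us', 'uk', 'us', 'uk', 'other', 'other']
```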
spektom / airflow-exclusive-operator.py
Created September 26, 2016 19:25
Extending Airflow's Bash operator and making it exclusive using the Consul locking mechanism
import consul
import consul_lock

from airflow.operators import BashOperator


class ExclusiveMixin(BashOperator):
    """
    Acquires a lock on this operator using either the task ID or a provided
    lock name, so that no other instance will be eligible to run. This mixin
    is disabled by default; to enable it, pass exclusive=True.
    """
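The exclusivity pattern can be sketched without a Consul cluster by swapping the distributed lock for an OS-level file lock; this is purely illustrative (the class and path are hypothetical, and `consul_lock` is replaced by `fcntl` only for the sketch):

```python
import fcntl
import os
import tempfile

class FileLockMixin:
    """Stand-in for the Consul-based lock: before running, acquire an
    exclusive file lock keyed by the task ID, so a second instance of
    the same task fails fast instead of running concurrently."""

    def __init__(self, task_id):
        self.task_id = task_id
        self._path = os.path.join(tempfile.gettempdir(), f"lock-{task_id}")
        self._fh = None

    def acquire(self):
        self._fh = open(self._path, "w")
        try:
            # Non-blocking exclusive lock: fail immediately if held elsewhere
            fcntl.flock(self._fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
            return True
        except BlockingIOError:
            self._fh.close()
            self._fh = None
            return False

    def release(self):
        if self._fh:
            fcntl.flock(self._fh, fcntl.LOCK_UN)
            self._fh.close()
            self._fh = None

a, b = FileLockMixin("my-task"), FileLockMixin("my-task")
print(a.acquire())  # True: first holder wins
print(b.acquire())  # False: second instance is locked out
a.release()
```

A Consul lock adds what a file lock cannot: the lock is visible cluster-wide and is released automatically if the holding session dies.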
(defn- db-connect []
  (couchbase/create-client {:bucket "default"
                            :uris ["http://couchbase001.local:8091/pools"
                                   "http://couchbase002.local:8091/pools"
                                   "http://couchbase003.local:8091/pools"
                                   "http://couchbase004.local:8091/pools"]}))

(defn make-key [app-id device-id]
  (str app-id ":" device-id))
(defn- db-connect []
  (let [client-policy (new AsyncClientPolicy)
        hosts (map #(new Host % 3000) ["aerospike001.local"
                                       "aerospike002.local"
                                       "aerospike003.local"
                                       "aerospike004.local"])]
    (set! (. client-policy asyncMaxCommands) 1000)
    (set! (. client-policy asyncSelectorThreads) (.availableProcessors (Runtime/getRuntime)))
    (new AsyncClient client-policy (into-array Host hosts))))
(defn- db-connect [aerospike-conf]
  (log/info "Initiating connection with Aerospike DB")
  (let [client-policy (new AsyncClientPolicy)
        hosts (map #(new Host % (:port aerospike-conf)) (:hosts aerospike-conf))]
    (set! (. client-policy asyncMaxCommands) (:async-max-commands aerospike-conf))
    (set! (. client-policy asyncSelectorThreads) (.availableProcessors (Runtime/getRuntime)))
    (new AsyncClient client-policy (into-array Host hosts))))
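The second `db-connect` variant replaces the hardcoded hosts and port with values read from a configuration map. That pattern of merging user-supplied settings over defaults can be sketched in Python (names and default values here are illustrative, not taken from the gist):

```python
DEFAULTS = {"port": 3000, "async_max_commands": 1000}

def connect_params(conf):
    """Merge user config over defaults, mirroring how db-connect reads
    :hosts / :port / :async-max-commands from a map instead of
    hardcoding them."""
    merged = {**DEFAULTS, **conf}
    hosts = [(h, merged["port"]) for h in merged["hosts"]]
    return hosts, merged["async_max_commands"]

hosts, max_commands = connect_params({"hosts": ["a.local", "b.local"],
                                      "port": 4000})
print(hosts)  # -> [('a.local', 4000), ('b.local', 4000)]
```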
(defn- make-write-policy
  "Creates a write policy for device records"
  [config]
(defn- db-connect [couchbase-conf]
  (log/info "Initiating connection with Couchbase")
  (couchbase/create-client couchbase-conf))

(defn make-key
  "Returns the key under which the device record will be written to Couchbase"
  [app-id device-id]
  (str app-id ":" device-id))

(defn make-data
  "Creates data bins that will be stored under the device key"
  [msg]
val q = new org.apache.spark.sql.SQLContext(sc)
q.load("parquet", Map("path" -> "s3://raw-data/...", "mergeSchema" -> "false"))
  .registerTempTable("organic")

q.sql("SELECT * FROM organic ...")
  .map(_.mkString("\t"))
  .coalesce(1, true)
  .saveAsTextFile("s3://results-bucket/...")
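The `.map(_.mkString("\t"))` step flattens each SQL result row into a single tab-separated line before `coalesce(1, true)` shuffles everything into one output file. A plain-Python equivalent of the row formatting (illustrative only):

```python
def row_to_tsv(row):
    """Join a row's fields with tabs, as mkString("\t") does in Scala."""
    return "\t".join(str(field) for field in row)

print(row_to_tsv((1, "organic", 0.42)))  # -> 1	organic	0.42
```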