Timothy Renner (timothyrenner): GitHub Gists
timothyrenner / TableJoinKafkaStream.java
Created June 16, 2016 23:05
Example of KTable-KTable join in Kafka Streams
package io.github.timothyrenner.kstreamex.tablejoin;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable; // implied by the KTable-KTable join below
timothyrenner / tweet_utils.py
Last active July 29, 2021 22:32
Python Utilities for Tweets
from datetime import datetime
import string

from nltk.stem.lancaster import LancasterStemmer
from nltk.corpus import stopwords

# Gets the tweet time.
def get_time(tweet):
    return datetime.strptime(tweet['created_at'], "%a %b %d %H:%M:%S +0000 %Y")
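
The preview cuts off before the utilities that use the nltk imports. A minimal sketch of what a tokenizer built on those imports could look like; the helper name and the exact cleaning steps are assumptions for illustration, not the gist's actual code:

stemmer = LancasterStemmer()
stop_words = set(stopwords.words("english"))

# Hypothetical helper: lowercase, strip punctuation, drop stop words,
# and stem whatever remains.
def get_tokens(tweet):
    text = tweet['text'].lower()
    text = text.translate(str.maketrans("", "", string.punctuation))
    return [stemmer.stem(w) for w in text.split() if w not in stop_words]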
timothyrenner / bigfoot_streamlit_app.py
Last active June 23, 2020 13:16
Bigfoot Sightings Streamlit App
import streamlit as st
import pandas as pd
import altair as alt
import pydeck as pdk
import os
from dateutil.parser import parse

try:
    from dotenv import load_dotenv, find_dotenv
    # Assumed completion of the truncated preview: load .env if
    # python-dotenv is installed.
    load_dotenv(find_dotenv())
except ImportError:
    pass
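
A minimal sketch of the app's core pattern, assuming a local CSV of sightings with latitude and longitude columns (the file name and column names are hypothetical, not necessarily what the gist loads):

# Hypothetical data file and columns.
bigfoot = pd.read_csv("bigfoot_sightings.csv")

st.title("Bigfoot Sightings")

# One point per sighting on a deck.gl scatterplot layer.
layer = pdk.Layer(
    "ScatterplotLayer",
    data=bigfoot[["longitude", "latitude"]].dropna(),
    get_position=["longitude", "latitude"],
    get_radius=5000,
)
st.pydeck_chart(pdk.Deck(layers=[layer]))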
timothyrenner / gfl_with_h3.py
Created December 11, 2019 13:26
Graph Fused Lasso + H3
from h3 import h3
from pygfl.easy import solve_gfl

def build_neighbor_edges(hexids):
    # Hash the hexid to the position so we can easily look
    # up where the original hexid position is in the array.
    hexid_to_position = {h: ii for ii, h in enumerate(hexids)}
    edges = []
    for h in hexids:
        for n in h3.k_ring(h, 1):
            # Assumed completion of the truncated preview: keep only
            # edges to neighbors that are actually in the data.
            if n != h and n in hexid_to_position:
                edges.append((hexid_to_position[h], hexid_to_position[n]))
    return edges
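
A sketch of how the edges could feed the solver; the toy inputs are hypothetical, and the solve_gfl(data, edges) signature is my reading of pygfl's easy API, so treat it as an assumption:

import numpy as np

# Hypothetical toy input: hexes around a point, with a noisy count per hex.
hexids = list(h3.k_ring(h3.geo_to_h3(37.77, -122.42, 7), 2))
counts = np.random.poisson(5, size=len(hexids)).astype(float)

edges = build_neighbor_edges(hexids)
beta = solve_gfl(counts, edges)  # smoothed values, one per hex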
timothyrenner / pyspark_pandas_udf_call.py
Created January 30, 2019 22:26
Pyspark Pandas UDF Call
from pyspark.sql.functions import col

data_frame.withColumn(
    "prediction",
    predict_pandas_udf(col("feature1"), col("feature2"), ...)
)
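
Each argument arrives inside the UDF as a pandas Series holding a whole batch of rows (shipped through Apache Arrow), so the model scores batches rather than one row per call; that is the speedup over the plain row-at-a-time UDF further down.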
timothyrenner / pyspark_pandas_udf_creation.py
Created January 30, 2019 22:19
Pyspark Pandas UDF Creation
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(returnType=DoubleType())
def predict_pandas_udf(*features):
    """ Executes the prediction using numpy arrays. """
    # Assumed completion of the truncated preview: stack the feature
    # Series into a 2-d array and score it in one vectorized call.
    # `model` stands in for a preloaded scikit-learn-style estimator.
    X = pd.concat(features, axis=1).values
    return pd.Series(model.predict(X))
timothyrenner / pyspark_partition_call.py
Created January 30, 2019 22:13
Pyspark Partition Call
my_data.rdd.mapPartitions(predict_partition).toDF()
timothyrenner / pyspark_partition_definition.py
Created January 30, 2019 16:06
Pyspark Partition Definition
import pandas as pd

# We'll need this handy list more than once. It enforces the
# column order required by the model.
FEATURES = ["feature1", "feature2", "feature3", ...]

def predict_partition(rows):
    """ Calls a vectorized prediction by loading the partition into memory. """
    # Assumed completion: score the whole partition in one call;
    # `model` stands in for a preloaded estimator.
    rows_df = pd.DataFrame([row.asDict() for row in rows])
    rows_df["prediction"] = model.predict(rows_df[FEATURES].values)
    return iter(rows_df.itertuples(index=False))
timothyrenner / pyspark_udf_call.py
Created January 30, 2019 16:02
Pyspark UDF Call
from pyspark.sql.functions import col

my_df.withColumn(
    "predicted_score",
    predict_udf(col("feature1"), col("feature2"), ...)
)
timothyrenner / pyspark_udf_creation.py
Created January 30, 2019 15:59
Pyspark UDF Creation
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Wrap the (separately defined) scalar predict function as a Spark UDF
# returning a double, called once per row.
predict_udf = udf(predict, DoubleType())
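
The predict function itself isn't shown in the preview. A minimal sketch of what a row-at-a-time predictor could look like, where the body and the preloaded model are assumptions for illustration:

def predict(feature1, feature2):
    # Hypothetical: score one row with a scikit-learn-style model.
    return float(model.predict([[feature1, feature2]])[0])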