Skip to content

Instantly share code, notes, and snippets.

ychennay /
Created Apr 5, 2022
Collaborative Filter Example in Spark
import pandas as pd
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.sql import SparkSession, SQLContext
from sklearn.metrics.pairwise import cosine_similarity
if __name__ == "__main__": # run this by typing "python"
app_name = "collab_filter_example"
# create a Spark context
spark = SparkSession.builder.master("local").appName(app_name).getOrCreate()
ychennay /
Created Aug 8, 2021
temporary file implicit close example
from tempfile import NamedTemporaryFile
import os
def test():
tf = NamedTemporaryFile()
if __name__ == "__main__":
    # this is a custom processor class that we create to handle real-time inference
    # we'll show the skeleton code for it below
    # NOTE(review): RealTimeInferenceProcessor and df must already be defined when
    # this runs; df is presumably a streaming DataFrame built elsewhere -- confirm.
    processor = RealTimeInferenceProcessor()
    # The builder chain is wrapped in parentheses so each step can carry its own
    # comment: a comment after a backslash line-continuation is a SyntaxError.
    query = (
        df.writeStream
        # configure checkpointing in case of job failure
        .option("checkpointLocation", "dbfs://pathToYourCheckpoint")
        # for each micro-batch, apply this method
        .foreachBatch(processor.process_batch)
        .outputMode("append")
        .start()  # start the stream query
    )
from abc import ABC, abstractmethod
from pyspark.sql.dataframe import DataFrame as SparkFrame
class Processor(ABC):
    """Abstract base for objects that handle one streaming micro-batch.

    Subclasses must override :meth:`process_batch`. That method is intended to
    be used as the ``foreachBatch`` callback of a streaming writer.
    Marking it ``@abstractmethod`` makes a subclass that forgets to override it
    fail at instantiation time instead of at the first micro-batch.
    """

    @abstractmethod
    def process_batch(self, df: SparkFrame, epochID: int) -> None:
        """Process a single micro-batch.

        Args:
            df: The micro-batch as a Spark DataFrame.
            epochID: Identifier of the micro-batch; ``foreachBatch`` supplies
                an integer batch id here.

        Raises:
            NotImplementedError: Always, if a subclass delegates here via
                ``super()`` without providing its own implementation.
        """
        raise NotImplementedError
class RealTimeInferenceProcessor(Processor):
ychennay /
Created Aug 5, 2021
Spark Structured Streaming Kinesis Watermarks
from pyspark.sql.functions import window
# configure reading from the stream
kinesis_df = spark.readStream.format("kinesis")
.option("streamName", KINESIS_STREAM_NAME)
.option("region", AWS_REGION)
.option("roleArn", KINESIS_ACCESS_ROLE_ARN
.option("initialPosition", "latest")
ychennay /
Created Aug 5, 2021
Example of Saving an MLFlow
from sklearn.linear_model import ElasticNet
# these are internal wrapper/utility classes that we have developed to streamline the ML lifecycle process
from hs_mllib.model_lifecycle.packaging import MLModel, ScikitLearnModel
# this context MLFlow context manager allows experiment runs (parameters and metrics) to be tracked and easily queryable
with MLModel.mlflow.start_run() as run:
# data transformations and feature pre-processing code omitted (boiler-plate code)
ychennay / main.go
Created Mar 22, 2021
Simple Golang Desktop Cleaner
View main.go
package main
import (
ychennay /
Created Apr 6, 2020
Regex: Custom find all using re.search and groups()
import re
# Load the whole log up front. The ``with`` block closes the handle as soon as
# the lines are read, instead of leaving it open for the rest of the script.
with open("SSH_2k.log.txt") as file:
    logs = file.readlines()  # returns a list of strings, one per line (newlines kept)
# regex to parse out event messages from process ID 24200 or 24206:
# group 1 is the PID, group 2 is the remainder of the line after "]: "
find_events = r'sshd\[(24200|24206)\]: (.+)'
def find_all(expression, strings):
    """Return the capture groups of ``expression`` for every string it matches.

    ``re.search`` semantics are used, so a match may start anywhere in the
    string. Strings with no match are skipped. Each matching string
    contributes the tuple returned by ``Match.groups()``; optional groups
    that did not participate in the match appear as ``None``.

    Args:
        expression: A regular-expression pattern (string or compiled pattern).
        strings: An iterable of strings to search.

    Returns:
        A list of group tuples, in the same order as the matching inputs.
    """
    pattern = re.compile(expression)
    # Search each string exactly once and reuse the match object, rather than
    # running the search a second time just to filter.
    matches = (pattern.search(string) for string in strings)
    return [match.groups() for match in matches if match is not None]
class cached_property:
Decorator that converts a method with a single self argument into a
property cached on the instance.
A cached property can be made out of an existing method:
(e.g. ``url = cached_property(get_absolute_url)``).
# ...
def __get__(self, instance, cls=None):
class QuerySet:
"""Represent a lazy database lookup for a set of objects."""
# ...
def ordered(self):
Return True if the QuerySet is ordered -- i.e. has an order_by()
clause or a default ordering on the model (or is empty).
if isinstance(self, EmptyQuerySet):