Skip to content

Instantly share code, notes, and snippets.

View ychennay's full-sized avatar

Yu Chen ychennay

View GitHub Profile
@ychennay
ychennay / metaclass_example.py
Last active December 23, 2022 05:46
Example of a metaclass
class ModelBase(type):
    """Metaclass demo that logs the class-creation hooks as they fire."""

    a_variable_from_metaclass = 3
    _instances = {}  # keep track of the instances created per class

    @classmethod
    def __prepare__(metacls, name, bases, **kwargs):
        # First hook in class creation: return the mapping the class body
        # will execute into (type's default is a plain dict).
        print(f"\nInside MetaClass __prepare__, name: {name}, bases: {bases}, metacls: {metacls}, kwargs: {kwargs}")
        namespace = super().__prepare__(name, bases, **kwargs)
        print(f"Returned namespace: {namespace}")
        return namespace
@ychennay
ychennay / collab_filter.py
Created April 5, 2022 15:39
Collaborative Filter Example in Spark
import pandas as pd
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.sql import SparkSession, SQLContext
from sklearn.metrics.pairwise import cosine_similarity
# Entry point for the collaborative-filter example; the preview is truncated
# here, so the ALS training / similarity code presumably follows — TODO confirm.
if __name__ == "__main__": # run this by typing "python collaborative_filter.py"
    app_name = "collab_filter_example"
    # create a Spark context (local master: single-machine session for the demo)
    spark = SparkSession.builder.master("local").appName(app_name).getOrCreate()
@ychennay
ychennay / saving_mlflow_model.py
Created August 5, 2021 19:08
Example of Saving an MLFlow Model
from sklearn.linear_model import ElasticNet
# these are internal wrapper/utility classes that we have developed to streamline the ML lifecycle process
from hs_mllib.model_lifecycle.packaging import MLModel, ScikitLearnModel
# this context MLFlow context manager allows experiment runs (parameters and metrics) to be tracked and easily queryable
# Open an MLflow experiment run via the project's MLModel wrapper; the run is
# closed automatically when the with-block exits. NOTE(review): MLModel is an
# internal wrapper class defined elsewhere — its exact contract is not visible here.
with MLModel.mlflow.start_run() as run:
    # data transformations and feature pre-processing code omitted (boiler-plate code)
    ...
@ychennay
ychennay / tempfile.py
Created August 8, 2021 17:30
temporary file implicit close example
from tempfile import NamedTemporaryFile
import os
def test():
    """Yield the path of a fresh NamedTemporaryFile.

    The file handle is a local, so once the generator is finalized the
    handle is collected and the temp file is implicitly closed/removed.
    """
    handle = NamedTemporaryFile()
    yield handle.name
if __name__ == "__main__":
@ychennay
ychennay / start_query.py
Created August 5, 2021 19:56
Starting Query
# this is a custom processor class that we create to handle real-time inference
# we'll show the skeleton code for it below
processor = RealTimeInferenceProcessor()

# Build and start the streaming query. NOTE: in Python a comment may NOT follow
# a "\" line continuation (it is a syntax error), so the fluent chain is
# wrapped in parentheses instead, letting each step carry its own comment.
query = (
    df.writeStream
    .option("checkpointLocation", "dbfs://pathToYourCheckpoint")  # configure checkpointing in case of job failure
    .foreachBatch(processor.process_batch)  # for each micro-batch, apply this method
    .outputMode("append")
    .start()  # start the stream query
)
@ychennay
ychennay / process_micro_batch_pseudocode.py
Created August 5, 2021 19:53
Process MicroBatch OOP Example
from abc import ABC, abstractmethod
from pyspark.sql.dataframe import DataFrame as SparkFrame
class Processor(ABC):
    """Interface for objects that consume one Spark micro-batch at a time."""

    @abstractmethod
    def process_batch(self, df: SparkFrame, epochID: str) -> None:
        """Process a single micro-batch; concrete subclasses must override."""
        raise NotImplementedError
class RealTimeInferenceProcessor(Processor):
@ychennay
ychennay / structured_streaming_kinesis_watermark.py
Created August 5, 2021 19:23
Spark Structured Streaming Kinesis Watermarks
from pyspark.sql.functions import window
# configure reading from the stream. The fluent chain is wrapped in
# parentheses so each .option() can sit on its own line — the original lacked
# line continuations (a syntax error) and was missing the closing ")" on the
# roleArn option, both fixed here.
kinesis_df = (
    spark.readStream.format("kinesis")
    .option("streamName", KINESIS_STREAM_NAME)
    .option("region", AWS_REGION)
    .option("roleArn", KINESIS_ACCESS_ROLE_ARN)
    .option("initialPosition", "latest")  # start from the newest records
    .load()
)
@ychennay
ychennay / main.go
Created March 22, 2021 01:29
Simple Golang Desktop Cleaner
package main
import (
"fmt"
"log"
"os"
"os/user"
"path/filepath"
)
@ychennay
ychennay / data_descriptor_behavior.py
Created March 17, 2020 21:18
Data Descriptor Behavior
# NOTE(review): Student is defined elsewhere; presumably its `name` attribute is
# backed by a data descriptor — confirm against the class definition.
student = Student()
print(f"\nFirst access of name attribute on student: {student.name}")
# Plain attribute assignment invokes __setattr__ under the hood; calling the
# dunder directly (student.__setattr__("name", ...)) is non-idiomatic and
# behaves identically, so use the normal assignment syntax.
student.name = "Yu Chen"
print(f"\n{student} instance symbol table: {student.__dict__}")
print(f"\nSecond access of name attribute on student: {student.name}")
@ychennay
ychennay / custom_find_all.py
Created April 6, 2020 16:14
Regex: Custom find all using re.search and groups()
import re
# Read the SSH log into a list of lines; a context manager guarantees the
# file handle is closed promptly (the original leaked the open handle).
with open("SSH_2k.log.txt") as file:
    logs = file.readlines()  # returns a list of strings

find_events = r'sshd\[(24200|24206)\]: (.+)' #regex to parse out event messages from process ID 24200 or 24206
def find_all(expression, strings):
    """Return the captured groups of the first match in each matching string.

    Args:
        expression: regex pattern with at least one capture group.
        strings: iterable of strings to scan.

    Returns:
        A list of groups() tuples, one per string that matched; non-matching
        strings are skipped.
    """
    # Search each string exactly once (the original called re.search twice
    # per string: once to filter, once to extract the groups).
    matches = (re.search(expression, string) for string in strings)
    return [match.groups() for match in matches if match]