Skip to content

Instantly share code, notes, and snippets.

@ychennay
ychennay / tempfile.py
Created Aug 8, 2021
temporary file implicit close example
View tempfile.py
from tempfile import NamedTemporaryFile
import os
def test():
    """Yield the path of a freshly created named temporary file.

    The ``NamedTemporaryFile`` object is referenced only by a local variable
    of this generator and is never closed explicitly here.
    """
    temp_file = NamedTemporaryFile()
    yield temp_file.name
if __name__ == "__main__":
View start_query.py
# this is a custom processor class that we create to handle real-time inference
# we'll show the skeleton code for it below
processor = RealTimeInferenceProcessor()

# The chain is wrapped in parentheses rather than joined with backslashes:
# a line ending in a backslash cannot also carry a trailing comment.
query = (
    df.writeStream
    .option("checkpointLocation", "dbfs://pathToYourCheckpoint")  # configure checkpointing in case of job failure
    .foreachBatch(processor.process_batch)  # for each micro-batch, apply this method
    .outputMode("append")
    .start()  # start the stream query
)
View process_micro_batch_pseudocode.py
from abc import ABC, abstractmethod
from pyspark.sql.dataframe import DataFrame as SparkFrame
class Processor(ABC):
    """Abstract interface for objects that handle one micro-batch at a time."""

    @abstractmethod
    def process_batch(self, df: SparkFrame, epochID: int) -> None:
        """Handle a single micro-batch.

        Args:
            df: The rows of the micro-batch as a Spark DataFrame.
            epochID: Identifier of the micro-batch. Spark's ``foreachBatch``
                supplies this as an integer, hence the ``int`` annotation.

        Raises:
            NotImplementedError: Always, in this base class; concrete
                subclasses must override the method.
        """
        raise NotImplementedError
class RealTimeInferenceProcessor(Processor):
@ychennay
ychennay / structured_streaming_kinesis_watermark.py
Created Aug 5, 2021
Spark Structured Streaming Kinesis Watermarks
View structured_streaming_kinesis_watermark.py
from pyspark.sql.functions import window
# configure reading from the stream
# The chained calls are wrapped in parentheses so the statement may span
# several lines; a bare line starting with ``.option`` is a syntax error.
kinesis_df = (
    spark.readStream.format("kinesis")
    .option("streamName", KINESIS_STREAM_NAME)
    .option("region", AWS_REGION)
    .option("roleArn", KINESIS_ACCESS_ROLE_ARN)
    .option("initialPosition", "latest")
    .load()
)
@ychennay
ychennay / saving_mlflow_model.py
Created Aug 5, 2021
Example of Saving an MLflow Model
View saving_mlflow_model.py
from sklearn.linear_model import ElasticNet
# these are internal wrapper/utility classes that we have developed to streamline the ML lifecycle process
from hs_mllib.model_lifecycle.packaging import MLModel, ScikitLearnModel
# this MLflow context manager allows experiment runs (parameters and metrics) to be tracked and easily queried
with MLModel.mlflow.start_run() as run:
# data transformations and feature pre-processing code omitted (boiler-plate code)
...
@ychennay
ychennay / main.go
Created Mar 22, 2021
Simple Golang Desktop Cleaner
View main.go
package main
import (
"fmt"
"log"
"os"
"os/user"
"path/filepath"
)
@ychennay
ychennay / custom_find_all.py
Created Apr 6, 2020
Regex: Custom find all using re.search and groups()
View custom_find_all.py
import re
# Read every log line up front; the ``with`` block closes the file handle
# once the lines are in memory instead of leaving it open.
with open("SSH_2k.log.txt") as file:
    logs = file.readlines()  # returns a list of strings

# regex to parse out event messages from process ID 24200 or 24206
find_events = r'sshd\[(24200|24206)\]: (.+)'
def find_all(expression, strings):
    """Return the captured groups of the first match in each matching string.

    Args:
        expression: Regular expression pattern passed to ``re.search``.
        strings: Iterable of strings to search.

    Returns:
        A list with one ``groups()`` tuple per string in which the pattern
        was found, in input order. Strings without a match are skipped.
    """
    # Search each string once and reuse the match object, rather than
    # running the regex a second time just to test for a hit.
    found = (re.search(expression, string) for string in strings)
    return [hit.groups() for hit in found if hit]
View cached_property.py
class cached_property:
'''
Decorator that converts a method with a single self argument into a
property cached on the instance.
A cached property can be made out of an existing method:
(e.g. ``url = cached_property(get_absolute_url)``).
'''
# ...
def __get__(self, instance, cls=None):
View queryset.py
class QuerySet:
"""Represent a lazy database lookup for a set of objects."""
# ...
@property
def ordered(self):
"""
Return True if the QuerySet is ordered -- i.e. has an order_by()
clause or a default ordering on the model (or is empty).
"""
if isinstance(self, EmptyQuerySet):
View model_from_db.py
class Model(metaclass=ModelBase):
# ...
@classmethod
def from_db(cls, db, field_names, values):
if len(values) != len(cls._meta.concrete_fields):
values_iter = iter(values)
values = [
next(values_iter) if f.attname in field_names else DEFERRED