Skip to content

Instantly share code, notes, and snippets.

View kvnkho's full-sized avatar
👋
Feel free to message me. Contact info in profile

Kevin Kho kvnkho

👋
Feel free to message me. Contact info in profile
View GitHub Profile
from fugue import transform
from pyspark.sql import SparkSession
# Obtain a SparkSession (existing or newly created) to act as the engine.
spark = SparkSession.builder.getOrCreate()

# Apply `transform_img` to `df` through Fugue's `transform`, executing on Spark.
# NOTE(review): schema="*" presumably means "same columns as the input" —
# confirm against the Fugue schema-expression documentation.
results = transform(
    df,
    transform_img,
    schema="*",
    engine=spark,
)
import requests
from typing import Any, Dict, Iterable
from PIL import Image
from io import BytesIO
def transform_img(df: List[Dict[str, Any]]) -> Iterable[str, Any]:
for row in df:
try:
response = requests.get(row["ImgUrl"], timeout=5)
img = Image.open(BytesIO(response.content))
@kvnkho
kvnkho / dask_duck.py
Created October 26, 2022 04:21
Dask orchestrating DuckDB jobs
import pandas as pd
# First sample frame: three rows, written straight to its own Parquet file.
df = pd.DataFrame(
    {
        "col1": [1, 2, 3],
        "col2": ["a", "b", "c"],
    }
)
df.to_parquet("/tmp/test1.parquet")

# Second sample frame: same "col1" values, different "col2" letters.
df2 = pd.DataFrame(
    {
        "col1": [1, 2, 3],
        "col2": ["d", "e", "f"],
    }
)
df2.to_parquet("/tmp/test2.parquet")
from fugue_sql import fsql
from typing import Iterable, List, Any, Dict
# Profile `dask_df` via the whylogs Fugue integration, passing the Dask
# `client` as the execution engine. The partition spec groups by "a" and "b"
# (presumably column names of `dask_df` — confirm against the data).
fugue_profile(
    dask_df,
    partition={"by": ["a", "b"]},
    engine=client,
)
import coiled
# Register a Coiled software environment called "profiling" whose pip
# requirements are Fugue (with the Dask extra) and whylogs.
coiled.create_software_environment(name="profiling", pip=["fugue[dask]", "whylogs"])
from dask.distributed import Client
from coiled import Cluster
from whylogs.api.fugue import fugue_profile
# Start a two-worker Coiled cluster named "quickstart" that uses the
# "profiling" software environment, then connect a Dask client to it.
cluster = Cluster(
    name="quickstart",
    software="profiling",
    n_workers=2,
)
client = Client(cluster)

# Profile `df` with the Dask client as the engine and convert the result
# through its to_pandas() method.
fugue_profile(df, engine=client).to_pandas()
# Profile `spark_df` via the whylogs Fugue integration on `spark_session`.
# The partition spec groups by "a" and "b" (presumably column names of
# `spark_df` — confirm against the data).
fugue_profile(
    spark_df,
    partition={"by": ["a", "b"]},
    engine=spark_session,
)
from whylogs.api.fugue import fugue_profile
from pyspark.sql import SparkSession
# Obtain a SparkSession (existing or newly created) to act as the engine.
spark = (
    SparkSession.builder
    .getOrCreate()
)

# Profile `spark_df` with that session as the Fugue execution engine.
fugue_profile(
    spark_df,
    engine=spark,
)
from whylogs.api.fugue import fugue_profile
# Profile `pandas_df` without naming an engine, then convert the result via
# its to_pandas() method. The value is not assigned, so this line presumably
# relies on notebook/REPL display — confirm where it is run.
fugue_profile(pandas_df).to_pandas()
import pandas as pd
# Column-oriented sample data: one list per column, four rows in total.
data = dict(
    animal=["cat", "hawk", "snake", "cat"],
    legs=[4, 2, 0, 4],
    weight=[4.3, 1.8, 1.3, 4.1],
)

# Build the frame from the mapping; column order follows the dict's key order.
df = pd.DataFrame(data)
import whylogs as why