@kvnkho
Created October 26, 2022 04:21
Dask orchestrating DuckDB jobs
Each row of a small DataFrame of file paths becomes its own partition; Fugue runs a DuckDB query against each file, and Dask executes those queries in parallel.
import pandas as pd

# Write two small test files for the queries to read
df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
df2 = pd.DataFrame({"col1": [1, 2, 3], "col2": ["d", "e", "f"]})
df.to_parquet("/tmp/test1.parquet")
df2.to_parquet("/tmp/test2.parquet")
from typing import Any, List

from fugue_sql import fsql

def myquery(df: List[List[Any]]) -> pd.DataFrame:
    # Data is partitioned per row, so each call receives exactly one
    # (id, filepath) pair.
    _id, _path = df[0]
    query = """
    df = LOAD '{{filepath}}'
    SELECT *
    FROM df
    WHERE col1 = 2
    YIELD DATAFRAME AS result
    """
    # Render {{filepath}} into the query and run it on the DuckDB engine;
    # YIELD exposes the result in the returned dict of DataFrames.
    res = fsql(query, filepath=_path).run("duck")
    return res["result"].as_pandas()
# Test the function locally before distributing it
myquery([["file1", "/tmp/test1.parquet"]])
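For reference, this local test should return a single-row DataFrame: the one row of test1.parquet where col1 == 2, i.e. col1=2, col2="b".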
# Dask call (the same transform works on Spark and Ray)
from fugue import transform

list_of_files = [["file1", "/tmp/test1.parquet"], ["file2", "/tmp/test2.parquet"]]
file_paths = pd.DataFrame(list_of_files, columns=["id", "filepath"])

# "per_row" hands each worker one (id, filepath) pair; with engine="dask",
# transform returns a Dask DataFrame, so compute() materializes it.
transform(file_paths, myquery, schema="col1:int, col2:str", partition="per_row", engine="dask").compute()
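Because the engine is just a parameter, the same pipeline can run on Spark or Ray without touching myquery. A minimal sketch continuing from the code above, assuming pyspark is installed so Fugue can resolve the "spark" engine:

# Hedged sketch: the only change from the Dask call is the engine string.
spark_df = transform(
    file_paths,
    myquery,
    schema="col1:int, col2:str",
    partition="per_row",
    engine="spark",
)
spark_df.show()  # with engine="spark", transform returns a Spark DataFrame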