@russellpierce
Last active November 11, 2022 16:17
DuckDb ds hang: DuckDB hangs hard (no Ctrl-C) when scanning a pyarrow dataset backed by adlfs under some package combinations
docker run --rm -it python:3.10-buster /bin/bash

## Can't test (the repro uses existing_data_behavior, which older pyarrow lacks)
pyarrow == 5.0.0

## Happy
??? (no known-good combination identified yet)

## Unhappy: python:3.10-buster
pa.__version__='10.0.0'
duckdb.__version__='0.5.1'
fsspec.__version__='2022.11.0'
adlfs.__version__='2022.10.0'

## Unhappy: r-rrunner (vscode), both combinations below
pa.__version__='10.0.0'
duckdb.__version__='0.5.1'
fsspec.__version__='2022.3.0'
adlfs.__version__='2021.10.0'

pa.__version__='10.0.0'
duckdb.__version__='0.5.1'
fsspec.__version__='2021.11.0'
adlfs.__version__='2021.10.0'
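To rebuild one of the unhappy combinations inside that container, a pin set along these lines should work (versions copied from the notes above; the exact install command is my assumption, not part of the original repro):

pip install pyarrow==10.0.0 duckdb==0.5.1 fsspec==2022.11.0 adlfs==2022.10.0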
# Requires an env var `azure_connection_string_secret` that adlfs accepts as a connection string.
# Tries four cases: local fs with and without partitioning, then adlfs with and without partitioning.
# Only the last case (adlfs with partitioning) seems to hang.
import datetime as dt
import os

import pyarrow as pa
import pyarrow.dataset  # imported explicitly so pa.dataset.* is guaranteed to resolve
from pyarrow.dataset import HivePartitioning
import duckdb
import fsspec
from fsspec.implementations.local import LocalFileSystem
import adlfs

fs_local = LocalFileSystem()
azure_fs = adlfs.AzureBlobFileSystem(connection_string=os.getenv("azure_connection_string_secret"))
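# The connection string above uses the standard Azure Storage format, e.g.
# (placeholder values, not from this gist):
#   DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net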
# Write the example files here: a relative path locally, <container>/<prefix> on Azure
base_dir = "ds-general-cool/russellrepex"
con = duckdb.connect()
print(f"{pa.__version__=}")
print(f"{duckdb.__version__=}")
print(f"{fsspec.__version__=}")
print(f"{adlfs.__version__=}")
# Create fake data
example_dag_time = dt.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
example_dag_time_not_break = dt.datetime.now().replace(hour=1, minute=30, second=0, microsecond=0)
example_dict = {
    "timestamp": [dt.datetime.now(), dt.datetime.now() - dt.timedelta(days=1), dt.datetime.now()],
    "stringfield": ["a", "b", "dsadsa"],
    "floatfield": [1, None, 3.14],
    "DAG_TIME": [example_dag_time, example_dag_time, example_dag_time_not_break],
}
df_a = pa.Table.from_pydict(example_dict)
example_dict_2 = {
    "timestamp": [dt.datetime.now() - dt.timedelta(days=2), dt.datetime.now() - dt.timedelta(days=3), dt.datetime.now()],
    "stringfield": ["a", None, "example_dict_2"],
    "floatfield": [1, None, 3.14],
    "DAG_TIME": [example_dag_time - dt.timedelta(days=1), example_dag_time - dt.timedelta(days=1), example_dag_time_not_break - dt.timedelta(days=1)],
}
df_b = pa.Table.from_pydict(example_dict_2)
# Define schema
pa_schema = pa.schema([("DAG_TIME", pa.timestamp('ms'))])
part_var = HivePartitioning(pa_schema)
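# With HivePartitioning on DAG_TIME, write_dataset should emit one key=value
# directory per distinct timestamp; the directory names below are illustrative,
# not verified output:
#   ds-general-cool/russellrepex/DAG_TIME=2022-11-11 00:00:00/part-0.parquet
#   ds-general-cool/russellrepex/DAG_TIME=2022-11-10 00:00:00/part-0.parquet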
for fs, name in zip([fs_local, azure_fs], ["local", "azure"]):
    for do_partition in (False, True):
        if do_partition:
            part_arg = part_var
        else:
            part_arg = None
        print(f"trying: {name} with partitioning as {do_partition}")
        print("Writing data")
        # existing_data_behavior arg not available in pyarrow==6.0.0 but is in 10.0.0
        pa.dataset.write_dataset(
            df_a,
            base_dir=base_dir,
            partitioning=part_arg,
            # partitioning=['DAG_TIME'],
            # partitioning_flavor="hive",
            format="parquet",
            existing_data_behavior="delete_matching",
            filesystem=fs,
        )
        pa.dataset.write_dataset(
            df_b,
            base_dir=base_dir,
            partitioning=part_arg,
            # partitioning=['DAG_TIME'],
            # partitioning_flavor="hive",
            format="parquet",
            existing_data_behavior="delete_matching",
            filesystem=fs,
        )
        print("Populating dataset")
        example_ds = pa.dataset.dataset(
            base_dir,
            filesystem=fs,
            partitioning=part_arg,
        )
        # the data is there and loads
        print("demonstrating full table load")
        print(example_ds.to_table())
        # duckdb is there and responsive
        print("demonstrating duckdb connection")
        print(con.execute("select 1").arrow())
        # DuckDB's Python replacement scan resolves `example_ds` by name from the
        # local scope, so the SQL below reads the pyarrow dataset directly.
        # This command hangs hard (no escape via Ctrl-C) under some package combos.
        print("Checking for hang")
        print(con.execute("select * from example_ds").arrow())
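A possible workaround, under the assumption that the hang is specific to DuckDB streaming the dataset scan: since example_ds.to_table() succeeds even in the unhappy combinations, materialize the dataset to an in-memory Arrow table and query that instead (example_tbl is my name, not from the original repro).

# Untested sketch: DuckDB's replacement scan also resolves in-memory Arrow
# tables by variable name, which sidesteps the dataset scan that hangs.
example_tbl = example_ds.to_table()
print(con.execute("select * from example_tbl").arrow())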