@2tony2
Created July 5, 2024 16:15
with connection:
    if file_path is not None:
        counter = 0
        with open(file_path, "rb") as f:
            if chunk_size == 0:
                raise ValueError(
                    "Chunk size of 0 is not allowed for filestream pushing."
                )
            elif chunk_size > 0:
                lines = []
                # NOTE: This only works for file types that are splittable
                # line by line, like CSVs.
                for line in f:
                    lines.append(line)
                    if len(lines) >= chunk_size:
                        counter += 1
                        # Rebuild the PUT command so each chunk is staged under
                        # its own numbered file name instead of overwriting the first.
                        cursor_execution_config.command = f"PUT file://{snowflake_file_name}_{counter} @{full_qualified_stage_name} OVERWRITE = {overwrite}"
                        filestream = io.BytesIO(b"".join(lines))
                        cursor = cursor.execute(
                            **cursor_execution_config.dict(),
                            file_stream=filestream,
                        )
                        lines = []
                if lines:
                    # Flush the final, possibly partial, chunk.
                    counter += 1
                    cursor_execution_config.command = f"PUT file://{snowflake_file_name}_{counter} @{full_qualified_stage_name} OVERWRITE = {overwrite}"
                    filestream = io.BytesIO(b"".join(lines))
                    cursor = cursor.execute(
                        **cursor_execution_config.dict(),
                        file_stream=filestream,
                    )
                    logger.debug(f"Final chunk of {len(lines)} lines pushed.")
    else:
        cursor = cursor.execute(**cursor_execution_config.dict())

    if isinstance(cursor, SnowflakeCursor):
        # SnowflakeExecuteEvent(vars(self.cursor)).log()
        logger.info(f"cursor is {cursor}")
    else:
        logger.info(f"SQL execution result is {cursor}")
    return self
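
For reference, here is a minimal, self-contained sketch of the same chunked PUT-with-file_stream pattern using snowflake-connector-python directly, outside the class the snippet above belongs to. The connection parameters, stage name (MY_STAGE), file name (data.csv), and chunk size below are illustrative placeholders, not values taken from the gist.

import io
import snowflake.connector

# Hypothetical connection parameters -- replace with real credentials.
conn = snowflake.connector.connect(
    account="my_account",
    user="my_user",
    password="my_password",
    database="MY_DB",
    schema="PUBLIC",
)
CHUNK_SIZE = 100_000  # lines per staged chunk; illustrative value

try:
    with conn.cursor() as cur, open("data.csv", "rb") as f:
        counter = 0
        lines = []
        for line in f:  # only valid for line-splittable formats such as CSV
            lines.append(line)
            if len(lines) >= CHUNK_SIZE:
                counter += 1
                # Each chunk is uploaded from memory to its own staged file.
                cur.execute(
                    f"PUT file://data.csv_{counter} @MY_STAGE OVERWRITE = TRUE",
                    file_stream=io.BytesIO(b"".join(lines)),
                )
                lines = []
        if lines:  # flush the final partial chunk
            counter += 1
            cur.execute(
                f"PUT file://data.csv_{counter} @MY_STAGE OVERWRITE = TRUE",
                file_stream=io.BytesIO(b"".join(lines)),
            )
finally:
    conn.close()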