Skip to content

Instantly share code, notes, and snippets.

@reizist
Created August 26, 2022 09:07
Show Gist options
  • Save reizist/15d1093843fcbce073c6e626d98056e8 to your computer and use it in GitHub Desktop.
Save reizist/15d1093843fcbce073c6e626d98056e8 to your computer and use it in GitHub Desktop.
特定のpartitionのみデータコピーをするためにpartitionごとにgcsに吐くマン
import shlex
import subprocess
from datetime import datetime, timedelta
# Name of the destination GCS bucket, without the "gs://" prefix (the prefix
# is added when the export path is built). Left empty here as a placeholder.
BUCKET = ""

# Tables to export, one dict per table:
#   dest_table: table reference in "project:dataset.table" form; it is used
#               as the source of the extract, with a "$YYYYMMDD" suffix
#               appended for each day.
#   start:      first day to export, formatted "YYYY/MM/DD" (inclusive).
#   end:        day at which to stop, formatted "YYYY/MM/DD" (exclusive).
TABLES = [
    {
        "dest_table": "project:dataset.table",
        "start": "2019/03/01",
        "end": "2019/03/02",
    },
]
def daterange(_start, _end):
    """Yield one value per day from ``_start`` up to, but excluding, ``_end``.

    Args:
        _start: First value to yield (a ``datetime``; any type that supports
            subtraction yielding a ``timedelta`` and addition of a
            ``timedelta`` works the same way).
        _end: Exclusive upper bound, of the same type as ``_start``.

    Yields:
        ``_start``, ``_start`` + 1 day, ... for every whole day contained in
        ``_end - _start``. Nothing is yielded when ``_end`` is not at least
        one full day after ``_start``.
    """
    # Only whole days count: ``.days`` drops any sub-day remainder.
    for n in range((_end - _start).days):
        yield _start + timedelta(days=n)
def load_to_gcs(table_name, src_table, target_date_nodash):
    """Export a BigQuery table (or one partition) to GCS as JSON lines.

    Runs ``bq extract`` with ``NEWLINE_DELIMITED_JSON`` output, writing to
    ``gs://<BUCKET>/<table_name>/<target_date_nodash>/*.jsonl``.

    Args:
        table_name: Path component placed directly under the bucket.
        src_table: Table reference handed to ``bq extract`` as the source.
        target_date_nodash: Path component placed under ``table_name``.

    Returns:
        The ``subprocess.CompletedProcess`` of the ``bq`` invocation.

    Raises:
        subprocess.CalledProcessError: If ``bq`` exits with a non-zero status.
        FileNotFoundError: If the ``bq`` executable cannot be found.
    """
    gcs_path = "gs://{bucket}/{table_name}/{target_date_nodash}".format(
        bucket=BUCKET,
        table_name=table_name,
        target_date_nodash=target_date_nodash,
    )
    # NOTE: nothing under gcs_path is removed before exporting; clear it
    # first (e.g. `gsutil rm -r <gcs_path>`) if leftover files would matter.

    # Pass an argument list and skip the shell entirely, so characters such
    # as "$", "*" or quotes in the table/bucket names reach `bq` verbatim
    # and can never be interpreted as shell syntax.
    args = [
        "bq",
        "extract",
        "--destination_format=NEWLINE_DELIMITED_JSON",
        src_table,
        "{gcs_path}/*.jsonl".format(gcs_path=gcs_path),
    ]
    # Log a shell-quoted rendering that can be copy-pasted to rerun by hand.
    print(" ".join(shlex.quote(arg) for arg in args))
    return subprocess.run(args, check=True)
def main():
    """Export every configured day of every table in ``TABLES`` to GCS.

    For each entry, the "start" and "end" strings are parsed as
    ``YYYY/MM/DD`` and one export is run per day produced by ``daterange``
    for that pair. The first failing export raises and stops the run.
    """
    for table in TABLES:
        start = datetime.strptime(table["start"], "%Y/%m/%d")
        end = datetime.strptime(table["end"], "%Y/%m/%d")
        # Text after the last "." of the table reference names the
        # per-table directory in the bucket.
        table_name = table["dest_table"].split(".")[-1]
        for day in daterange(start, end):
            date_nodash = day.strftime("%Y%m%d")
            # "<table>$YYYYMMDD" selects the single partition for that day.
            partition_ref = table["dest_table"] + "$" + date_nodash
            load_to_gcs(table_name, partition_ref, date_nodash)
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment