Python code for loading Drive Stats data into Backblaze B2 Cloud Storage
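The script expects its Backblaze B2 credentials in the environment rather than in source: load_dotenv() reads a .env file (not included here) that should define B2_APPLICATION_KEY_ID, B2_APPLICATION_KEY, and B2_ENDPOINT. Note that B2_ENDPOINT must have the form https://s3.<region>.backblazeb2.com, since the region name is extracted from it, and the destination bucket (drivestats-parquet) is assumed to already exist.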
import copy
import os
import re
from datetime import date
from io import BytesIO
from zipfile import ZipFile

import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.fs as fs
import pyarrow.parquet as parquet
import requests
from dotenv import load_dotenv

# Destination bucket and prefix
DST_BUCKET = "drivestats-parquet"
DST_PREFIX = "drivestats/"

# Match the CSV filenames within the zip files
# Various forms:
#   2014/2014-06-25.csv
#   data_Q1_2016/2016-03-08.csv
#   2017-03-01.csv
FILENAME_PATTERN = re.compile(r"^(?:.*/)?(\d\d\d\d)-(\d\d)-(\d\d)\.csv$")

# First three years had annual data
ANNUAL_DATA = [2013, 2014, 2015]

# Quarterly data starts Q1 2016
FIRST_YEAR_OF_QUARTERLY_DATA = 2016

# Path to Drive Stats files
DRIVE_STATS_PATH = "https://f001.backblazeb2.com/file/Backblaze-Hard-Drive-Data/"

# Schema for the initial columns; the remaining (SMART) columns are all cast to int64
INITIAL_COLUMN_TYPES = {
    "date": pa.date32(),
    "serial_number": pa.string(),
    "model": pa.string(),
    "capacity_bytes": pa.int64(),
    "failure": pa.int8()
}

def write_as_parquet(filesystem, year, month, table, index):
    """
    Write the given PyArrow table as a single Parquet file into the partition
    specified by the month and year

    :param filesystem: PyArrow filesystem via which data will be written
    :param year: Year of the data
    :param month: Month of the data
    :param table: PyArrow table containing data
    :param index: Index for the file within the year and month
    """
    key = f'year={year}/month={month}/{year}-{month}-{index}.parquet'
    print(f'Writing {key}')
    parquet.write_table(table, key, filesystem=filesystem)


def delete_dir_if_exists(filesystem, year, month):
    """
    Delete the partition directory for the given year and month, if it exists

    :param filesystem: PyArrow filesystem on which the directory will be deleted
    :param year: Year of the data
    :param month: Month of the data
    """
    path = f'year={year}/month={month}/'
    result = filesystem.get_file_info(path)
    file_info = result[0] if isinstance(result, list) else result
    if file_info.type != fs.FileType.NotFound:
        print(f'Deleting {path}')
        filesystem.delete_dir(path)

def convert_zipped_csv(url, filesystem):
    """
    Read Drive Stats data from the given URL and write it to one or more files

    :param url: URL of a ZIP file containing CSV-formatted Drive Stats data files
    :param filesystem: PyArrow filesystem via which data will be written
    """
    print(f"Getting content from {url}")
    response = requests.get(url)
    response.raise_for_status()
    content = response.content
    myzip = ZipFile(BytesIO(content))

    index = None
    current_year = None
    current_month = None
    current_table = None
    for name in sorted(myzip.namelist()):
        match = FILENAME_PATTERN.match(name)
        if match:
            print(f'Reading {name}')
            table = csv.read_csv(myzip.open(name))
            if table.num_rows == 0:
                # Some days have no data!
                print(f'Skipping {name} - no data')
                continue
            (year, month, day) = match.groups()
            # Replace the date column to work around data format issues
            table = table.drop(['date'])
            table = table.add_column(0, 'date', [[date(int(year), int(month), int(day))] * table.num_rows])
            # Fix the schema: the initial columns keep their declared types,
            # and every remaining (SMART) column is cast to int64
            column_types = copy.deepcopy(INITIAL_COLUMN_TYPES)
            for i in range(len(INITIAL_COLUMN_TYPES), len(table.schema.names)):
                column_types[table.schema.names[i]] = pa.int64()
            table = table.cast(target_schema=pa.schema(column_types))
            # Append the day column
            table = table.append_column('day', [[int(day)] * table.num_rows])
            if current_year != year or current_month != month:
                # We've crossed into a new month. Delete any existing data in
                # the new month's partition
                delete_dir_if_exists(filesystem, year, month)
                # Write out the month so far, if any, and start a new month
                if current_table:
                    write_as_parquet(filesystem, current_year, current_month, current_table, index)
                current_table = table
                current_year = year
                current_month = month
                index = 1
            else:
                if current_table.schema.equals(table.schema):
                    # Same schema - concatenate the month so far with the current day
                    current_table = pa.concat_tables([current_table, table])
                else:
                    # Current day's schema differs from the month so far. Write out the
                    # month so far and start a new table.
                    print('Schema Change!')
                    print(f'Old schema: {len(current_table.schema.names)} fields', repr(current_table.schema))
                    print(f'New schema: {len(table.schema.names)} fields', repr(table.schema))
                    write_as_parquet(filesystem, current_year, current_month, current_table, index)
                    current_table = table
                    index += 1
        else:
            print(f'Skipping {name}')
    # Write out whatever remains at the end of the zip file
    if current_table:
        write_as_parquet(filesystem, current_year, current_month, current_table, index)

def main():
    # Never put credentials in source code!
    load_dotenv()

    # Extract region from endpoint - you could just configure it separately
    endpoint_pattern = re.compile(r"^https://s3\.([a-zA-Z0-9-]+)\.backblazeb2\.com$")
    region_match = endpoint_pattern.match(os.environ['B2_ENDPOINT'])
    region_name = region_match.group(1)

    # Instantiate a PyArrow file system to access B2
    b2 = fs.S3FileSystem(
        access_key=os.environ['B2_APPLICATION_KEY_ID'],
        secret_key=os.environ['B2_APPLICATION_KEY'],
        endpoint_override=os.environ['B2_ENDPOINT'],
        region=region_name,
        background_writes=True
    )

    # Write to a specific bucket/prefix within B2
    destination = fs.SubTreeFileSystem(f"{DST_BUCKET}/{DST_PREFIX}", b2)

    # Convert the annual data files
    for year in ANNUAL_DATA:
        path = f"{DRIVE_STATS_PATH}data_{year}.zip"
        convert_zipped_csv(path, destination)

    # Convert the quarterly data files
    year = FIRST_YEAR_OF_QUARTERLY_DATA
    # Loop until no more data
    while True:
        for quarter in range(1, 5):
            path = f"{DRIVE_STATS_PATH}data_Q{quarter}_{year}.zip"
            try:
                convert_zipped_csv(path, destination)
            except requests.HTTPError:
                # We've run out of data
                print(f"No data at {path} - exiting")
                exit(0)
        year += 1


if __name__ == "__main__":
    main()
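Once the script has run, the Parquet files form a Hive-partitioned dataset (year=.../month=... directories) under drivestats-parquet/drivestats/. Here is a minimal read-back sketch, assuming the same .env variables are available and using pyarrow.dataset to query the result; this snippet is illustrative and not part of the original gist.

import os
import re

import pyarrow.dataset as ds
import pyarrow.fs as fs
from dotenv import load_dotenv

load_dotenv()

# Same region extraction as in main() above
region = re.match(r"^https://s3\.([a-zA-Z0-9-]+)\.backblazeb2\.com$",
                  os.environ['B2_ENDPOINT']).group(1)

b2 = fs.S3FileSystem(
    access_key=os.environ['B2_APPLICATION_KEY_ID'],
    secret_key=os.environ['B2_APPLICATION_KEY'],
    endpoint_override=os.environ['B2_ENDPOINT'],
    region=region
)

# 'hive' partitioning exposes the year=.../month=... directories as columns
dataset = ds.dataset("drivestats-parquet/drivestats", filesystem=b2,
                     format="parquet", partitioning="hive")

# Count drive-day records that report a failure; year/month could also be
# added to the filter to prune partitions
failures = dataset.to_table(columns=["failure"], filter=ds.field("failure") == 1)
print(f"{failures.num_rows} failure records in the dataset")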