@metadaddy
Last active September 9, 2023 03:41
Python code for loading Drive Stats data into Backblaze B2 Cloud Storage
import copy
import os
import re
from datetime import date
from io import BytesIO
from zipfile import ZipFile
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.fs as fs
import pyarrow.parquet as parquet
import requests
from dotenv import load_dotenv
# Destination bucket and prefix
DST_BUCKET = "drivestats-parquet"
DST_PREFIX = "drivestats/"
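# Objects are written as Parquet files with keys of the form:
#   drivestats-parquet/drivestats/year=YYYY/month=MM/YYYY-MM-<index>.parquet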
# Match the CSV filenames within the zip files
# Various forms:
# 2014/2014-06-25.csv
# data_Q1_2016/2016-03-08.csv
# 2017-03-01.csv
FILENAME_PATTERN = re.compile(r"^(?:.*/)?(\d\d\d\d)-(\d\d)-(\d\d).csv$")
# First three years had annual data
ANNUAL_DATA = [2013, 2014, 2015]
# Quarterly data starts Q1 2016
FIRST_YEAR_OF_QUARTERLY_DATA = 2016
# Path to Drive Stats files
DRIVE_STATS_PATH = "https://f001.backblazeb2.com/file/Backblaze-Hard-Drive-Data/"
# Schema
INITIAL_COLUMN_TYPES = {
"date": pa.date32(),
"serial_number": pa.string(),
"model": pa.string(),
"capacity_bytes": pa.int64(),
"failure": pa.int8()
}
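# Any remaining columns in each CSV (the SMART attribute values) are not listed
# here; convert_zipped_csv() casts them all to int64, since the set of SMART
# columns varies between files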
def write_as_parquet(filesystem, year, month, table, index):
"""
Write the given PyArrow table as a single Parquet file into the partition
specified by the month and year
:param filesystem: PyArrow filesystem via which data will be written
:param year: Year of the data
:param month: Month of the data
:param table: PyArrow table containing data
:param index: Index for the file within the year and month
"""
key = f'year={year}/month={month}/{year}-{month}-{index}.parquet'
print(f'Writing {key}')
parquet.write_table(table, key, filesystem=filesystem)
def delete_dir_if_exists(filesystem, year, month):
    """
    Delete the partition directory for the given year and month, if it exists
    """
    path = f'year={year}/month={month}/'
    result = filesystem.get_file_info(path)
    file_info = result[0] if isinstance(result, list) else result
    if file_info.type != fs.FileType.NotFound:
        print(f'Deleting {path}')
        filesystem.delete_dir(path)
def convert_zipped_csv(url, filesystem):
"""
Read Drive Stats data from the given URL and write it to one or more files
:param url: URL of a ZIP file containing CSV-formatted Drive Stats data files
:param filesystem: PyArrow filesystem via which data will be written
"""
print(f"Getting content from {url}")
response = requests.get(url)
response.raise_for_status()
content = response.content
myzip = ZipFile(BytesIO(content))
index = None
current_year = None
current_month = None
current_table = None
for name in sorted(myzip.namelist()):
match = FILENAME_PATTERN.match(name)
if match:
print(f'Reading {name}')
table = csv.read_csv(myzip.open(name))
if table.num_rows == 0:
# Some days have no data!
print(f'Skipping {name} - no data')
continue
(year, month, day) = match.groups()
# Replace the date column to workaround data format issues
table = table.drop(['date'])
table = table.add_column(0, 'date', [[date(int(year), int(month), int(day))] * table.num_rows])
# Fix the schema
column_types = copy.deepcopy(INITIAL_COLUMN_TYPES)
for i in range(len(INITIAL_COLUMN_TYPES), len(table.schema.names)):
column_types[table.schema.names[i]] = pa.int64()
table = table.cast(target_schema=pa.schema(column_types))
# Append the day column
table = table.append_column('day', [[int(day)] * table.num_rows])
if current_year != year or current_month != month:
# We passed the end of the month. Delete the next month's data
delete_dir_if_exists(filesystem, year, month)
# Write out the current table and start a new month
if current_table:
write_as_parquet(filesystem, current_year, current_month, current_table, index)
current_table = table
current_year = year
current_month = month
index = 1
else:
if current_table.schema.equals(table.schema):
# Same schema - concatenate the month so far with the current day
current_table = pa.concat_tables([current_table, table])
else:
# Current day's schema differs from the month so far. Write out the
# month so far and start a new table.
print('Schema Change!')
print(f'Old schema: {len(current_table.schema.names)} fields', repr(current_table.schema))
print(f'New schema: {len(table.schema.names)} fields', repr(table.schema))
write_as_parquet(filesystem, current_year, current_month, current_table, index)
current_table = table
index += 1
else:
print(f'Skipping {name}')
if current_table:
write_as_parquet(filesystem, current_year, current_month, current_table, index)
def main():
    # Never put credentials in source code!
    load_dotenv()
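    # The script expects its B2 configuration in the environment (or in a .env
    # file loaded by load_dotenv() above). A minimal sketch of the expected
    # variables, with hypothetical placeholder values:
    #
    #   B2_ENDPOINT=https://s3.us-west-004.backblazeb2.com
    #   B2_APPLICATION_KEY_ID=<your-application-key-id>
    #   B2_APPLICATION_KEY=<your-application-key>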
    # Extract region from endpoint - you could just configure it separately
    endpoint_pattern = re.compile(r"^https://s3\.([a-zA-Z0-9-]+)\.backblazeb2\.com$")
    region_match = endpoint_pattern.match(os.environ['B2_ENDPOINT'])
    region_name = region_match.group(1)
    # Instantiate a PyArrow file system to access B2
    b2 = fs.S3FileSystem(
        access_key=os.environ['B2_APPLICATION_KEY_ID'],
        secret_key=os.environ['B2_APPLICATION_KEY'],
        endpoint_override=os.environ['B2_ENDPOINT'],
        region=region_name,
        background_writes=True
    )
    # Write to a specific bucket/prefix within B2
    destination = fs.SubTreeFileSystem(f"{DST_BUCKET}/{DST_PREFIX}", b2)
    # Convert the annual data files
    for year in ANNUAL_DATA:
        path = f"{DRIVE_STATS_PATH}data_{year}.zip"
        convert_zipped_csv(path, destination)
    # Convert the quarterly data files
    year = FIRST_YEAR_OF_QUARTERLY_DATA
    # Loop until no more data
    while True:
        for quarter in range(1, 5):
            path = f"{DRIVE_STATS_PATH}data_Q{quarter}_{year}.zip"
            try:
                convert_zipped_csv(path, destination)
            except requests.HTTPError:
                # We've run out of data
                print(f"No data at {path} - exiting")
                exit(0)
        year += 1
if __name__ == "__main__":
    main()
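Once the script has run, the Parquet files form a Hive-partitioned dataset that PyArrow can read back directly. A minimal sketch, assuming the same bucket, prefix, and environment variables as above; the year used in the filter is arbitrary:

import os
import re

import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.fs as fs
from dotenv import load_dotenv

load_dotenv()

# Same region-from-endpoint approach as in main() above
region_name = re.match(
    r"^https://s3\.([a-zA-Z0-9-]+)\.backblazeb2\.com$",
    os.environ['B2_ENDPOINT']
).group(1)

b2 = fs.S3FileSystem(
    access_key=os.environ['B2_APPLICATION_KEY_ID'],
    secret_key=os.environ['B2_APPLICATION_KEY'],
    endpoint_override=os.environ['B2_ENDPOINT'],
    region=region_name
)

# The year=.../month=... directory names are read as Hive-style partition columns
dataset = ds.dataset(
    'drivestats-parquet/drivestats',
    filesystem=b2,
    format='parquet',
    partitioning='hive'
)

# Example: count the drive failures reported during one (arbitrarily chosen) year
table = dataset.to_table(
    columns=['failure'],
    filter=ds.field('year') == 2022
)
print(pc.sum(table.column('failure')).as_py())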