@englehardt
Created August 16, 2019 20:58
A script for converting OpenWPM sqlite databases to parquet on S3. It requires a `parquet_schema.py` file that matches the sqlite schema. See: https://github.com/mozilla/OpenWPM/blob/master/automation/DataAggregator/parquet_schema.py
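For reference, the imported `PQ_SCHEMAS` is expected to be a dict mapping each table name to a pyarrow schema; the script reads `PQ_SCHEMAS[table].names` to null-fill missing columns and passes the schema to `pa.RecordBatch.from_pandas`. A minimal sketch, with illustrative column names rather than OpenWPM's actual schema:

    # parquet_schema.py (sketch; the real column lists must match the sqlite tables)
    import pyarrow as pa

    PQ_SCHEMAS = {
        'site_visits': pa.schema([
            pa.field('visit_id', pa.int64()),
            pa.field('crawl_id', pa.int32()),
            pa.field('instance_id', pa.int32()),  # partition column added by the converter
            pa.field('site_url', pa.string()),
        ]),
        # ...one entry per table to be converted
    }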
""" This script reads a sqlite database and writes the content to a parquet
database on S3 formatted as OpenWPM would format. It's best to just run this
on AWS as it bottlenecks on the S3 upload. This is a lightly modified version
of OpenWPM's S3Aggregator class.
"""
import os
import sqlite3
import sys
from collections import defaultdict

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
import six
from parquet_schema import PQ_SCHEMAS
from pyarrow.filesystem import S3FSWrapper  # noqa
from tqdm import tqdm

BATCH_SIZE = 500
SITE_VISITS_INDEX = '_site_visits_index'
BUCKET = 'openwpm-crawls'

class Sqlite2Parquet(object):
    def __init__(self, sqlite_db, s3_directory):
        self.dir = s3_directory
        self._records = defaultdict(list)  # maps table name to a list of row dicts
        self._batches = dict()  # maps table_name to a list of batches
        self._visit_ids = set()
        self._bucket = BUCKET
        self._s3 = boto3.client('s3')
        self._s3_resource = boto3.resource('s3')
        self._fs = s3fs.S3FileSystem()
        self._s3_bucket_uri = 's3://%s/%s/visits/%%s' % (
            self._bucket, self.dir)
        self._sqlite_connection = sqlite3.connect(sqlite_db)
        self._sqlite_connection.row_factory = sqlite3.Row
        self._sqlite_cursor = self._sqlite_connection.cursor()

    def _fetchiter(self, arraysize=10000):
        """Generator for sqlite row results"""
        while True:
            rows = self._sqlite_cursor.fetchmany(arraysize)
            if rows == []:
                break
            for row in rows:
                yield row

    def _write_record(self, table, data):
        """Append a row dict to the in-memory record buffer"""
        # Fill any columns missing from this row with nulls
        for item in PQ_SCHEMAS[table].names:
            if item not in data:
                data[item] = None
        # Add instance_id (used as the parquet partition column)
        data['instance_id'] = data['crawl_id']
        self._records[table].append(data)

    def _upload_batch_to_s3(self):
        """Copy in-memory records to S3 as parquet"""
        # Convert the buffered rows for each table into Arrow record batches
        for table_name, data in self._records.items():
            if table_name not in self._batches:
                self._batches[table_name] = list()
            try:
                df = pd.DataFrame(data)
                batch = pa.RecordBatch.from_pandas(
                    df, schema=PQ_SCHEMAS[table_name], preserve_index=False
                )
                self._batches[table_name].append(batch)
                print(
                    "\nSuccessfully created batch for table %s, "
                    "consisting of %d records." % (table_name, len(data))
                )
            except pa.lib.ArrowInvalid as e:
                print(
                    "Error while creating record batch:\n%s\n%s\n%s\n"
                    % (table_name, type(e), e)
                )
        self._records = defaultdict(list)

        # Write each table's batches to the partitioned parquet dataset on S3
        for table_name, batches in self._batches.items():
            try:
                table = pa.Table.from_batches(batches)
                pq.write_to_dataset(
                    table, self._s3_bucket_uri % table_name,
                    filesystem=self._fs,
                    preserve_index=False,
                    partition_cols=['instance_id'],
                    compression='snappy',
                    flavor='spark'
                )
                print(
                    "\nSuccessfully uploaded batch for table %s, "
                    "to file %s." %
                    (table_name, self._s3_bucket_uri % table_name)
                )
            except pa.lib.ArrowInvalid as e:
                print(
                    "Error while uploading batch:\n%s\n%s\n%s\n"
                    % (table_name, type(e), e)
                )
            self._batches[table_name] = list()

    def _process_record(self, table, data):
        """Buffer a row, flushing to S3 every BATCH_SIZE site visits"""
        self._visit_ids.add(data['visit_id'])
        if len(self._visit_ids) > BATCH_SIZE:
            self._upload_batch_to_s3()
            self._visit_ids = set()
        # Convert bytes values to text
        for k, v in data.items():
            if isinstance(v, six.binary_type):
                data[k] = six.text_type(v, errors='ignore')
            elif callable(v):
                data[k] = six.text_type(v)
        # Buffer the record for the next S3 upload
        self._write_record(table, data)

    def process_table(self, table):
        # MAX(id) gives a cheap row-count estimate for the progress bar
        self._sqlite_cursor.execute("SELECT MAX(id) as max_id FROM %s" % table)
        total_rows = self._sqlite_cursor.fetchone()['max_id']
        self._sqlite_cursor.execute("SELECT * FROM %s" % table)
        pbar = tqdm(total=total_rows)
        for row in self._fetchiter():
            pbar.update(1)
            self._process_record(table, dict(row))
        # Flush any remaining buffered records
        self._upload_batch_to_s3()

    def close(self):
        self._sqlite_connection.close()


def main():
    tables = [
        'javascript',
        'http_requests',
        'http_responses',
        'http_redirects',
        'site_visits'
    ]
    if len(sys.argv) < 3:
        print(
            "Usage: sqlite2parquet.py [SQLITE_DB] [S3 FOLDER] "
            "(optional: [TABLES])"
        )
        sys.exit(1)
    converter = Sqlite2Parquet(
        os.path.expanduser(sys.argv[1]),
        sys.argv[2]
    )
    if len(sys.argv) >= 4:
        tables = sys.argv[3].split(',')
        print("Processing tables: %s" % tables)
    else:
        print("Processing default set of tables: %s" % tables)
    for table in tables:
        print("\nStarting processing of %s" % table)
        converter.process_table(table)
    converter.close()


if __name__ == '__main__':
    main()
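Example invocation (a sketch; the local database path and S3 output directory below are illustrative, and the bucket name is hardcoded to `openwpm-crawls` above):

    python sqlite2parquet.py ~/crawl-data.sqlite 2019-08-crawl javascript,http_requests

This would write each listed table as a snappy-compressed parquet dataset under `s3://openwpm-crawls/2019-08-crawl/visits/<table>/`, partitioned by `instance_id`.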