Generating fake data to compare dask and spark for reading avro files into a dataframe
"""Generate a bunch of fake avro data and upload to s3 | |
Running in python 3.7. Installed the following: | |
- pip install Faker | |
- pip install fastavro | |
- pip install boto3 | |
- pip install graphviz | |
- brew install graphviz | |
""" | |
import logging
import os
from datetime import datetime

import boto3
from dask import delayed, compute
from faker import Faker
from fastavro import writer, parse_schema
from numpy import random

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
CREDIT_LIMITS = [0, 100, 500, 1000, 1500, 2000, 3000, 5000, 10000, 15000, 20000]
BUCKET = 'dask-avro-data'
FAKE = Faker()
SESSION = boto3.Session(profile_name='default')
CLIENT = SESSION.client('s3')
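# Note: this single client is shared by every thread in the compute() call at
# the bottom of the script; boto3's low-level clients are generally
# thread-safe, unlike Session objects.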

SCHEMA = {
    'name': 'application',
    'type': 'record',
    'fields': [
        {
            'name': 'payload',
            'type': {
                'type': 'record',
                'name': 'payload',
                'fields': [
                    {'name': 'applicationId', 'type': 'string'},
                    {'name': 'originationCountryCode', 'type': 'string'},
                    {'name': 'creationTimestamp', 'type': 'string'},
                    {
                        'name': 'applicant',
                        'type': {
                            'name': 'applicant',
                            'type': 'record',
                            'fields': [
                                {'name': 'name', 'type': 'string'},
                                {'name': 'addressLine1', 'type': 'string'},
                                {'name': 'zip', 'type': 'string'},
                                {'name': 'city', 'type': 'string'},
                                {'name': 'state', 'type': 'string'},
                                {'name': 'country', 'type': 'string'}
                            ]
                        }
                    },
                    {
                        'name': 'phoneNumbers',
                        'type': {
                            'name': 'phoneNumber',
                            'type': 'array',
                            'items': {
                                'name': 'Child',
                                'type': 'record',
                                'fields': [
                                    {'name': 'type', 'type': 'string'},
                                    {'name': 'value', 'type': 'string'}
                                ]
                            }
                        }
                    },
                    {'name': 'approved', 'type': 'boolean'},
                    {'name': 'creditLimit', 'type': 'int'},
                ]
            }
        },
        {
            'name': 'metadata',
            'type': {
                'type': 'record',
                'name': 'metadata',
                'fields': [
                    {'name': 'eventTimestamp', 'type': 'string'},
                    {'name': 'ruleId', 'type': 'int'},
                    {'name': 'rulePass', 'type': 'boolean'},
                ]
            }
        }
    ],
}

PARSED_SCHEMA = parse_schema(SCHEMA)
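# parse_schema validates the schema once up front; handing the parsed form to
# fastavro's writer below avoids re-parsing it for every file.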

def upload_to_s3(bucket, fpath, prefix=''):
    """Upload a file to S3.

    Parameters
    ----------
    bucket : str
        Name of the S3 bucket
    fpath : str
        Path to the file
    prefix : str
        Prefix to add to the filename to create the key

    Returns
    -------
    str
        The filepath, returned so that the next function in the task graph
        depends on upload_to_s3
    """
    fname = os.path.basename(fpath)
    key = os.path.join(prefix, fname)
    with open(fpath, 'rb') as f_in:
        CLIENT.put_object(Bucket=bucket, Key=key, Body=f_in)
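    # For larger files, CLIENT.upload_file(fpath, bucket, key) would handle
    # multipart uploads automatically; put_object is enough for the small
    # files generated here.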
    return fpath

def parse_address(address):
    """Parse an address.

    Parameters
    ----------
    address : str
        Address generated from Faker().address()

    Returns
    -------
    dict
        Dict containing the following keys: addressLine1, city, state, zip
    """
    address_dict = {}
    lines = address.split('\n')
    address_dict['addressLine1'] = lines[0]
    line_2 = lines[1]
    if ',' in line_2:
        # Standard form, e.g. "Lake Jessicaborough, MO 12345"
        address_dict['city'] = line_2.split(',')[0]
        state_zip = line_2.split(',')[1].strip().split(' ')
        address_dict['state'] = state_zip[0]
        address_dict['zip'] = state_zip[1]
    else:
        # Military-style form with no comma, e.g. "APO AA 12345"
        parts = line_2.split()
        address_dict['city'] = parts[0]
        address_dict['state'] = parts[1]
        address_dict['zip'] = parts[2]
    return address_dict
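# Example, with a typical (made-up) Faker en_US address:
#   parse_address('123 Main Street\nLake Jessicaborough, MO 12345')
#   -> {'addressLine1': '123 Main Street', 'city': 'Lake Jessicaborough',
#       'state': 'MO', 'zip': '12345'}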

def generate_records(file_id):
    """Generate a random number of fake application records."""
    address = parse_address(FAKE.address())
    # Pick a random epoch (in seconds) between Sep 2012 and Apr 2014 and
    # format it as an ISO 8601 timestamp. datetime is used here because
    # time.strftime does not support the %f (microseconds) directive.
    epoch = random.randint(1347517370000, 1397517370000) / 1000
    time_fmt = '%Y-%m-%dT%H:%M:%S.%fZ'
    creation_timestamp = datetime.fromtimestamp(epoch).strftime(time_fmt)
    event_timestamp = creation_timestamp
    records = []
    for x in range(0, random.randint(500, 1000)):
        record = {
            'payload': {
                'applicationId': '{}-{}'.format(file_id, x),
                'applicant': {
                    'name': FAKE.name(),
                    'addressLine1': address['addressLine1'],
                    'zip': address['zip'],
                    'city': address['city'],
                    'state': address['state'],
                    'country': 'USA'
                },
                'originationCountryCode': 'USA',
                'phoneNumbers': [
                    {'type': 'home', 'value': '111-123-4321'},
                    {'type': 'mobile', 'value': '999-123-1234'},
                ],
                'creationTimestamp': creation_timestamp,
                'approved': bool(random.randint(0, 1)),
                'creditLimit': random.choice(CREDIT_LIMITS)
            },
            'metadata': {
                'eventTimestamp': event_timestamp,
                'ruleId': 123,
                'rulePass': True
            }
        }
        records.append(record)
    return records

def write_avro_records(file_id, records):
    """Write records to a local avro file named after the file id."""
    fpath = '{}.avro'.format(file_id)
    with open(fpath, 'wb') as out:
        writer(out, PARSED_SCHEMA, records)
    return fpath

delayeds = []
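# Rough size estimate: 100,000 files x ~750 records/file x (assuming) a few
# hundred bytes per serialized record works out to roughly 20 GB on S3.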
for x in range(0, 100000):
    records = delayed(generate_records)(x)
    fpath = delayed(write_avro_records)(x, records)
    fpath = delayed(upload_to_s3)(BUCKET, fpath, 'application')
    res = delayed(os.remove)(fpath)
    delayeds.append(res)

# To inspect the task graph instead of running it (requires graphviz):
#     from dask import visualize
#     visualize(*delayeds)
compute(delayeds, scheduler='threads', num_workers=10)
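# The generated files feed the dask-vs-spark comparison this gist is for. A
# minimal sketch of the dask side (assuming s3fs and fastavro are installed
# and credentials are available to the workers):
#
#     import dask.bag as db
#     bag = db.read_avro('s3://dask-avro-data/application/*.avro')
#     df = bag.pluck('payload').to_dataframe()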