Skip to content

Instantly share code, notes, and snippets.

@ian-whitestone
Created September 24, 2018 01:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ian-whitestone/d615aa00c647d5ea4a84536383c8288b to your computer and use it in GitHub Desktop.
Generating fake data to compare dask and spark for reading avro files into a dataframe
"""Generate a bunch of fake avro data and upload to s3
Running in python 3.7. Installed the following:
- pip install Faker
- pip install fastavro
- pip install boto3
- pip install graphviz
- brew install graphviz
"""
import logging
import os
import time
from datetime import datetime

import boto3
from dask import delayed, compute
from faker import Faker
from fastavro import writer, parse_schema
from numpy import random
# Module-level logger; basicConfig makes INFO-level messages visible when run as a script.
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Pool of possible credit limits sampled for each fake record.
CREDIT_LIMITS = [0, 100, 500, 1000, 1500, 2000, 3000, 5000, 10000, 15000, 20000]
# Target S3 bucket for the generated avro files.
BUCKET = 'dask-avro-data'
# Faker instance used to generate fake names/addresses.
FAKE = Faker()
# boto3 S3 client built from the local 'default' credentials profile.
SESSION = boto3.Session(profile_name='default')
CLIENT = SESSION.client('s3')
# Avro schema for one fake credit-card "application" event.
# Top level is a record with two fields: 'payload' (the application itself)
# and 'metadata' (event bookkeeping).
SCHEMA = {
    'name': 'application',
    'type': 'record',
    'fields': [
        {
            'name': 'payload',
            'type': {
                'type': 'record',
                'name': 'payload',
                'fields': [
                    {'name': 'applicationId', 'type': 'string'},
                    {'name': 'originationCountryCode', 'type': 'string'},
                    {'name': 'creationTimestamp', 'type': 'string'},
                    {
                        # Nested record holding the applicant's identity/address.
                        'name': 'applicant',
                        'type': {
                            'name': 'applicant',
                            'type': 'record',
                            'fields': [
                                {'name': 'name', 'type': 'string'},
                                {'name': 'addressLine1', 'type': 'string'},
                                {'name': 'zip', 'type': 'string'},
                                {'name': 'city', 'type': 'string'},
                                {'name': 'state', 'type': 'string'},
                                {'name': 'country', 'type': 'string'}
                            ]
                        }
                    },
                    {
                        # Array of {type, value} phone-number records.
                        'name': 'phoneNumbers',
                        'type': {
                            'name': 'phoneNumber',
                            'type': 'array',
                            'items': {
                                'name': 'Child',
                                'type': 'record',
                                'fields': [
                                    {'name': 'type', 'type': 'string'},
                                    {'name': 'value', 'type': 'string'}
                                ]
                            }
                        }
                    },
                    {'name': 'approved', 'type': 'boolean'},
                    {'name': 'creditLimit', 'type': 'int'},
                ]
            }
        },
        {
            'name': 'metadata',
            'type': {
                'type': 'record',
                'name': 'metadata',
                'fields': [
                    {'name': 'eventTimestamp', 'type': 'string'},
                    {'name': 'ruleId', 'type': 'int'},
                    {'name': 'rulePass', 'type': 'boolean'},
                ]
            }
        }
    ],
}
# Pre-parse once at import time so every writer() call can reuse it.
PARSED_SCHEMA = parse_schema(SCHEMA)
def upload_to_s3(bucket, fpath, prefix=''):
    """Upload a local file to S3 under ``prefix/<basename>``.

    Parameters
    ----------
    bucket : str
        Name of the S3 bucket.
    fpath : str
        Path to the local file.
    prefix : str
        Key prefix prepended to the file's basename.

    Returns
    -------
    str
        The same ``fpath``, so downstream tasks in the dask graph
        depend on this upload having completed.
    """
    key = os.path.join(prefix, os.path.basename(fpath))
    with open(fpath, 'rb') as f_in:
        CLIENT.put_object(Bucket=bucket, Key=key, Body=f_in)
    return fpath
def parse_address(address):
"""Parse an address
Parameters
----------
address : str
Address generated from Faker().address()
Returns
--------
dict
Dict containing the following keys: addressLine1, city, state,
zip, country
"""
address_dict = {}
address_dict['addressLine1'] = address.split('\n')[0]
line_2 = address.split('\n')[1]
if len(line_2.split(',')) > 1:
address_dict['city'] = line_2.split(',')[0]
address_dict['state'] = line_2.split(',')[1].strip().split(' ')[0]
address_dict['zip'] = line_2.split(',')[1].strip().split(' ')[1]
else:
address_dict['city'] = line_2.split()[0]
address_dict['state'] = line_2.split()[1].strip()
address_dict['zip'] = line_2.split()[2].strip()
return address_dict
def generate_records(id):
    """Generate a batch of fake application records matching SCHEMA.

    Parameters
    ----------
    id : int
        Batch identifier; combined with the record index to form a
        unique ``applicationId``.

    Returns
    -------
    list of dict
        Between 500 and 999 records, all sharing one fake address and
        one random timestamp.
    """
    address = parse_address(FAKE.address())
    time_fmt = '%Y-%m-%dT%H:%M:%S.%fZ'
    # Random epoch (originally in ms, hence /1000) somewhere in 2012-2014,
    # shared by every record in this batch.
    epoch = random.randint(1347517370000, 1397517370000) / 1000
    # Bug fix: these were computed but never used — the records hard-coded
    # '2018-01-01T12:31:45.234Z'. Use datetime.strftime because
    # time.strftime does not support the %f (microseconds) directive.
    creation_timestamp = datetime.fromtimestamp(epoch).strftime(time_fmt)
    event_timestamp = datetime.fromtimestamp(epoch).strftime(time_fmt)
    records = []
    for x in range(0, random.randint(500, 1000)):
        record = {
            'payload': {
                'applicationId': '{}-{}'.format(id, x),
                'applicant': {
                    'name': FAKE.name(),
                    'addressLine1': address['addressLine1'],
                    'zip': address['zip'],
                    'city': address['city'],
                    'state': address['state'],
                    'country': 'USA'
                },
                'originationCountryCode': 'USA',
                'phoneNumbers': [
                    {'type': 'home', 'value': '111-123-4321'},
                    {'type': 'mobile', 'value': '999-123-1234'},
                ],
                'creationTimestamp': creation_timestamp,
                # Bug fix: numpy's randint upper bound is EXCLUSIVE, so
                # randint(0, 1) was always 0 -> approved was always False.
                'approved': bool(random.randint(0, 2)),
                'creditLimit': random.choice(CREDIT_LIMITS)
            },
            'metadata': {
                'eventTimestamp': event_timestamp,
                'ruleId': 123,
                'rulePass': True
            }
        }
        records.append(record)
    return records
def write_avro_records(id, records):
    """Serialize *records* to a local avro file named ``<id>.avro``.

    Returns the path of the file that was written so the next task in
    the dask graph can depend on it.
    """
    out_path = '{}.avro'.format(id)
    with open(out_path, 'wb') as out_file:
        writer(out_file, PARSED_SCHEMA, records)
    return out_path
# Build one lazy task chain per file:
#   generate records -> write local avro -> upload to S3 -> delete local file.
delayeds = []
for file_num in range(100000):  # approximately 20 GB?
    recs = delayed(generate_records)(file_num)
    local_path = delayed(write_avro_records)(file_num, recs)
    uploaded_path = delayed(upload_to_s3)(BUCKET, local_path, 'application')
    delayeds.append(delayed(os.remove)(uploaded_path))
# Run everything on the threaded scheduler with 10 workers.
compute(delayeds, scheduler='threads', num_workers=10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment