import os
import boto3
import civis
import civis.utils
import pandas as pd
# # 1. Upload a File to S3
# We start by writing a sample CSV to an S3 bucket so that we have a known
# path to import from later.
# Note: this assumes an AWS keypair is exposed as the environment variables
# AWS_USERNAME and AWS_PASSWORD via a Civis credential named AWS.
session = boto3.Session(
    aws_access_key_id=os.getenv('AWS_USERNAME'),
    aws_secret_access_key=os.getenv('AWS_PASSWORD')
)
s3 = session.resource('s3')
bucket_name = 'a-bucket'
bucket = s3.Bucket(bucket_name)
object_path = 'path/to/test_sal_upload.csv'
target_object = bucket.Object(object_path)
df = pd.DataFrame(
    [[1, 2.3, 'four'], [2, 3.4, 'five'], [3, 4.5, 'six']],
    columns=['foo', 'bar', 'baz']
)
target_object.put(Body=df.to_csv(path_or_buf=None, index=False).encode())
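# A quick sanity check (not part of the original flow): read the object back
# from S3 to confirm the upload landed as expected.
uploaded = target_object.get()['Body'].read().decode()
assert uploaded.splitlines()[0] == 'foo,bar,baz'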
# # 2. Configure and Run Import From S3
# We use the `storage_paths` option of the CSV Imports API, providing it
# with a previously created Storage Host ID and Credential ID for the bucket.
# Since we have not pre-created the destination table, we also provide
# `table_columns` as part of the import configuration.
client = civis.APIClient()
storage_host = client.storage_hosts.post(
    provider='s3',
    bucket=bucket_name,
    name='A Bucket',
    s3_options={
        'region': 'us-east-1'
    }
)
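# Storage hosts persist across runs, so rather than creating a duplicate each
# time, an existing host could be looked up first. A hedged sketch (matching
# on bucket name is our own convention here, not required by the API):
# existing = [h for h in client.storage_hosts.list() if h.bucket == bucket_name]
# if existing:
#     storage_host = existing[0]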
# Storage credentials can be created in the UI or via the API.
# For more info on creating credentials, see:
# https://civis.zendesk.com/hc/en-us/articles/115004403183-Credentials
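# A credential could also be created programmatically; this is a hedged
# sketch, and the exact `type` string for S3 credentials may differ in your
# Civis account:
# credential = client.credentials.post(
#     type='Amazon Web Services S3',       # assumed credential type name
#     name='A Bucket Credential',          # hypothetical name
#     username=os.getenv('AWS_USERNAME'),  # access key ID
#     password=os.getenv('AWS_PASSWORD'),  # secret access key
# )
# CREDENTIAL_ID = credential.id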
CREDENTIAL_ID = 42
source = {
    'storage_paths': {
        'storage_host_id': storage_host.id,
        'credential_id': CREDENTIAL_ID,
        # `file_paths` is a list, so additional paths could be specified here
        'file_paths': [object_path]
    }
}
schema = 'scratch'
table = 'cloud_storage_test'
destination = {
    'schema': schema,
    'table': table,
    'remote_host_id': client.get_database_id('redshift-general'),
    'credential_id': client.default_credential,
}
table_columns = [
    {'name': 'foo', 'sql_type': 'INT'},
    {'name': 'bar', 'sql_type': 'FLOAT'},
    {'name': 'baz', 'sql_type': 'VARCHAR(5)'}
]
import_ = client.imports.post_files_csv(
    source,
    destination,
    first_row_is_header=True,
    table_columns=table_columns,
    existing_table_rows='drop'
)
print(f'Import job ID is {import_.id}')
future = civis.utils.run_job(import_.id, client=client)
future.result()
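# `result()` blocks until the run finishes and raises an exception if the run
# fails. A hedged sketch of catching that explicitly (CivisJobFailure is, to
# the best of our knowledge, the failure exception raised by civis-python):
# from civis.base import CivisJobFailure
# try:
#     future.result()
# except CivisJobFailure as exc:
#     print(f'Import failed: {exc}')
#     raise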
# # 3. Verify Success
# We can now read back the table created above and verify that it contains
# what we expect.
df = civis.io.read_civis(f'{schema}.{table}',
                         'redshift-general',
                         use_pandas=True)
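# A hedged round-trip check (not in the original gist): the table should match
# what we uploaded, modulo row order, since Redshift does not guarantee
# ordering. `check_dtype=False` allows for type widening on the way through.
expected = pd.DataFrame(
    [[1, 2.3, 'four'], [2, 3.4, 'five'], [3, 4.5, 'six']],
    columns=['foo', 'bar', 'baz']
)
pd.testing.assert_frame_equal(
    df.sort_values('foo').reset_index(drop=True),
    expected,
    check_dtype=False
)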
# # 4. Caveats
# This endpoint is for _cleaned files only_: your data should have
# consistent line endings, column counts, proper quoting, etc.
# For a full reference on this endpoint, see:
# https://civis.zendesk.com/hc/en-us/articles/360027016851-Advanced-CSV-Imports-via-the-Civis-API
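# Along those lines, a hedged pre-flight check that could run before the
# upload in step 1: parse the CSV locally and confirm every row has the same
# number of fields. The helper name is our own, purely illustrative.
import csv
import io

def csv_is_rectangular(text):
    """Return True if every parsed CSV row has the same number of fields."""
    rows = list(csv.reader(io.StringIO(text)))
    return len({len(row) for row in rows}) <= 1

assert csv_is_rectangular(df.to_csv(index=False))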