@mpetr1
Last active May 15, 2026 12:58
Load Amazon product data into dlt warehouses (Postgres, BigQuery, Snowflake, DuckDB) via amazonscraperapi.com

Amazon Scraper API → dlt

A working dlt pipeline that loads Amazon product, search, and batch data into any dlt destination (Postgres, BigQuery, Snowflake, DuckDB, Redshift, MotherDuck, etc.) via amazonscraperapi.com.

Prerequisites

pip install "dlt[duckdb]"
# or for production:
# pip install "dlt[postgres]"
# pip install "dlt[bigquery]"
# pip install "dlt[snowflake]"

You'll need an Amazon Scraper API key. Sign up at https://amazonscraperapi.com — the free tier gives you 1000 requests with no card. Set it in your environment:

export AMAZON_SCRAPER_API_KEY=your_key_here

Run

python amazon_scraper_api_dlt_config.py

By default this writes to a local DuckDB file (amazon_scraper.duckdb). To swap destinations, edit the destination= argument in pipeline = dlt.pipeline(...):

# Postgres
pipeline = dlt.pipeline(
    pipeline_name="amazon_scraper",
    destination="postgres",
    dataset_name="amazon_data",
)
# requires DESTINATION__POSTGRES__CREDENTIALS=postgresql://... in env or .dlt/secrets.toml

# BigQuery
pipeline = dlt.pipeline(
    pipeline_name="amazon_scraper",
    destination="bigquery",
    dataset_name="amazon_data",
)
# requires DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID=... etc. or a service-account JSON

See https://dlthub.com/docs/dlt-ecosystem/destinations for full destination credential patterns.

What this loads

The pipeline as written has two resources:

| Resource | Endpoint | Primary key | Write disposition |
|---|---|---|---|
| products | GET /v1/amazon/product | asin | merge |
| search_results | POST /v1/amazon/search | query, asin, position | append |

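products uses the rest_api source's resolve parameter type to walk a seed list of ASINs: each asin yielded by asin_seeds triggers one product request. As shipped, asin_seeds is a placeholder with no ASINs behind it, so you'll want to either: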

  • Hardcode an ASIN list inline in the config, or
  • Replace the asin_seeds resource with a real source (a Postgres query, a CSV file, the output of the search endpoint, etc.).

Example with a hardcoded seed list:

import dlt

@dlt.resource(name="asin_seeds", selected=False)
def asin_seeds():
    for asin in ["B09HN3Q81F", "B000ALVUM6", "B08N5WRWNW"]:
        yield {"asin": asin}

…then replace the asin_seeds block in the rest_api config with this resource.
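
If the ASINs live in a CSV file, the same generator shape works. The following is a minimal sketch, assuming a file with a header row and an asin column; the function name asin_seeds_from_csv and the column name are illustrative assumptions, not part of this gist. It reads any open text file, skips blank cells and duplicates, and yields the same {"asin": ...} rows as the hardcoded example.

```python
import csv
import io


def asin_seeds_from_csv(handle, column="asin"):
    """Yield {"asin": ...} rows from an open CSV file that has a header row.

    `column` is an assumed header name; adjust it to match your file.
    """
    seen = set()
    for row in csv.DictReader(handle):
        asin = (row.get(column) or "").strip()
        if not asin or asin in seen:
            continue  # skip blank cells and repeated ASINs
        seen.add(asin)
        yield {"asin": asin}


# Demo with an in-memory file; in practice pass open("asins.csv", newline="").
sample = io.StringIO(
    "asin,note\n"
    "B09HN3Q81F,first\n"
    " ,blank cell\n"
    "B000ALVUM6,second\n"
    "B09HN3Q81F,duplicate\n"
)
seeds = list(asin_seeds_from_csv(sample))
print(seeds)
```

To use it in the pipeline, call it from a generator decorated with @dlt.resource(name="asin_seeds", selected=False), as in the hardcoded example.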

Known limitations

Batch endpoint not natively supported

The /v1/amazon/batch endpoint accepts up to 1000 ASINs per call and returns a job_id that you poll for completion. dlt's rest_api source can't natively model the async job pattern. Use a custom dlt.resource for this:

import time
import dlt
import requests

@dlt.resource(name="batch_lookup", primary_key="asin", write_disposition="merge")
def batch_lookup(api_key=dlt.secrets.value, asins=None, marketplace="US", poll_interval=3):
    if not asins:
        return  # nothing to look up, so skip the API call entirely
    headers = {"X-API-Key": api_key}
    base = "https://api.amazonscraperapi.com/v1/amazon"

    # 1. Submit the batch
    resp = requests.post(f"{base}/batch", headers=headers,
                         json={"asins": asins, "marketplace": marketplace},
                         timeout=30)
    resp.raise_for_status()
    job_id = resp.json()["job_id"]

    # 2. Poll until the job completes or fails
    while True:
        poll = requests.get(f"{base}/batch/{job_id}", headers=headers, timeout=30)
        poll.raise_for_status()  # surface HTTP errors instead of a KeyError below
        status = poll.json()
        if status["status"] == "completed":
            break
        if status["status"] == "failed":
            raise RuntimeError(f"batch {job_id} failed: {status.get('error')}")
        time.sleep(poll_interval)

    # 3. Yield results
    results = requests.get(f"{base}/batch/{job_id}/results", headers=headers, timeout=30)
    results.raise_for_status()
    yield from results.json()["products"]

Then add batch_lookup(asins=["B09HN3Q81F", ...]) to your pipeline run alongside the rest_api source.
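
Two practical gaps remain in the resource above: it accepts a single list even though the endpoint caps each call at 1000 ASINs, and its polling loop has no deadline. The helpers below are a hedged sketch of one way to handle both in plain Python. The names chunked and poll_until, and the idea that the status check returns None while the job is still running, are assumptions made for illustration; neither is part of dlt or the API.

```python
import time


def chunked(items, size=1000):
    """Split a list into consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def poll_until(check, interval=3.0, timeout=600.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call `check()` until it returns a non-None value or `timeout` elapses.

    `check` is a hypothetical callable that returns None while the job is
    still running and the final status once it has finished.
    """
    deadline = clock() + timeout
    while True:
        result = check()
        if result is not None:
            return result
        if clock() >= deadline:
            raise TimeoutError(f"gave up after {timeout} seconds")
        sleep(interval)


# Demo: 2500 placeholder IDs split into batches of 1000, 1000, and 500.
batches = list(chunked([f"ASIN{i:06d}" for i in range(2500)]))
print([len(batch) for batch in batches])

# Demo: a fake job that reports completion on the third poll.
answers = iter([None, None, "completed"])
status = poll_until(lambda: next(answers), interval=0, timeout=5)
print(status)
```

Each slice from chunked could then go to its own batch_lookup(asins=chunk) call, and the while True loop could be swapped for poll_until with a check function that wraps the status request.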

Marketplace schema differences

The product endpoint returns slightly different field sets per marketplace. For example, price.was and subscribe_save_price are US-only, while tax_inclusive_price is EU-only. If you load multiple marketplaces into one table, dlt's schema evolution will union the columns (fields a marketplace does not return stay NULL), but you should make sure every row carries a marketplace column so that analytics queries downstream of dlt can scope correctly.
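
A minimal sketch of tagging rows before they are loaded, assuming the marketplace is known at request time. The helper name tag_marketplace and the sample rows (with made-up prices) are illustrative only. The demo also shows how the combined rows end up with the union of both field sets.

```python
def tag_marketplace(row, marketplace):
    """Return a copy of `row` with a `marketplace` key added."""
    return {**row, "marketplace": marketplace}


# Made-up rows standing in for US and EU product responses.
us_rows = [{"asin": "B09HN3Q81F", "subscribe_save_price": 1.0}]
de_rows = [{"asin": "B09HN3Q81F", "tax_inclusive_price": 2.0}]

combined = (
    [tag_marketplace(row, "US") for row in us_rows]
    + [tag_marketplace(row, "DE") for row in de_rows]
)

# The union of keys mirrors the columns a single shared table would need.
columns = sorted({key for row in combined for key in row})
print(columns)
```

With dlt, a per-row function like this can be attached to a resource through add_map, for example resource.add_map(lambda row: tag_marketplace(row, "US")), so the column is set before the data reaches the destination.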

Rate limits

The API enforces concurrency limits per plan tier. dlt's rest_api source doesn't expose retry configuration directly — for a long pipeline run you may want to wrap the source in dlt's parallelism settings and set max_workers to match your API plan.
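
For custom resources that call the API directly, the concurrency cap can also be enforced in plain Python. The sketch below is not dlt configuration: max_workers here is the argument of the standard library's ThreadPoolExecutor, and fetch_one, fetch_all, and the retry parameters are hypothetical names chosen for illustration. It keeps at most max_workers requests in flight and retries a failed call with exponential backoff.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_all(asins, fetch_one, max_workers=2, attempts=3, backoff=1.0,
              sleep=time.sleep):
    """Fetch every ASIN with at most `max_workers` calls in flight.

    `fetch_one` is a hypothetical callable (asin -> dict). A failed call is
    retried up to `attempts` times, doubling the wait after each failure.
    """
    def with_retry(asin):
        for attempt in range(attempts):
            try:
                return fetch_one(asin)
            except Exception:  # a real client would catch only throttling/network errors
                if attempt == attempts - 1:
                    raise  # out of retries, surface the error
                sleep(backoff * 2 ** attempt)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in the same order as the input ASINs
        return list(pool.map(with_retry, asins))


# Demo with a fake fetcher that fails once per ASIN before succeeding.
calls = {}


def flaky_fetch(asin):
    calls[asin] = calls.get(asin, 0) + 1
    if calls[asin] == 1:
        raise ConnectionError("simulated throttle")
    return {"asin": asin}


rows = fetch_all(["B09HN3Q81F", "B000ALVUM6", "B08N5WRWNW"], flaky_fetch, backoff=0)
print(rows)
```

Setting max_workers to the concurrency your plan allows keeps the client from exceeding the limit, while the backoff absorbs occasional throttled responses.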

Status

This config is the "Plan A" fallback while a verified-source proposal is open at dlt-hub/verified-sources#684. If the proposal is accepted, this file will be superseded by an official dlt init amazon_scraper_api duckdb template.

"""
Amazon Scraper API → dlt pipeline.

Usage:
    pip install "dlt[duckdb,postgres,bigquery]"
    # Set your API key:
    export AMAZON_SCRAPER_API_KEY=your_key_here  # get one at https://amazonscraperapi.com (1000 free)
    python amazon_scraper_api_dlt_config.py

Loads Amazon product and search data into your warehouse of choice.
Batch lookups need the custom dlt.resource described in README.md.
"""
import os

import dlt
from dlt.sources.rest_api import rest_api_source

# Build the rest_api config inline so it's a single self-contained file
source = rest_api_source({
    "client": {
        "base_url": "https://api.amazonscraperapi.com/v1/",
        "auth": {
            "type": "api_key",
            "name": "X-API-Key",
            "api_key": os.environ["AMAZON_SCRAPER_API_KEY"],
            "location": "header",
        },
        "headers": {"User-Agent": "dlt-amazon-scraper/0.1"},
    },
    "resource_defaults": {
        "primary_key": "asin",
        "write_disposition": "merge",
    },
    "resources": [
        {
            "name": "products",
            "endpoint": {
                "path": "amazon/product",
                "method": "GET",
                "params": {
                    "asin": {"type": "resolve", "resource": "asin_seeds", "field": "asin"},
                    "marketplace": "US",
                },
                "data_selector": "$",  # response is the product object itself
            },
        },
        {
            "name": "asin_seeds",
            # Static list of ASINs to seed - replace with your own ASIN list / DB query
            "endpoint": {
                "path": "_seeds",  # this resource is filled by user code, not the API
            },
            "selected": False,  # internal use only
        },
        {
            "name": "search_results",
            "endpoint": {
                "path": "amazon/search",
                "method": "POST",
                "json": {"query": "wireless headphones", "marketplace": "US", "page": 1},
                "data_selector": "results",
            },
            "primary_key": ["query", "asin", "position"],
            "write_disposition": "append",
        },
        # For batch, see the notes in README.md - rest_api source can't natively handle the
        # async job-id + poll pattern. Use a custom dlt.resource for that.
    ],
})

if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="amazon_scraper",
        destination="duckdb",
        dataset_name="amazon_data",
    )
    info = pipeline.run(source)
    print(info)