A working dlt pipeline that loads Amazon product, search, and batch data into any dlt destination (Postgres, BigQuery, Snowflake, DuckDB, Redshift, MotherDuck, etc.) via amazonscraperapi.com.
```bash
pip install "dlt[duckdb]" dlt-init
# or for production:
# pip install "dlt[postgres]"
# pip install "dlt[bigquery]"
# pip install "dlt[snowflake]"
```

You'll need an Amazon Scraper API key. Sign up at https://amazonscraperapi.com; the free tier gives you 1000 requests with no card. Set it in your environment:
```bash
export AMAZON_SCRAPER_API_KEY=your_key_here
```

Then run the pipeline:

```bash
python amazon_scraper_api_dlt_config.py
```

By default this writes to a local DuckDB file (`amazon_scraper.duckdb`). To swap destinations, edit the `destination=` argument in `pipeline = dlt.pipeline(...)`:
```python
# Postgres
pipeline = dlt.pipeline(
    pipeline_name="amazon_scraper",
    destination="postgres",
    dataset_name="amazon_data",
)
# requires DESTINATION__POSTGRES__CREDENTIALS=postgresql://... in env or .dlt/secrets.toml
```

```python
# BigQuery
pipeline = dlt.pipeline(
    pipeline_name="amazon_scraper",
    destination="bigquery",
    dataset_name="amazon_data",
)
# requires DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID=... etc. or a service-account JSON
```

See https://dlthub.com/docs/dlt-ecosystem/destinations for full destination credential patterns.
The pipeline as written has two resources:
| Resource | Endpoint | Primary key | Write disposition |
|---|---|---|---|
| `products` | `GET /v1/amazon/product` | `asin` | `merge` |
| `search_results` | `POST /v1/amazon/search` | `query`, `asin`, `position` | `append` |
`products` uses dlt's `resolve` to walk a seed list of ASINs. By default the seed list is empty, so you'll want to either:
- Hardcode an ASIN list inline in the config, or
- Replace the `asin_seeds` resource with a real source (a Postgres query, a CSV file, the output of the search endpoint, etc.).
Example with a hardcoded seed list:
```python
import dlt


@dlt.resource(name="asin_seeds", selected=False)
def asin_seeds():
    for asin in ["B09HN3Q81F", "B000ALVUM6", "B08N5WRWNW"]:
        yield {"asin": asin}
```

Then replace the `asin_seeds` block in the `rest_api` config with this resource.
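If the seeds live in a CSV file instead, a small reader can produce the same `{"asin": ...}` rows. This is only a sketch: the helper name `read_asin_seeds`, the `asin` column header, and the choice to upper-case and de-duplicate values are all assumptions, not part of the shipped config.

```python
import csv
import io


def read_asin_seeds(csv_file, column="asin"):
    """Yield {"asin": ...} rows from an open CSV file, skipping blanks and duplicates.

    Hypothetical helper: the column name is an assumption about your file.
    """
    seen = set()
    for row in csv.DictReader(csv_file):
        asin = (row.get(column) or "").strip().upper()
        if not asin or asin in seen:
            continue
        seen.add(asin)
        yield {"asin": asin}


# Example with an in-memory file; a real run would pass open("seeds.csv", newline="").
sample = io.StringIO("asin,note\nB09HN3Q81F,first\n\nb000alvum6,lowercase\nB09HN3Q81F,dup\n")
seeds = list(read_asin_seeds(sample))
```

The body of `asin_seeds()` could then become `yield from read_asin_seeds(f)` with `f` being your opened seed file.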
The /v1/amazon/batch endpoint accepts up to 1000 ASINs per call and returns a job_id
that you poll for completion. dlt's rest_api source can't natively model the async
job pattern. Use a custom dlt.resource for this:
```python
import time

import dlt
import requests


@dlt.resource(name="batch_lookup", primary_key="asin", write_disposition="merge")
def batch_lookup(api_key=dlt.secrets.value, asins=None, marketplace="US", poll_interval=3):
    if not asins:
        return  # nothing to look up
    headers = {"X-API-Key": api_key}
    base = "https://api.amazonscraperapi.com/v1/amazon"

    # 1. Submit the batch
    resp = requests.post(f"{base}/batch", headers=headers,
                         json={"asins": asins, "marketplace": marketplace})
    resp.raise_for_status()
    job_id = resp.json()["job_id"]

    # 2. Poll until the job completes or fails
    while True:
        poll = requests.get(f"{base}/batch/{job_id}", headers=headers)
        poll.raise_for_status()
        status = poll.json()
        if status["status"] == "completed":
            break
        if status["status"] == "failed":
            raise RuntimeError(f"batch {job_id} failed: {status.get('error')}")
        time.sleep(poll_interval)

    # 3. Yield results
    results = requests.get(f"{base}/batch/{job_id}/results", headers=headers)
    results.raise_for_status()
    yield from results.json()["products"]
```

Then add `batch_lookup(asins=["B09HN3Q81F", ...])` to your pipeline run alongside the `rest_api` source.
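Because the batch endpoint accepts at most 1000 ASINs per call, a longer seed list has to be split before it is submitted. A minimal sketch of that split follows; `chunk_asins` is a hypothetical helper name, and the ASINs in the example are generated placeholders, not real products.

```python
from itertools import islice


def chunk_asins(asins, size=1000):
    """Split an iterable of ASINs into lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(asins)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Example: 2500 placeholder ASINs become three batches.
fake_asins = [f"B{i:09d}" for i in range(2500)]
batch_sizes = [len(chunk) for chunk in chunk_asins(fake_asins)]
```

Each chunk can then be passed as `asins=` to its own `batch_lookup(...)` call.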
The product endpoint returns subtly different field sets across marketplaces. For example,
price.was and subscribe_save_price are US-only; tax_inclusive_price is EU-only. If you
load multiple marketplaces into one table, dlt's schema evolution will union the columns,
but you should set a marketplace column on every row downstream of dlt so analytics queries
can scope correctly.
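One way to stamp that column is a small per-row transform, sketched below. The helper name `tag_marketplace` is hypothetical and the prices are made-up sample values; only the field names come from the paragraph above.

```python
def tag_marketplace(marketplace):
    """Return a row transform that stamps each row with its marketplace code."""
    def _tag(row):
        # Copy so the caller's dict is not mutated.
        return {**row, "marketplace": marketplace}
    return _tag


# Made-up sample rows: one US-shaped, one EU-shaped.
us_row = {"asin": "B09HN3Q81F", "subscribe_save_price": 17.99}
de_row = {"asin": "B09HN3Q81F", "tax_inclusive_price": 21.49}
rows = [tag_marketplace("US")(us_row), tag_marketplace("DE")(de_row)]

# The union of keys across rows mirrors the widened table that column unioning produces.
columns = sorted({key for row in rows for key in row})
```

If you would rather tag rows inside the pipeline than afterwards, a transform like this could probably be attached to a resource with dlt's `add_map`; treat that as a suggestion to check against the dlt docs rather than something this config does.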
The API enforces concurrency limits per plan tier. dlt's `rest_api` source doesn't expose retry configuration directly. For a long pipeline run you may want to wrap the source in dlt's parallelism settings and set `max_workers` to match your API plan.
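For custom resources that call the API with plain `requests`, retries have to be handled by hand. The sketch below shows a generic exponential-backoff wrapper; it is not dlt's retry mechanism, `call_with_backoff` is a hypothetical name, and retrying on every exception is a simplification (real code would likely retry only on rate-limit or transient errors).

```python
import time


def call_with_backoff(fn, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call fn(); on exception, wait base_delay * 2**attempt and retry."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)


# Demo with a fake call that fails twice; delays are recorded instead of slept.
calls = {"n": 0}
delays = []


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("simulated rate-limit error")
    return "ok"


result = call_with_backoff(flaky, sleep=delays.append)
```

The `sleep` parameter is injectable so the wrapper can be exercised without real waiting; in a pipeline you would leave it at the default.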
This config is the "Plan A" fallback while a verified-source proposal is open at dlt-hub/verified-sources#684. If the proposal is accepted, this file will be superseded by an official `dlt init amazon_scraper_api duckdb` template.
- Amazon Scraper API — the data source
- dlt docs — pipeline framework
- dlt rest_api source — the building block this config uses
- dlt destinations — every warehouse this can write to