Skip to content

Instantly share code, notes, and snippets.

@cypris75
Last active June 2, 2024 07:05
Show Gist options
  • Save cypris75/dc8baf5ea67b87d87ae839468ffdaa0c to your computer and use it in GitHub Desktop.
Save cypris75/dc8baf5ea67b87d87ae839468ffdaa0c to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "7978c4c4-9b11-452f-a86a-37dea39508da",
"metadata": {},
"outputs": [],
"source": [
"# Based on: https://github.com/dcwangmit01/amazon-invoice-downloader\n",
"\n",
"# Create an .env file with AMAZON_EMAIL and AMAZON_PASSWORD to make this work\n",
"\n",
"#!pip install playwright\n",
"#!playwright install\n",
"#!playwright install-deps"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "90bfa347-585e-455f-a40b-bc12f4f0f88e",
"metadata": {},
"outputs": [],
"source": [
"from playwright.async_api import async_playwright, TimeoutError\n",
"from datetime import datetime\n",
"import random\n",
"import time\n",
"import os\n",
"import sys\n",
"from dotenv import load_dotenv\n",
"import locale"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "693c1c58-1b3a-43f3-a16e-30a098c50993",
"metadata": {},
"outputs": [],
"source": [
"locale.setlocale(locale.LC_TIME, 'de_DE')\n",
"\n",
"# Read from .env file\n",
"load_dotenv()\n",
"email = os.getenv('AMAZON_EMAIL')\n",
"password = os.getenv('AMAZON_PASSWORD')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "09977ed4-7898-4b4f-b966-790ade8eec49",
"metadata": {},
"outputs": [],
"source": [
"def sleep():\n",
" # Add human latency\n",
" # Generate a random sleep time between 2 and 5 seconds\n",
" sleep_time = random.uniform(2, 5)\n",
" # Sleep for the generated time\n",
" time.sleep(sleep_time)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3ade90a4-c855-484b-a7ac-313603c3c74e",
"metadata": {},
"outputs": [],
"source": [
"# Parse the date range into start_date and end_date\n",
"year = str(datetime.now().year) # current year\n",
"\n",
"start_date, end_date = year + \"0101\", year + \"1231\"\n",
"start_date = datetime.strptime(start_date, \"%Y%m%d\")\n",
"end_date = datetime.strptime(end_date, \"%Y%m%d\")\n",
"\n",
"# Debug\n",
"print(start_date, end_date)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "890dac49-03c7-4ab6-86ec-8d5be3a06fbe",
"metadata": {},
"outputs": [],
"source": [
"# Ensure the directory where the downloads will be saved exists\n",
"#target_dir = os.getcwd() + \"/\" + \"downloads\"\n",
"target_dir = \"/Users/abcd/Code/amazon-invoice-downloader/downloads/\" # Trailing slash at the end!\n",
"os.makedirs(target_dir, exist_ok=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c0cbe27d-32d6-4135-b854-0a789be94cc0",
"metadata": {},
"outputs": [],
"source": [
"# Create Playwright context with Chromium (ASYNC)\n",
"pw = await async_playwright().start()\n",
"browser = await pw.chromium.launch(headless=False)\n",
"context = await browser.new_context()\n",
"page = await context.new_page()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5158366d-15df-4b43-9496-a3759d754e09",
"metadata": {},
"outputs": [],
"source": [
"await page.goto(\"https://www.amazon.de/\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "376f64da-1bce-4de0-94be-2a1b56028fd4",
"metadata": {},
"outputs": [],
"source": [
"# Sometimes we are interrupted by a bot check; timeout=0 disables the timeout so the user can solve it first\n",
"await (await page.wait_for_selector('span >> text=Hallo, anmelden', timeout=0)).click()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "36826e72-9a55-4fac-9686-2b528478ce31",
"metadata": {},
"outputs": [],
"source": [
"if email:\n",
" await page.get_by_label(\"E-Mail-Adresse oder Mobiltelefonnummer\").click()\n",
" await page.get_by_label(\"E-Mail-Adresse oder Mobiltelefonnummer\").fill(email)\n",
" await page.get_by_role(\"button\", name=\"Weiter\").click()\n",
" sleep()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "57e67981-346b-4ead-912b-569fe59d60b8",
"metadata": {},
"outputs": [],
"source": [
"if password:\n",
" await page.get_by_label(\"Passwort\").click()\n",
" await page.get_by_label(\"Passwort\").fill(password)\n",
" await page.get_by_label(\"Angemeldet bleiben\").check()\n",
" await page.get_by_role(\"button\", name=\"Anmelden\").click()\n",
" sleep()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4b55325b-1da0-4beb-a770-96e795fcbd1e",
"metadata": {},
"outputs": [],
"source": [
"# Enter OTP now if requested"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "be5cf25e-18ad-4747-9a4f-b32a481b0a23",
"metadata": {},
"outputs": [],
"source": [
"await (await page.wait_for_selector('a >> text=Warenrücksendungen', timeout=0)).click()\n",
"sleep()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9d9ede52-c900-4dc1-9f93-23aa4b93fd40",
"metadata": {},
"outputs": [],
"source": [
"# Get the list of years offered by the order-filter <select>.\n",
"# Wait until the element is attached to the DOM: query_selector returns None when\n",
"# nothing matches yet, which would make the next line fail with AttributeError.\n",
"select = await page.wait_for_selector('select#time-filter', state=\"attached\")\n",
"years = (await select.inner_text()).split(\"\\n\") # one entry per line of option text\n",
"\n",
"# Keep only numerical years (YYYY); every non-numeric entry is dropped\n",
"years = [year for year in years if year.isnumeric()]\n",
"\n",
"# Keep only the years between start_date and end_date, inclusive\n",
"years = [year for year in years if start_date.year <= int(year) <= end_date.year]\n",
"years.sort(reverse=True)\n",
"\n",
"print(years)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6d3e1862-041d-4929-b222-1a7fa3ec4f90",
"metadata": {},
"outputs": [],
"source": [
"# Year loop: run backwards through the time range (years -> pages -> orders)\n",
"for year in years:\n",
"    # Select the year in the order filter\n",
"    await page.select_option('form[action=\"/your-orders/orders\"] select#time-filter', value=f\"year-{year}\")\n",
"    sleep()\n",
"\n",
"    # Page loop\n",
"    first_page = True\n",
"    done = False\n",
"    page_number = 1\n",
"    i = 0 # Running invoice number (restarts for every year)\n",
"    while not done:\n",
"        # The first page is already loaded; for later pages click the pagination link.\n",
"        # If there is no next-page link the click times out and the page loop ends.\n",
"        try:\n",
"            if first_page:\n",
"                first_page = False\n",
"            else:\n",
"                print(\"Moving to next page ...\")\n",
"                await page.click('.a-last a')\n",
"                page_number = page_number + 1\n",
"            sleep() # sleep after every page load\n",
"        except TimeoutError:\n",
"            # There are no more pages\n",
"            print(\"Finished. No more pages.\")\n",
"            break\n",
"\n",
"        print(f\"Analyzing cards on page {page_number}\")\n",
"\n",
"        order_cards = await page.query_selector_all(\".order.js-order-card\")\n",
"\n",
"        for order_card in order_cards:\n",
"            # Parse the order card to create the date and file_name\n",
"            spans = await order_card.query_selector_all(\"span\")\n",
"            date = datetime.strptime(await spans[1].inner_text(), \"%d. %B %Y\")\n",
"            print('Order Card for date: ' + str(date))\n",
"\n",
"            # Remove the euro sign and the dots (thousands separators) from the total\n",
"            total = (await spans[3].inner_text()).replace(\"€\", \"\").replace(\".\", \"\")\n",
"            orderid = await spans[9].inner_text()\n",
"            date_str = date.strftime(\"%Y%m%d\")\n",
"            # os.path.join avoids a doubled slash when target_dir already ends with one\n",
"            file_name = os.path.join(target_dir, f\"{date_str}_{total}_amazon_{orderid}_\")\n",
"\n",
"            if date > end_date:\n",
"                continue\n",
"            elif date < start_date:\n",
"                done = True\n",
"                break\n",
"\n",
"            invoice_popover = await order_card.query_selector('xpath=//a[contains(text(), \"Rechnung\")]')\n",
"            if invoice_popover is None:\n",
"                # query_selector returns None when nothing matches; skip this order instead of crashing\n",
"                print(\"No invoice link found for order \" + orderid)\n",
"                continue\n",
"            await invoice_popover.click()\n",
"            sleep()\n",
"\n",
"            # invoice_selector = 'xpath=//div[contains(@class, \"a-popover-content\")]//a[contains(text(), \"Rechnung \")]'\n",
"            invoice_selector = 'xpath=//div[contains(@class, \"a-popover-content\") and not(contains(@style, \"display: none\"))]//a[contains(text(), \"Rechnung\") and not(ancestor::*[contains(@style, \"display: none\")])]'\n",
"            invoices = await page.query_selector_all(invoice_selector)\n",
"\n",
"            # Download all invoices\n",
"            for invoice in invoices:\n",
"                href = await invoice.get_attribute(\"href\")\n",
"                # get_attribute returns None when the anchor has no href attribute\n",
"                if href and \".pdf\" in href:\n",
"                    i = i + 1\n",
"                    invoice_number = f'{i:03}'\n",
"                    link = \"https://www.amazon.de\" + href\n",
"                    print(link)\n",
"\n",
"                    # Start waiting for the download\n",
"                    async with page.expect_download() as download_info:\n",
"                        # Perform the action that initiates download\n",
"                        await invoice.click(modifiers=[\"Alt\"])\n",
"                        sleep()\n",
"                    download = await download_info.value\n",
"\n",
"                    # Wait for the download process to complete and save the downloaded file somewhere\n",
"                    await download.save_as(file_name + invoice_number + \".pdf\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d6ac1596-339e-472a-a26d-82ef330bd3bc",
"metadata": {},
"outputs": [],
"source": [
"# Close the browser\n",
"await context.close()\n",
"await browser.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "47dc0cbe-ff33-4e7f-92ad-4f540442fdb4",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.18"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@cypris75
Copy link
Author

This is a Jupyter Notebook which uses Playwright to download Amazon invoices from amazon.de

Based on: https://github.com/dcwangmit01/amazon-invoice-downloader

@cypris75
Copy link
Author

This works for Amazon.de (Germany) and is currently hard-coded to download all invoices from the current year.

To make it work for other marketplaces you would need to adjust the following:

  • Locale
  • Label "Hallo, anmelden"
  • Labels for login fields and buttons
  • Label for your orders
  • Date format
  • All amazon.de references
  • All XPath references to German labels
  • Currency and number formatting references
  • ... and probably more

You also need an .env file with your login data:

AMAZON_EMAIL=jeff@amazon.com
AMAZON_PASSWORD=itsstilldayone

Hope this helps someone.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment