@thunderpoot
Last active July 13, 2024 18:24
An example of fetching a page from Common Crawl using the Common Crawl Index
import requests
import json
from urllib.parse import quote_plus

# Please note: f-strings require Python 3.6+

# The URL of the Common Crawl Index server
CC_INDEX_SERVER = 'http://index.commoncrawl.org/'

# The Common Crawl index you want to query
INDEX_NAME = 'CC-MAIN-2023-40'      # Replace with the latest index name

# The URL you want to look up in the Common Crawl index
target_url = 'commoncrawl.org/faq'  # Replace with your target URL

# Function to search the Common Crawl Index
def search_cc_index(url):
    encoded_url = quote_plus(url)
    index_url = f'{CC_INDEX_SERVER}{INDEX_NAME}-index?url={encoded_url}&output=json'
    response = requests.get(index_url)
    print("Response from CCI:", response.text)  # Output the response from the server
    if response.status_code == 200:
        # The server returns one JSON object per line
        records = response.text.strip().split('\n')
        return [json.loads(record) for record in records]
    else:
        return None

# Function to fetch the content from Common Crawl
def fetch_page_from_cc(records):
    for record in records:
        offset, length = int(record['offset']), int(record['length'])
        s3_url = f'https://data.commoncrawl.org/{record["filename"]}'
        # Fetch only this record's byte range from the WARC file
        response = requests.get(s3_url, headers={'Range': f'bytes={offset}-{offset+length-1}'})
        if response.status_code == 206:
            # Process the response content if necessary
            # For example, you can use warcio to parse the WARC record
            return response.content
        else:
            print(f"Failed to fetch data: {response.status_code}")
    return None

# Search the index for the target URL
records = search_cc_index(target_url)
if records:
    print(f"Found {len(records)} records for {target_url}")
    # Fetch the page content from the first record
    content = fetch_page_from_cc(records)
    if content:
        print(f"Successfully fetched content for {target_url}")
        # You can now process the 'content' variable as needed
else:
    print(f"No records found for {target_url}")
@thunderpoot (Author) commented Jul 13, 2024

> How would one construct the record object inline, in the example's `fetch_page_from_cc` function?

# Of course import `ArchiveIterator` from `warcio`...

from warcio.archiveiterator import ArchiveIterator

# Use `stream=True` in the call to `requests.get()` to get a raw byte stream
# because it's gzip compressed data...

    response = requests.get(
        s3_url,
        headers={'Range': f'bytes={offset}-{offset+length-1}'},
        stream=True
    )

# Create an `ArchiveIterator` object directly from `response.raw`,
# which handles the gzipped WARC content...

# Iterate through the WARC records, looking for a 'response' type record;
# when we hit one, return its content using
# `warc_record.content_stream().read()` like so...

    if response.status_code == 206:
        stream = ArchiveIterator(response.raw)
        for warc_record in stream:
            if warc_record.rec_type == 'response':
                return warc_record.content_stream().read()
    else:
        print(f"Failed to fetch data: {response.status_code}")

Hope this helps!

@sanjayk-github-dev commented Jul 13, 2024

Thanks. I tried this, but it doesn't find any warc_record in the stream.

@sanjayk-github-dev commented Jul 13, 2024

Sorry, never mind. I forgot to add stream=True.

Thanks for your help!

@thunderpoot (Author) commented Jul 13, 2024

> Sorry, never mind. I forgot to add stream=True.
>
> Thanks for your help!

Great, glad to hear it!
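
For anyone else hitting the same symptom: without `stream=True`, `requests` eagerly reads the whole body into `response.content`, leaving `response.raw` exhausted, so the `ArchiveIterator` loop finds no records to yield. A minimal illustration (reusing `s3_url`, `offset`, and `length` from the example above):

    range_header = {'Range': f'bytes={offset}-{offset+length-1}'}

    # Without stream=True the body is consumed up front, so response.raw
    # is already empty by the time warcio reads from it:
    r = requests.get(s3_url, headers=range_header)

    # With stream=True the gzipped bytes stay available on response.raw:
    r = requests.get(s3_url, headers=range_header, stream=True)
    for warc_record in ArchiveIterator(r.raw):
        if warc_record.rec_type == 'response':
            page = warc_record.content_stream().read()
            break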
