import logging
import os
import sys
import time
from typing import Optional, Dict, Any
from urllib.parse import urljoin

import requests
from dotenv import load_dotenv
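
# For reference: the script reads four variables via python-dotenv. A minimal
# .env file in the working directory might look like this (placeholder values,
# not real credentials):
#
#   LOBSTR_API_KEY=<your Lobstr API key>
#   TWITTER_AUTH_TOKEN=<auth_token cookie from a logged-in X session>
#   TWITTER_CT0=<ct0 cookie from the same session>
#   ACCOUNT_IDS=<Lobstr account ID for the synced X account>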

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class TwitterTrendsScraper:
    """A class to handle Twitter trends scraping using the Lobstr API."""

    def __init__(self):
        """Initialize the scraper with environment variables and session setup."""
        load_dotenv()
        self.api_key = os.getenv('LOBSTR_API_KEY')
        self.x_auth_token = os.getenv('TWITTER_AUTH_TOKEN')
        self.x_ct0 = os.getenv('TWITTER_CT0')
        self.account_ids = os.getenv('ACCOUNT_IDS')
        if not all([self.api_key, self.x_auth_token, self.x_ct0, self.account_ids]):
            logger.error("Missing required environment variables")
            sys.exit(1)
        self.base_url = 'https://api.lobstr.io/v1/'
        self.crawler_hash = '1b16ff414d27920fb325b68436dbf5fc'
        self.session = self._setup_session()

    def _setup_session(self) -> requests.Session:
        """Set up and return a requests session with proper headers."""
        session = requests.Session()
        session.headers.update({'Authorization': f'Token {self.api_key}'})
        return session

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        """Make an HTTP request with error handling."""
        url = urljoin(self.base_url, endpoint)
        # Avoid hanging forever on a stalled connection; callers may override.
        kwargs.setdefault('timeout', 30)
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {e}")
            return None

    def sync_account(self) -> Optional[str]:
        """Sync Twitter account with Lobstr."""
        logger.info('Syncing X account...')
        payload = {
            'type': 'twitter-sync',
            'cookies': {
                'auth_token': self.x_auth_token,
                'ct0': self.x_ct0
            },
        }
        response = self._make_request('POST', 'accounts/cookies', json=payload)
        if not response:
            return None
        sync_id = response.get('id')
        if not sync_id:
            logger.error('Sync ID not found in response')
            return None
        check_response = self._make_request('GET', f'synchronize/{sync_id}')
        if not check_response:
            logger.error('Sync status check failed')
            return None
        logger.info('Account synced successfully!')
        return sync_id

    def create_squid(self) -> Optional[str]:
        """Create a new squid instance."""
        payload = {'crawler': self.crawler_hash}
        logger.info('Creating squid...')
        response = self._make_request('POST', 'squids', json=payload)
        if not response:
            return None
        squid_id = response.get('id')
        if not squid_id:
            logger.error('Squid creation failed')
            return None
        logger.info(f'Squid created successfully with ID: {squid_id}')
        return squid_id

    def update_squid(self, squid_id: str) -> bool:
        """Update squid configuration."""
        payload = {
            'params': {'max_results': 30},
            # ACCOUNT_IDS holds a single Lobstr account ID; the API expects a list.
            'accounts': [self.account_ids],
        }
        logger.info('Updating squid...')
        response = self._make_request('POST', f'squids/{squid_id}', json=payload)
        if not response:
            logger.error('Error updating the squid')
            return False
        logger.info('Squid updated successfully')
        return True

    def add_tasks(self, squid_id: str) -> bool:
        """Add search tasks to the squid."""
        payload = {
            'tasks': [{
                'url': 'https://x.com/search?q=Democrats&src=trend_click&vertical=trends'
            }],
            'squid': squid_id
        }
        logger.info('Adding task...')
        response = self._make_request('POST', 'tasks', json=payload)
        if not response:
            logger.error('Failed to add task')
            return False
        logger.info('Task added successfully')
        return True

    def start_run(self, squid_id: str) -> Optional[str]:
        """Start a new run for the squid."""
        payload = {'squid': squid_id}
        logger.info('Starting run...')
        response = self._make_request('POST', 'runs', json=payload)
        if not response:
            return None
        run_id = response.get('id')
        if not run_id:
            logger.error('Run creation failed')
            return None
        logger.info(f'Run {run_id} created successfully')
        return run_id

    def monitor_run_progress(self, run_id: str) -> bool:
        """Monitor the progress of a run."""
        logger.info('Checking run progress...')
        while True:
            stats = self._make_request('GET', f'runs/{run_id}/stats')
            if not stats:
                return False
            logger.info(f"Progress: {stats.get('percent_done', '0%')}")
            if stats.get('is_done'):
                logger.info('Run is complete')
                return True
            time.sleep(3)

    def get_s3_url(self, run_id: str) -> Optional[str]:
        """Get the S3 URL for the run results."""
        max_wait = 60
        interval = 5
        elapsed = 0
        logger.info('Checking export status...')
        while elapsed < max_wait:
            response = self._make_request('GET', f'runs/{run_id}')
            if not response:
                return None
            if response.get('export_done', False):
                logger.info(response.get('status'))
                break
            logger.info('Waiting for export to complete...')
            time.sleep(interval)
            elapsed += interval
        if elapsed >= max_wait:
            logger.error('Export did not complete within the maximum wait time')
            return None
        s3_response = self._make_request('GET', f'runs/{run_id}/download')
        if not s3_response:
            return None
        s3_url = s3_response.get('s3')
        if not s3_url:
            logger.error('S3 URL not found')
            return None
        logger.info(f'S3 URL: {s3_url}')
        return s3_url

    def download_csv(self, s3_url: str) -> bool:
        """Download the CSV file from the S3 URL."""
        try:
            response = requests.get(s3_url, timeout=60)
            response.raise_for_status()
            filename = 'output.csv'
            with open(filename, 'wb') as f:
                f.write(response.content)
            logger.info(f'CSV saved as {filename}')
            return True
        except (requests.exceptions.RequestException, OSError) as e:
            logger.error(f'Error downloading CSV: {e}')
            return False

    def run(self) -> bool:
        """Execute the complete scraping process."""
        try:
            if not self.sync_account():
                return False
            squid_id = self.create_squid()
            if not squid_id:
                return False
            if not self.add_tasks(squid_id):
                return False
            if not self.update_squid(squid_id):
                return False
            run_id = self.start_run(squid_id)
            if not run_id:
                return False
            if not self.monitor_run_progress(run_id):
                return False
            s3_url = self.get_s3_url(run_id)
            if not s3_url:
                return False
            return self.download_csv(s3_url)
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")
            return False

def main():
    """Main entry point for the script."""
    try:
        scraper = TwitterTrendsScraper()
        success = scraper.run()
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        logger.info("Script interrupted by user")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()
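
# Usage sketch (assuming this file is saved as twitter_trends_scraper.py and
# the .env file described above is in the working directory):
#
#   $ python twitter_trends_scraper.py
#
# On success the script writes the scraped results to output.csv and exits 0;
# any failed step logs an error and the script exits 1.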