import requests
import os
import sys
import time
import logging
from typing import Optional, Dict, Any
from urllib.parse import urljoin
from dotenv import load_dotenv

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
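
# Credentials are read from a .env file via python-dotenv. A minimal
# sketch (all values are placeholders, not real credentials):
#
#   LOBSTR_API_KEY=<your Lobstr API key>
#   TWITTER_AUTH_TOKEN=<auth_token cookie from a logged-in x.com session>
#   TWITTER_CT0=<ct0 cookie from the same session>
#   ACCOUNT_IDS=<Lobstr account ID to attach to the squid>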


class TwitterTrendsScraper:
    """A class to handle Twitter trends scraping using the Lobstr API."""

    def __init__(self):
        """Initialize the scraper with environment variables and session setup."""
        load_dotenv()
        self.api_key = os.getenv('LOBSTR_API_KEY')
        self.x_auth_token = os.getenv('TWITTER_AUTH_TOKEN')
        self.x_ct0 = os.getenv('TWITTER_CT0')
        self.account_ids = os.getenv('ACCOUNT_IDS')
        if not all([self.api_key, self.x_auth_token, self.x_ct0, self.account_ids]):
            logger.error("Missing required environment variables")
            sys.exit(1)
        self.base_url = 'https://api.lobstr.io/v1/'
        # Hash identifying the Lobstr crawler this script runs
        self.crawler_hash = '1b16ff414d27920fb325b68436dbf5fc'
        self.session = self._setup_session()

    def _setup_session(self) -> requests.Session:
        """Set up and return a requests session with proper headers."""
        session = requests.Session()
        session.headers.update({'Authorization': f'Token {self.api_key}'})
        return session

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        """Make an HTTP request with error handling."""
        url = urljoin(self.base_url, endpoint)
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {str(e)}")
            return None

    def sync_account(self) -> Optional[str]:
        """Sync the X (Twitter) account with Lobstr using its session cookies."""
        logger.info('Syncing X account...')
        payload = {
            'type': 'twitter-sync',
            'cookies': {
                'auth_token': self.x_auth_token,
                'ct0': self.x_ct0
            },
        }
        response = self._make_request('POST', 'accounts/cookies', json=payload)
        if not response:
            return None
        sync_id = response.get('id')
        if not sync_id:
            logger.error('Sync ID not found in response')
            return None
        check_response = self._make_request('GET', f'synchronize/{sync_id}')
        if not check_response:
            logger.error('Checking sync status failed')
            return None
        logger.info('Account synced successfully!')
        return sync_id

    def create_squid(self) -> Optional[str]:
        """Create a new squid instance."""
        payload = {'crawler': self.crawler_hash}
        logger.info('Creating squid...')
        response = self._make_request('POST', 'squids', json=payload)
        if not response:
            return None
        squid_id = response.get('id')
        if not squid_id:
            logger.error('Squid creation failed')
            return None
        logger.info(f'Squid created successfully with ID: {squid_id}')
        return squid_id

    def update_squid(self, squid_id: str) -> bool:
        """Update the squid configuration with run parameters and the account to use."""
        payload = {
            'params': {'max_results': 30},
            'accounts': [self.account_ids],  # account ID(s) from ACCOUNT_IDS in .env
        }
        logger.info('Updating squid...')
        response = self._make_request('POST', f'squids/{squid_id}', json=payload)
        if not response:
            logger.error('Error updating the squid')
            return False
        logger.info('Squid updated successfully')
        return True

    def add_tasks(self, squid_id: str) -> bool:
        """Add search tasks to the squid."""
        payload = {
            'tasks': [{
                'url': 'https://x.com/search?q=Democrats&src=trend_click&vertical=trends'
            }],
            'squid': squid_id
        }
        logger.info('Adding task...')
        response = self._make_request('POST', 'tasks', json=payload)
        if not response:
            logger.error('Failed to add task')
            return False
        logger.info('Task added successfully')
        return True

    def start_run(self, squid_id: str) -> Optional[str]:
        """Start a new run for the squid."""
        payload = {'squid': squid_id}
        logger.info('Starting run...')
        response = self._make_request('POST', 'runs', json=payload)
        if not response:
            return None
        run_id = response.get('id')
        if not run_id:
            logger.error('Run creation failed')
            return None
        logger.info(f'Run {run_id} created successfully')
        return run_id

    def monitor_run_progress(self, run_id: str) -> bool:
        """Monitor the progress of a run, polling every few seconds until it finishes."""
        logger.info('Checking run progress')
        while True:
            run_stats = self._make_request('GET', f'runs/{run_id}/stats')
            if not run_stats:
                return False
            logger.info(f"Progress: {run_stats.get('percent_done', '0%')}")
            if run_stats.get('is_done'):
                logger.info('Run is complete')
                return True
            time.sleep(3)

    def get_s3_url(self, run_id: str) -> Optional[str]:
        """Get the S3 URL for the run results."""
        max_wait = 60
        interval = 5
        elapsed = 0
        logger.info('Checking export status...')
        while elapsed < max_wait:
            response = self._make_request('GET', f'runs/{run_id}')
            if not response:
                return None
            if response.get('export_done', False):
                logger.info(response.get('status'))
                break
            logger.info('Waiting for export to complete...')
            time.sleep(interval)
            elapsed += interval
        if elapsed >= max_wait:
            logger.error('Export did not complete within the maximum wait time')
            return None
        s3_response = self._make_request('GET', f'runs/{run_id}/download')
        if not s3_response:
            return None
        s3_url = s3_response.get('s3')
        if not s3_url:
            logger.error('S3 URL not found')
            return None
        logger.info(f'S3 URL: {s3_url}')
        return s3_url

    def download_csv(self, s3_url: str) -> bool:
        """Download the CSV file from the S3 URL."""
        try:
            response = requests.get(s3_url)
            response.raise_for_status()
            filename = 'output.csv'
            with open(filename, 'wb') as f:
                f.write(response.content)
            logger.info(f'CSV saved as {filename}')
            return True
        except Exception as e:
            logger.error(f'Error downloading CSV: {str(e)}')
            return False
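
    # End-to-end pipeline: sync the account, create a squid, queue the
    # search task, configure the squid, start a run, poll until it
    # finishes, then download the exported CSV from S3.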
    def run(self) -> bool:
        """Execute the complete scraping process."""
        try:
            if not self.sync_account():
                return False
            squid_id = self.create_squid()
            if not squid_id:
                return False
            if not self.add_tasks(squid_id):
                return False
            if not self.update_squid(squid_id):
                return False
            run_id = self.start_run(squid_id)
            if not run_id:
                return False
            if not self.monitor_run_progress(run_id):
                return False
            s3_url = self.get_s3_url(run_id)
            if not s3_url:
                return False
            return self.download_csv(s3_url)
        except Exception as e:
            logger.error(f"An unexpected error occurred: {str(e)}")
            return False


def main():
    """Main entry point for the script."""
    try:
        scraper = TwitterTrendsScraper()
        success = scraper.run()
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        logger.info("Script interrupted by user")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Fatal error: {str(e)}")
        sys.exit(1)


if __name__ == '__main__':
    main()
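
# Usage sketch (script filename assumed; install the dependencies first):
#   $ pip install requests python-dotenv
#   $ python twitter_trends_scraper.py
# On success, the run's results are saved as output.csv in the working directory.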