Skip to content

Instantly share code, notes, and snippets.

@shehriar-awan
Created March 5, 2025 12:31
Show Gist options
  • Select an option

  • Save shehriar-awan/a9aa6c8c404c3148390dd8000f33d71a to your computer and use it in GitHub Desktop.

Select an option

Save shehriar-awan/a9aa6c8c404c3148390dd8000f33d71a to your computer and use it in GitHub Desktop.
import requests
import os
import sys
import time
import logging
from typing import Optional, Dict, Any
from urllib.parse import urljoin
from dotenv import load_dotenv
# Root logging setup: records at INFO and above, each prefixed with a
# timestamp and the level name.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by the scraper class and main().
logger = logging.getLogger(__name__)
class TwitterTrendsScraper:
    """Scrape X (Twitter) search results for a trend using the Lobstr API.

    The workflow driven by :meth:`run` is:

    1. Sync the X account cookies with Lobstr.
    2. Create a squid for the configured crawler and add a search task.
    3. Attach the account(s) and start a run.
    4. Poll the run until it is done, then wait for its export.
    5. Download the resulting CSV.

    Configuration comes from environment variables ``LOBSTR_API_KEY``,
    ``TWITTER_AUTH_TOKEN``, ``TWITTER_CT0`` and ``ACCOUNT_IDS``.
    ``load_dotenv()`` is called first, so values may also come from a
    ``.env`` file. ``ACCOUNT_IDS`` may hold one id or a comma-separated
    list of ids.
    """

    # Per-request timeout in seconds. Without a timeout, ``requests`` can
    # block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Load configuration from the environment and set up the HTTP session.

        Exits the process with status 1 if any required variable is unset or
        empty.
        """
        load_dotenv()
        self.api_key = os.getenv('LOBSTR_API_KEY')
        self.x_auth_token = os.getenv('TWITTER_AUTH_TOKEN')
        self.x_ct0 = os.getenv('TWITTER_CT0')
        self.account_ids = os.getenv('ACCOUNT_IDS')
        required = {
            'LOBSTR_API_KEY': self.api_key,
            'TWITTER_AUTH_TOKEN': self.x_auth_token,
            'TWITTER_CT0': self.x_ct0,
            'ACCOUNT_IDS': self.account_ids,
        }
        missing = [name for name, value in required.items() if not value]
        if missing:
            # Name the offending variables so the fix is obvious to the user.
            logger.error("Missing required environment variables: %s", ', '.join(missing))
            sys.exit(1)
        self.base_url = 'https://api.lobstr.io/v1/'
        self.crawler_hash = '1b16ff414d27920fb325b68436dbf5fc'
        self.session = self._setup_session()

    def _setup_session(self) -> requests.Session:
        """Return a session whose default headers carry the Lobstr API token.

        Every API call made through :meth:`_make_request` reuses this session,
        so the ``Authorization: Token <api_key>`` header is sent on each one.
        """
        session = requests.Session()
        session.headers.update({'Authorization': f'Token {self.api_key}'})
        return session

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        """Call a Lobstr API endpoint and return its decoded JSON object.

        Args:
            method: HTTP method, e.g. ``'GET'`` or ``'POST'``.
            endpoint: Path relative to ``self.base_url`` (no leading slash,
                otherwise ``urljoin`` would drop the ``/v1/`` prefix).
            **kwargs: Passed through to ``requests.Session.request``. A
                ``timeout`` of ``REQUEST_TIMEOUT`` seconds is applied unless
                the caller supplies one.

        Returns:
            The response body as a dict. Returns None (after logging) on any
            transport/HTTP error, on a non-JSON body, or on a JSON body that is
            not an object.
        """
        url = urljoin(self.base_url, endpoint)
        kwargs.setdefault('timeout', self.REQUEST_TIMEOUT)
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            logger.error("Request failed: %s", e)
            return None
        except ValueError as e:
            # Older ``requests`` versions raise a plain ValueError (not a
            # RequestException) when the body is not valid JSON.
            logger.error("Invalid JSON in response from %s: %s", url, e)
            return None
        if not isinstance(data, dict):
            # All callers use ``.get`` on the result; reject anything else.
            logger.error("Unexpected response payload from %s", url)
            return None
        return data

    def sync_account(self) -> Optional[str]:
        """Register the X account cookies with Lobstr.

        Posts the ``auth_token``/``ct0`` cookies to ``accounts/cookies``,
        then fetches ``synchronize/<id>`` once.

        Returns:
            The sync id on success, otherwise None.
        """
        logger.info('Syncing X account...')
        payload = {
            'type': 'twitter-sync',
            'cookies': {
                'auth_token': self.x_auth_token,
                'ct0': self.x_ct0,
            },
        }
        response = self._make_request('POST', 'accounts/cookies', json=payload)
        if not response:
            return None
        sync_id = response.get('id')
        if not sync_id:
            logger.error('Sync ID not found in response')
            return None
        # NOTE(review): the status is fetched once and only checked for a
        # non-empty response. It does not wait for, or inspect, the sync's
        # completion state. Confirm whether the API expects polling here.
        check_response = self._make_request('GET', f'synchronize/{sync_id}')
        if not check_response:
            logger.error('Checking Sync Failed...')
            return None
        logger.info('Account synced successfully!')
        return sync_id

    def create_squid(self) -> Optional[str]:
        """Create a squid for ``self.crawler_hash`` and return its id (or None)."""
        payload = {'crawler': self.crawler_hash}
        logger.info('Creating squid...')
        response = self._make_request('POST', 'squids', json=payload)
        if not response:
            return None
        squid_id = response.get('id')
        if not squid_id:
            logger.error('Squid creation failed...')
            return None
        logger.info('Squid created successfully with ID: %s', squid_id)
        return squid_id

    def update_squid(self, squid_id: str, max_results: int = 30) -> bool:
        """Set the squid's result limit and attach the configured account(s).

        Args:
            squid_id: Id returned by :meth:`create_squid`.
            max_results: Value sent as ``params.max_results``.

        Returns:
            True if the API accepted the update, otherwise False.
        """
        # ACCOUNT_IDS may be comma-separated. A single id yields a
        # one-element list, exactly as before.
        accounts = [part.strip() for part in self.account_ids.split(',') if part.strip()]
        payload = {
            'params': {'max_results': max_results},
            'accounts': accounts,
        }
        logger.info('Updating squid...')
        response = self._make_request('POST', f'squids/{squid_id}', json=payload)
        if not response:
            logger.error('Error updating the Squid...')
            return False
        logger.info('Squid updated successfully...')
        return True

    def add_tasks(
        self,
        squid_id: str,
        search_url: str = 'https://x.com/search?q=Democrats&src=trend_click&vertical=trends',
    ) -> bool:
        """Add one search task to the squid.

        Args:
            squid_id: Id returned by :meth:`create_squid`.
            search_url: X search URL to scrape. Defaults to the
                "Democrats" trend search.

        Returns:
            True if the task was accepted, otherwise False.
        """
        payload = {
            'tasks': [{'url': search_url}],
            'squid': squid_id,
        }
        logger.info('Adding task...')
        response = self._make_request('POST', 'tasks', json=payload)
        if not response:
            logger.error('Task adding error encountered')
            return False
        logger.info('Task added successfully')
        return True

    def start_run(self, squid_id: str) -> Optional[str]:
        """Start a run for the squid and return the run id (or None)."""
        payload = {'squid': squid_id}
        logger.info('Starting run...')
        response = self._make_request('POST', 'runs', json=payload)
        if not response:
            return None
        run_id = response.get('id')
        if not run_id:
            logger.error('Run creation failed')
            return None
        logger.info('Run %s created successfully', run_id)
        return run_id

    def monitor_run_progress(
        self,
        run_id: str,
        poll_interval: float = 3,
        max_wait: Optional[float] = None,
    ) -> bool:
        """Poll ``runs/<id>/stats`` until the run reports ``is_done``.

        Args:
            run_id: Id returned by :meth:`start_run`.
            poll_interval: Seconds to sleep between polls.
            max_wait: Give up after this many seconds. ``None`` (the default)
                polls indefinitely, as the original implementation did.

        Returns:
            True once the run is done. False if a poll fails or ``max_wait``
            elapses first.
        """
        logger.info('Checking run progress')
        deadline = None if max_wait is None else time.monotonic() + max_wait
        while True:
            run_stats = self._make_request('GET', f'runs/{run_id}/stats')
            if not run_stats:
                return False
            logger.info("Progress: %s", run_stats.get('percent_done', '0%'))
            if run_stats.get('is_done'):
                logger.info('Run is Complete')
                return True
            if deadline is not None and time.monotonic() >= deadline:
                logger.error('Run did not complete within the maximum wait time')
                return False
            time.sleep(poll_interval)

    def get_s3_url(self, run_id: str, max_wait: float = 60, interval: float = 5) -> Optional[str]:
        """Wait for the run's export, then return its S3 download URL.

        Polls ``runs/<id>`` every ``interval`` seconds until ``export_done``
        is truthy. The status is re-checked after the final sleep, so no wait
        is wasted. Gives up once ``max_wait`` seconds of sleeping have
        elapsed. Then reads the ``s3`` field from ``runs/<id>/download``.

        Returns:
            The S3 URL, or None on request failure, timeout, or a missing URL.
        """
        logger.info('Checking export status...')
        elapsed = 0
        while True:
            response = self._make_request('GET', f'runs/{run_id}')
            if not response:
                return None
            if response.get('export_done', False):
                logger.info('%s', response.get('status'))
                break
            if elapsed >= max_wait:
                logger.error('Export did not complete within the maximum wait time')
                return None
            logger.info('Waiting for export to complete...')
            time.sleep(interval)
            elapsed += interval
        s3_response = self._make_request('GET', f'runs/{run_id}/download')
        if not s3_response:
            return None
        s3_url = s3_response.get('s3')
        if not s3_url:
            logger.error('S3 URL not found')
            return None
        logger.info('S3 URL: %s', s3_url)
        return s3_url

    def download_csv(self, s3_url: str, filename: str = 'output.csv') -> bool:
        """Stream the exported CSV from ``s3_url`` to ``filename``.

        Uses a plain ``requests.get`` rather than ``self.session``, so the
        Lobstr ``Authorization`` header is not sent to the S3 URL. The body is
        streamed in chunks to ``<filename>.part`` and then moved into place
        with ``os.replace``. A failed download therefore never leaves a
        truncated ``filename``.

        Returns:
            True on success. False (after logging) on a network/HTTP or
            filesystem error.
        """
        tmp_path = f'{filename}.part'
        try:
            with requests.get(s3_url, stream=True, timeout=self.REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                with open(tmp_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=64 * 1024):
                        f.write(chunk)
            os.replace(tmp_path, filename)
        except (requests.exceptions.RequestException, OSError) as e:
            logger.error('Error downloading CSV: %s', e)
            # Best-effort cleanup of a partial download.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            return False
        logger.info('CSV saved as %s', filename)
        return True

    def run(self) -> bool:
        """Execute the full scrape workflow, stopping at the first failed step.

        Returns:
            True if the CSV was downloaded, otherwise False. Unexpected
            exceptions are logged with their traceback and reported as False.
        """
        try:
            if not self.sync_account():
                return False
            squid_id = self.create_squid()
            if not squid_id:
                return False
            if not self.add_tasks(squid_id):
                return False
            if not self.update_squid(squid_id):
                return False
            run_id = self.start_run(squid_id)
            if not run_id:
                return False
            if not self.monitor_run_progress(run_id):
                return False
            s3_url = self.get_s3_url(run_id)
            if not s3_url:
                return False
            return self.download_csv(s3_url)
        except Exception as e:
            # Workflow boundary: keep the traceback instead of only the message.
            logger.exception("An unexpected error occurred: %s", e)
            return False
def main():
    """Run the scraper end to end and exit with a process status code.

    Exits 0 when ``TwitterTrendsScraper.run()`` returns True. Exits 1 when it
    returns False, on Ctrl-C, or on any unexpected exception (logged with its
    traceback).
    """
    try:
        scraper = TwitterTrendsScraper()
        success = scraper.run()
    except KeyboardInterrupt:
        logger.info("Script interrupted by user")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: log the full traceback, not just the message.
        logger.exception("Fatal error: %s", e)
        sys.exit(1)
    sys.exit(0 if success else 1)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment