Skip to content

Instantly share code, notes, and snippets.

@tistaharahap
Created January 8, 2021 14:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tistaharahap/8fbf51c905758a005b2452788d86c4dd to your computer and use it in GitHub Desktop.
Save tistaharahap/8fbf51c905758a005b2452788d86c4dd to your computer and use it in GitHub Desktop.
from os import environ
from typing import Optional, Dict
from datetime import datetime, timedelta
import requests
# Base URL and path of the CleverTap events endpoint; build_events_url()
# concatenates the two.
API_URL = 'https://api.clevertap.com'
API_EVENTS_URI = '/1/events.json'
# Batch size that main() passes as `batch_size` when requesting a cursor.
MAX_BATCH_SIZE = 5000
# Credentials read from the environment; each is None when its variable is unset.
# PROJECT_ID is sent as the X-CleverTap-Account-Id header and PASSCODE as the
# X-CleverTap-Passcode header.
PROJECT_ID = environ.get('PROJECT_ID')
PASSCODE = environ.get('PASSCODE')
# Read from the environment but not referenced by the code visible in this file.
MONGO_CONN_STRING = environ.get('MONGO_CONN_STRING')
def build_events_url() -> str:
    """Return the events endpoint URL: ``API_URL`` followed by ``API_EVENTS_URI``."""
    return '{}{}'.format(API_URL, API_EVENTS_URI)
def get_current_events_cursor(event_name: str,
                              date_from: int,
                              date_to: int,
                              batch_size: int,
                              project_id: str,
                              passcode: str,
                              timeout: float = 30.0) -> Optional[str]:
    """Request a cursor for the events matching a name and a date range.

    POSTs the query to the events endpoint and returns the ``cursor`` field
    of the JSON reply.

    Args:
        event_name: Sent as ``event_name`` in the JSON request body.
        date_from: Sent as ``from`` in the JSON request body.
        date_to: Sent as ``to`` in the JSON request body.
        batch_size: Sent as the ``batch_size`` query parameter.
        project_id: Sent as the ``X-CleverTap-Account-Id`` header.
        passcode: Sent as the ``X-CleverTap-Passcode`` header.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block the caller forever.

    Returns:
        The cursor string, or ``None`` when the reply is not a JSON object,
        is empty, or carries no (non-empty) ``cursor``.

    Raises:
        requests.RequestException: On connection failures or timeouts.
    """
    headers = {
        'X-CleverTap-Account-Id': project_id,
        'X-CleverTap-Passcode': passcode,
    }
    params = {
        'batch_size': batch_size,
        'app': True,
    }
    query = {
        'event_name': event_name,
        'from': date_from,
        'to': date_to,
    }
    response = requests.post(url=build_events_url(),
                             json=query,
                             headers=headers,
                             params=params,
                             timeout=timeout)
    try:
        response_body = response.json()
    except ValueError:
        # Body was not valid JSON (e.g. an HTML error page): no cursor to return.
        return None
    if not isinstance(response_body, dict):
        # Covers empty/None bodies as well as unexpected JSON arrays or scalars.
        return None
    # Normalise a missing or empty cursor to None.
    return response_body.get('cursor') or None
def get_events_using_cursor(cursor: str,
                            project_id: str,
                            passcode: str,
                            timeout: float = 30.0) -> Optional[Dict]:
    """Fetch the batch of events that a cursor points to.

    GETs the events endpoint with the cursor as a query parameter and
    returns the decoded JSON reply when its ``status`` is ``success``
    (compared case-insensitively).

    Args:
        cursor: Sent as the ``cursor`` query parameter.
        project_id: Sent as the ``X-CleverTap-Account-Id`` header.
        passcode: Sent as the ``X-CleverTap-Passcode`` header.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block the caller forever.

    Returns:
        The full response body as a dict, or ``None`` when the reply is not
        a JSON object, is empty, or its ``status`` is missing or not
        ``success``.

    Raises:
        requests.RequestException: On connection failures or timeouts.
    """
    headers = {
        'X-CleverTap-Account-Id': project_id,
        'X-CleverTap-Passcode': passcode,
        'Content-Type': 'application/json',
    }
    params = {
        'cursor': cursor,
    }
    response = requests.get(url=build_events_url(),
                            headers=headers,
                            params=params,
                            timeout=timeout)
    try:
        response_body = response.json()
    except ValueError:
        # Body was not valid JSON (e.g. an HTML error page): nothing usable.
        return None
    if not isinstance(response_body, dict):
        # Covers empty/None bodies as well as unexpected JSON arrays or scalars.
        return None
    status = response_body.get('status')
    # A missing, empty or non-string status is treated as a failure.
    if not isinstance(status, str) or status.lower() != 'success':
        return None
    return response_body
def main(account_id: str, passcode: str):
    """Fetch and print one batch of 'App Launched' events from the last day.

    The date range runs from yesterday to today (UTC). A cursor is requested
    for that range, then the batch behind the cursor is fetched and printed.
    ``None`` is printed when no cursor or no successful batch is obtained.

    Args:
        account_id: CleverTap account id, forwarded as ``project_id``.
        passcode: CleverTap passcode.
    """
    now = datetime.utcnow()
    yesterday = now - timedelta(days=1)
    # Dates must be zero-padded YYYYMMDD integers: concatenating the raw
    # year/month/day would turn 2021-01-08 into 202118 instead of 20210108.
    date_from = int(yesterday.strftime('%Y%m%d'))
    date_to = int(now.strftime('%Y%m%d'))
    cursor = get_current_events_cursor(event_name='App Launched',
                                       date_from=date_from,
                                       date_to=date_to,
                                       batch_size=MAX_BATCH_SIZE,
                                       project_id=account_id,
                                       passcode=passcode)
    if cursor:
        batch = get_events_using_cursor(cursor=cursor,
                                        project_id=account_id,
                                        passcode=passcode)
    else:
        # Without a cursor there is nothing to fetch; skip the second request.
        batch = None
    # NOTE(review): only the first batch is fetched. The response presumably
    # carries a cursor for the next batch -- confirm against the API docs
    # before adding pagination.
    print(batch)
if __name__ == "__main__":
    # Fail fast with a clear message instead of sending requests with
    # missing credentials when the environment is not configured.
    if not PROJECT_ID or not PASSCODE:
        raise SystemExit('PROJECT_ID and PASSCODE environment variables must be set')
    main(PROJECT_ID, PASSCODE)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment