Skip to content

Instantly share code, notes, and snippets.

Last active November 27, 2023 17:26
Show Gist options
  • Save kgriffs/e717b8669b9b099b82ac40e11ed25e1a to your computer and use it in GitHub Desktop.
Save kgriffs/e717b8669b9b099b82ac40e11ed25e1a to your computer and use it in GitHub Desktop.
DataDog Download Logs Example
import requests
import time
class DatadogLogIterator:
    """Iterates over Datadog v2 Logs Search results, one batch per step.

    Each ``next()`` call POSTs one search request and returns the list of
    log events in that page, following the pagination cursor until the API
    reports no further pages.

    NOTE(review): the scraped source lost the request URL and several
    lines; the endpoint below is the documented v2 logs-search URL for the
    default US site — confirm against your Datadog site (e.g. an EU org
    uses a different host).
    """

    # Maximum page size accepted by the v2 Logs Search API.
    _MAX_LIMIT = 1_000

    _SEARCH_URL = "https://api.datadoghq.com/api/v2/logs/events/search"

    def __init__(self, query, start, end, api_key, app_key):
        """
        :param query: Datadog log-search query string (e.g. ``service:x``).
        :param start: ``datetime`` for the start of the search window.
        :param end: ``datetime`` for the end of the search window.
        :param api_key: Datadog API key (sent as ``DD-API-KEY``).
        :param app_key: Datadog application key (``DD-APPLICATION-KEY``).
        """
        self._cursor = None
        self._exhausted = False  # set once the API reports no next page
        self._query = query
        self._api_key = api_key
        self._app_key = app_key
        self._start_timestamp = start.isoformat()
        self._end_timestamp = end.isoformat()

    def __iter__(self):
        return self

    def __next__(self):
        """Fetch and return the next batch (list) of log events.

        Raises ``StopIteration`` when the previous page was the last one,
        when a page comes back empty, or after a non-200 response.
        """
        if self._exhausted:
            raise StopIteration

        headers = {
            "Content-Type": "application/json",
            "DD-API-KEY": self._api_key,
            "DD-APPLICATION-KEY": self._app_key,
        }
        params = {
            "filter": {
                "query": self._query,
                "from": self._start_timestamp,
                "to": self._end_timestamp,
            },
            # BUG FIX: the original referenced the undefined
            # self._FETCH_LIMIT; the class constant is _MAX_LIMIT.
            "page": {"limit": self._MAX_LIMIT},
        }
        if self._cursor:
            params["page"]["cursor"] = self._cursor

        response = requests.post(self._SEARCH_URL, headers=headers, json=params)
        if response.status_code != 200:
            # Best-effort: report the failure and end iteration. The
            # original's `return None` would have made the consumer's
            # `all_events += batch` raise a TypeError.
            print(f"ERROR: Datadog API returned {response.status_code}")
            raise StopIteration

        result = response.json()

        # BUG FIX: the original raised StopIteration as soon as the cursor
        # was missing, silently dropping the final page's events. Instead,
        # remember that we are done and still return this last batch.
        self._cursor = result.get("meta", {}).get("page", {}).get("after")
        if not self._cursor:
            self._exhausted = True

        event_batch = result.get("data") or []
        if not event_batch:
            raise StopIteration
        return event_batch
class DatadogLogFetcher:
    """Downloads log events from Datadog up to an approximate limit."""

    def __init__(self, api_key, app_key):
        """
        :param api_key: Datadog API key.
        :param app_key: Datadog application key.
        """
        self._api_key = api_key
        self._app_key = app_key

    def fetch(self, query, start, end, limit):
        """Return a list of log events matching *query* in [start, end].

        Stops once at least *limit* events are collected; because events
        arrive in whole batches, the result may overshoot *limit* by up to
        one batch (original behavior, preserved).

        BUG FIX: the original only returned from inside the loop, so when
        the iterator was exhausted before *limit* events were collected it
        fell off the end of the function and returned None; now the
        partial result is returned.
        """
        log_iterator = DatadogLogIterator(query, start, end, self._api_key, self._app_key)
        all_events = []
        for batch in log_iterator:
            all_events += batch
            print(f"Fetched a batch of {len(batch)} log events; {len(all_events):>7} total fetched so far...")
            if len(all_events) >= limit:
                break
        return all_events
Copy link

kgriffs commented Nov 21, 2023

Example usage:

from datetime import datetime, timedelta

end = datetime.utcnow()
start = end - timedelta(hours=2)
fetcher = DatadogLogFetcher(API_KEY, APP_KEY)

log_events = fetcher.fetch('service:my-app-name', start, end, 10_000)

from collections import defaultdict
import json

def analyze_events(log_events):
    """Print a per-message volume report for a list of Datadog log events.

    For each distinct ``attributes['message']`` value, accumulates the
    serialized size (``len(json.dumps(attributes))``) and the event count,
    then prints one line per message, largest total size first:
    total MiB, average KiB per event, and the message text.
    """
    size_by_message = defaultdict(int)
    count_by_message = defaultdict(int)

    for event in log_events:
        attrs = event['attributes']
        message = attrs['message']
        size_by_message[message] += len(json.dumps(attrs))
        count_by_message[message] += 1

    # Largest aggregate payload first.
    ranked = sorted(size_by_message.items(), key=lambda item: item[1], reverse=True)
    for message, size_bytes in ranked:
        count = count_by_message[message]
        size_kb = size_bytes / 1024
        size_mb = size_kb / 1024
        print(f"{size_mb:>8.2f} MiB - {size_kb/count:>6.2f} KiB per Event - {message}")


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment