Skip to content

Instantly share code, notes, and snippets.

Created March 20, 2021 16:47
Show Gist options
  • Save prathje/862cc96b531e0dcf904f5ea71f7812a1 to your computer and use it in GitHub Desktop.
Save prathje/862cc96b531e0dcf904f5ea71f7812a1 to your computer and use it in GitHub Desktop.
Extract Graylog Messages of a Stream for a Day Through API - Circumvents Elasticsearch Search Window Size By Splitting into multiple Timeranges
import urllib3
import sys
from datetime import datetime, timezone, timedelta, date
import requests
# Usage: python3 <token> <stream_id> 2020-01-01
base_url = sys.argv[1]
token = sys.argv[2]
stream_id = sys.argv[3]
export_day_str = sys.argv[4]
export_daytime = datetime.strptime(export_day_str, '%Y-%m-%d')
export_day = date(export_daytime.year, export_daytime.month,
create_search_url = base_url+"/api/views/search"
session = requests.Session()
session.verify = False
session.auth = (token, "token")
'X-Requested-By': '',
'Accept': 'application/json'
def datetime_to_string(dt):
return dt.astimezone(timezone.utc).isoformat(timespec='milliseconds')
def datetime_from_string(datetime_string):
datetime_string = datetime_string.replace("Z", "+00:00")
return datetime.fromisoformat(datetime_string).astimezone(timezone.utc)
# Create an interval [from, to)
def create_interval(from_datetime, start_datetime):
return from_datetime, start_datetime
def split_interval(iv):
from_date = iv[0]
to_date = iv[1]
half = from_date+(to_date-from_date) / 2
iv1 = create_interval(from_date, half)
iv2 = create_interval(half, to_date)
return iv1, iv2
def exec_sync_search(iv, offset, limit):
search_id = "6051fc776cd6f7d268411594"
query_id = "233d1fd5-9e02-4e1a-9d46-d96275ddf9c8"
search_type_id = "76d63310-2dc0-4c22-acac-101707c3542c"
date_from = datetime_to_string(iv[0])
date_to = datetime_to_string(iv[1]-timedelta(milliseconds=1))
search_params = {
"id": search_id,
"parameters": [],
"queries": [
{"id": query_id,
"query": {"type": "elasticsearch", "query_string": ""},
"timerange": {"type": "absolute", "from": date_from, "to": date_to},
"filter": {"type": "or", "filters": [{"type": "stream", "id": stream_id}]},
"search_types": [
"timerange": None,
"query": None,
"streams": [],
"id": search_type_id,
"name": None,
"limit": limit, "offset": offset, "sort": [{"field": "timestamp", "order": "ASC"}],
"decorators": [],
"type": "messages", "filter": None
search_resp ="/api/views/search/sync", json=search_params)
if search_resp.ok:
data = search_resp.json()
search_results = data['results'][query_id]['search_types'][search_type_id]
total_results = search_results['total_results']
messages = map(lambda msg : msg['message'], search_results['messages'])
return total_results, messages
return None, None
def gen_entries(iv):
# Request the size inside the wanted interval
# While this might be a bit excessive it lowers complexity on this script's side
(total_count, _res) = exec_sync_search(iv, 0, 0)
# print(datetime_to_string(iv[0]), datetime_to_string(iv[1]))
# If total_count is higher than the maximum supported elastic search window size
# We split the interval in half: see
if total_count > ES_MAX_SEARCH_WINDOW_SIZE:
(iv_first, iv_second) = split_interval(iv)
yield from gen_entries(iv_first)
yield from gen_entries(iv_second)
# Otherwise, we simply make batch_requests
offset = 0
while offset < total_count:
limit = min(BATCH_SIZE, total_count-offset)
(_count, batch_result) = exec_sync_search(iv, offset, limit)
offset += limit
yield from batch_result
# we first create a search
def create_date_interval(d):
begin = datetime(d.year, d.month,, tzinfo=timezone.utc)
end = begin+timedelta(days=1)
return create_interval(begin, end)
for entry in gen_entries(create_date_interval(export_day)):
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment