Extract Graylog messages of a stream for a day through the API - circumvents the Elasticsearch search window size by splitting the query into multiple time ranges
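The script works around Elasticsearch's 10,000-result search window: it first asks Graylog how many messages fall into the requested time range, and if the count exceeds the window it halves the range and recurses until every sub-range fits, then pages through each sub-range in batches. A minimal sketch of that strategy (count_messages and fetch_batches are hypothetical stand-ins for the API calls in the script below):

    def fetch_all(interval):
        if count_messages(interval) > 10000:
            first, second = split_interval(interval)
            yield from fetch_all(first)
            yield from fetch_all(second)
        else:
            yield from fetch_batches(interval)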
import sys
from datetime import datetime, timezone, timedelta, date

import requests
import urllib3

# Uncomment to silence the warnings triggered by session.verify = False below:
# urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Usage: python3 main.py http://example.org <token> <stream_id> 2020-01-01
# (pass the server root; the script appends /api/... itself)
base_url = sys.argv[1]
token = sys.argv[2]
stream_id = sys.argv[3]
export_day_str = sys.argv[4]

export_daytime = datetime.strptime(export_day_str, '%Y-%m-%d')
export_day = date(export_daytime.year, export_daytime.month, export_daytime.day)

# Graylog API tokens are sent as the username with the literal password "token"
session = requests.Session()
session.verify = False  # accept self-signed certificates
session.auth = (token, "token")
session.headers.update({
    'X-Requested-By': '',
    'Accept': 'application/json'
})

# Elasticsearch refuses to page past index.max_result_window (10000 by default)
ES_MAX_SEARCH_WINDOW_SIZE = 10000
BATCH_SIZE = 1000

# Format a datetime as UTC ISO-8601 with millisecond precision (Graylog's format)
def datetime_to_string(dt):
    return dt.astimezone(timezone.utc).isoformat(timespec='milliseconds')


# Parse a Graylog timestamp (trailing "Z") into an aware UTC datetime
def datetime_from_string(datetime_string):
    datetime_string = datetime_string.replace("Z", "+00:00")
    return datetime.fromisoformat(datetime_string).astimezone(timezone.utc)

# Create a half-open interval [from, to)
def create_interval(from_datetime, to_datetime):
    return from_datetime, to_datetime


# Split an interval into two halves at its midpoint
def split_interval(iv):
    from_date = iv[0]
    to_date = iv[1]
    half = from_date + (to_date - from_date) / 2
    iv1 = create_interval(from_date, half)
    iv2 = create_interval(half, to_date)
    return iv1, iv2
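
# For illustration: splitting the one-day interval
# [2020-01-01T00:00:00Z, 2020-01-02T00:00:00Z) yields the two half-open
# twelve-hour intervals meeting at 2020-01-01T12:00:00Z.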

def exec_sync_search(iv, offset, limit):
    # Fixed IDs (as captured from a UI request); the response below is keyed
    # by query_id and search_type_id
    search_id = "6051fc776cd6f7d268411594"
    query_id = "233d1fd5-9e02-4e1a-9d46-d96275ddf9c8"
    search_type_id = "76d63310-2dc0-4c22-acac-101707c3542c"

    date_from = datetime_to_string(iv[0])
    # Turn the exclusive interval end into an inclusive 'to' timestamp
    date_to = datetime_to_string(iv[1] - timedelta(milliseconds=1))

    search_params = {
        "id": search_id,
        "parameters": [],
        "queries": [
            {
                "id": query_id,
                "query": {"type": "elasticsearch", "query_string": ""},
                "timerange": {"type": "absolute", "from": date_from, "to": date_to},
                "filter": {"type": "or", "filters": [{"type": "stream", "id": stream_id}]},
                "search_types": [
                    {
                        "timerange": None,
                        "query": None,
                        "streams": [],
                        "id": search_type_id,
                        "name": None,
                        "limit": limit,
                        "offset": offset,
                        "sort": [{"field": "timestamp", "order": "ASC"}],
                        "decorators": [],
                        "type": "messages",
                        "filter": None
                    }
                ]
            }
        ]
    }

    search_resp = session.post(url=base_url + "/api/views/search/sync", json=search_params)
    if search_resp.ok:
        data = search_resp.json()
        search_results = data['results'][query_id]['search_types'][search_type_id]
        total_results = search_results['total_results']
        messages = map(lambda msg: msg['message'], search_results['messages'])
        return total_results, messages
    else:
        # Signal failure to the caller
        return None, None
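
# For reference, the sync search response consumed above is shaped roughly
# like this (simplified; only the fields this script reads):
# {
#   "results": {
#     "<query_id>": {
#       "search_types": {
#         "<search_type_id>": {
#           "total_results": 12345,
#           "messages": [{"message": {...}}, ...]
#         }
#       }
#     }
#   }
# }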

def gen_entries(iv):
    # Request only the total count inside the wanted interval (limit 0).
    # While the extra request might be a bit excessive, it keeps this script simple.
    (total_count, _res) = exec_sync_search(iv, 0, 0)
    if total_count is None:
        raise RuntimeError("Search request failed for interval " +
                           datetime_to_string(iv[0]) + " - " + datetime_to_string(iv[1]))

    if total_count > ES_MAX_SEARCH_WINDOW_SIZE:
        # The count exceeds the maximum Elasticsearch search window size,
        # so we split the interval in half and recurse:
        # see https://github.com/Graylog2/graylog2-server/issues/3571
        (iv_first, iv_second) = split_interval(iv)
        yield from gen_entries(iv_first)
        yield from gen_entries(iv_second)
    else:
        # Otherwise, we simply page through the interval in batches
        offset = 0
        while offset < total_count:
            limit = min(BATCH_SIZE, total_count - offset)
            (_count, batch_result) = exec_sync_search(iv, offset, limit)
            offset += limit
            yield from batch_result

# Build the half-open [midnight, midnight + 1 day) UTC interval for the requested day
def create_date_interval(d):
    begin = datetime(d.year, d.month, d.day, tzinfo=timezone.utc)
    end = begin + timedelta(days=1)
    return create_interval(begin, end)

for entry in gen_entries(create_date_interval(export_day)):
    print(entry)
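
# A possible variant (untested sketch): instead of printing, write each message
# as one JSON object per line for later processing, e.g.:
#
#   import json
#   with open(export_day_str + ".jsonl", "w") as f:
#       for entry in gen_entries(create_date_interval(export_day)):
#           f.write(json.dumps(entry, default=str) + "\n")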