|
import base64
import json
import os
import sys
from contextlib import contextmanager
from datetime import datetime, timedelta

import requests
from dateutil.parser import parse as parse_datetime
from fire import Fire, core

_DEFAULT_INDEX_GLOB = "filebeat-*"
_DEFAULT_TIMESTAMP_FIELD = "@timestamp"


def main(
    start: str = None,
    end: str = None,
    size: int = 1000,
    timestamp_field: str = _DEFAULT_TIMESTAMP_FIELD,
    index_glob: str = _DEFAULT_INDEX_GLOB,
    tail: bool = False,
    **filters,
):
"""Fetch logs from Elasticsearch. |
|
|
|
Fetch logs from Elasticsearch, and print them in chronological order. |
|
|
|
Requirements: |
|
Python 3.8 or higher |
|
|
|
pip install python-dateutil==2.8.2 fire==0.4.0 requests==2.26.0 |
|
|
|
Set the following environment variables (a tool such as envchain is recommended): |
|
- ELASTICSEARCH_URL |
|
- ELASTICSEARCH_USERNAME |
|
- ELASTICSEARCH_PASSWORD |
|
|
|
Examples: |
|
Fetch from a specific time period: |
|
|
|
logs --start=2021-10-01T00:00:00 --end=2021-10-01T12:00:00 |
|
|
|
Specify the number of records to fetch (default: 1000): |
|
|
|
logs --size=2000 |
|
|
|
Filter on certain document fields: |
|
|
|
logs --environment=staging --kubernetes.container.name=myapp |
|
|
|
Tail the logs (caution: can be slow): |
|
|
|
logs --tail |
|
|
|
Pretty-print with jq: |
|
|
|
logs | jq |
|
|
|
Print only a particular field: |
|
|
|
logs | jq '.message' |
|
|
|
Parse a json-encoded field (useful structured logs which aren't stored as such): |
|
|
|
logs | jq 'jq -r '(.message=(.message | fromjson))'' |
|
|
|
""" |
|
    with handle_broken_pipe():
        for hit in iter_hits(
            start=start,
            end=end,
            size=size,
            tail=tail,
            index_glob=index_glob,
            timestamp_field=timestamp_field,
            **filters,
        ):
            print(json.dumps(hit, sort_keys=True))


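# Each line main() prints is one JSON-encoded document (NDJSON), which is what
# makes the jq pipelines in the docstring work. A made-up sample line:
#
#     {"@timestamp": "2021-10-01T00:00:03", "environment": "staging", "message": "..."}

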
def iter_hits(
    start: str = None,
    end: str = None,
    size: int = 1000,
    tail: bool = False,
    index_glob: str = _DEFAULT_INDEX_GLOB,
    timestamp_field: str = _DEFAULT_TIMESTAMP_FIELD,
    **filters,
):
    """Yield matching log documents in chronological order, paginating as needed."""
    seen: dict = {}

    def seen_add(value):
        # Like a maxlen deque with O(1) membership checks
        # (dicts preserve insertion order, so the first key is the oldest)
        if len(seen) > size * 2:
            oldest = next(iter(seen))  # O(1)
            del seen[oldest]  # O(1)
        seen[value] = value

    while True:
        result = fetch(
            body=build_query(start=start, end=end, size=size, timestamp_field=timestamp_field, **filters),
            index_glob=index_glob,
        )
        batch_seen: set = set()
        for hit in result["hits"]["hits"]:
            if hit["_id"] in seen:
                # When paginating, there may be some overlap
                continue
            yield hit["_source"]
            batch_seen.add(hit["_id"])
        if not tail or not batch_seen:
            # If we aren't tailing the logs, or there were no new results, stop fetching pages
            return
        for hit_id in batch_seen:
            seen_add(hit_id)
        # Hits are sorted ascending, so resume the next page from the newest timestamp
        start = result["hits"]["hits"][-1]["_source"][timestamp_field]


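# A minimal sketch of calling iter_hits from Python rather than through the CLI
# (the filter field and its value are hypothetical examples). Dotted field names
# have to be passed via dict unpacking, since they aren't valid keyword names:
#
#     for source in iter_hits(start="2021-10-01T00:00:00", size=500,
#                             **{"kubernetes.container.name": "myapp"}):
#         print(source.get("message"))

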
def fetch(body: dict, index_glob: str = _DEFAULT_INDEX_GLOB) -> dict:
    username = os.environ["ELASTICSEARCH_USERNAME"]
    password = os.environ["ELASTICSEARCH_PASSWORD"]
    elasticsearch_url = os.environ["ELASTICSEARCH_URL"]
    response = requests.get(
        elasticsearch_url.rstrip("/") + f"/{index_glob}/_search/",
        headers={"Authorization": f"Basic {base64.b64encode(f'{username}:{password}'.encode()).decode()}"},
        json=body,
    )
    assert response.status_code in range(200, 300), f"{response.status_code}: {response.text}"
    return response.json()


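# For reference, fetch() is roughly equivalent to the following curl request
# (credentials and URL come from the same environment variables; $BODY is the
# JSON query built by build_query below):
#
#     curl -u "$ELASTICSEARCH_USERNAME:$ELASTICSEARCH_PASSWORD" \
#         -H 'Content-Type: application/json' \
#         "$ELASTICSEARCH_URL/filebeat-*/_search/" -d "$BODY"

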
def build_query(
    start: str = None, end: str = None, size: int = 1000, timestamp_field: str = _DEFAULT_TIMESTAMP_FIELD, **filters
) -> dict:
    end_time = parse_datetime(end) if end else datetime.utcnow()
    start_time = parse_datetime(start) if start else end_time - timedelta(hours=1)
    return {
        "query": {
            "bool": {
                "must": [
                    {"range": {timestamp_field: {"gte": start_time.isoformat(), "lte": end_time.isoformat()}}},
                    *filter_queries(**filters),
                ]
            }
        },
        "sort": {timestamp_field: {"order": "asc"}},
        "size": size,
    }


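# For illustration, build_query(environment="staging") with the default one-hour
# window produces a body shaped like this (timestamps here are made up):
#
#     {
#         "query": {"bool": {"must": [
#             {"range": {"@timestamp": {"gte": "2021-10-01T11:00:00", "lte": "2021-10-01T12:00:00"}}},
#             {"term": {"environment": "staging"}},
#         ]}},
#         "sort": {"@timestamp": {"order": "asc"}},
#         "size": 1000,
#     }

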
def filter_queries(**filters) -> list[dict]:
    return [{"term": {field: value}} for field, value in filters.items()]


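# filter_queries does exact matching: e.g. filter_queries(environment="staging")
# returns [{"term": {"environment": "staging"}}]. Term queries are not analyzed,
# so the value must match the stored term exactly.

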
@contextmanager
def handle_broken_pipe():
    """We want this to be compatible with tools like `head`.

    https://docs.python.org/3/library/signal.html#note-on-sigpipe
    """
    try:
        yield
    except BrokenPipeError:
        # Python flushes standard streams on exit; redirect remaining output
        # to devnull to avoid another BrokenPipeError at shutdown
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        # Exit cleanly: the reader closing the pipe (e.g. `head`) is expected
        # here, not an error (the stdlib example exits with 1 on EPIPE instead)
        sys.exit(0)


# Override fire behaviour to not treat help as a filter
# (otherwise `logs --help` would be captured by **filters instead of showing help):
def extract_args(args):
    try:
        index = args.index("--help")
        args = args[:index]
        return args, ["--help"]
    except ValueError:
        return args, []


core.parser.SeparateFlagArgs = extract_args


if __name__ == "__main__":
    Fire(main)
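

# Example invocation, assuming this script is installed as `logs` on your PATH
# and the credentials live in an envchain namespace named "es" (both assumptions):
#
#     envchain es logs --tail --environment=staging | jq '.message'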