@jacksmith15
Last active December 21, 2021 10:51
Command-line log streaming from Elastic.

Logs

Stream logs from Elasticsearch to stdout.

Requirements

Python 3.8 or higher

pip install python-dateutil==2.8.2 fire==0.4.0 requests==2.26.0

Set the following environment variables (a tool such as envchain is recommended; see the example after this list):

  • ELASTICSEARCH_URL
  • ELASTICSEARCH_USERNAME
  • ELASTICSEARCH_PASSWORD
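
For example, with envchain you can store the credentials in the OS keychain once, then run the tool under that namespace (the `elastic` namespace name below is arbitrary):

envchain --set elastic ELASTICSEARCH_URL ELASTICSEARCH_USERNAME ELASTICSEARCH_PASSWORD

envchain elastic logs --tail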

Examples:

Fetch from a specific time period:

logs --start=2021-10-01T00:00:00 --end=2021-10-01T12:00:00

Specify the number of records to fetch (default: 1000):

logs --size=2000

Filter on certain document fields:

logs --environment=staging --kubernetes.container.name=myapp

Tail the logs (caution: this polls Elasticsearch repeatedly, so it can be slow):

logs --tail

Pretty-print with jq:

logs | jq

Print only a particular field:

logs | jq '.message'

Parse a json-encoded field (useful for structured logs which aren't stored as such):

logs | jq '.message = (.message | fromjson)'
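
The options compose; for example, to tail a single container's logs and print only the message field:

logs --tail --kubernetes.container.name=myapp | jq '.message'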
import base64
import json
import os
import sys
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import List

import requests
from dateutil.parser import parse as parse_datetime
from fire import Fire, core

_DEFAULT_INDEX_GLOB = "filebeat-*"
_DEFAULT_TIMESTAMP_FIELD = "@timestamp"

def main(
    start: str = None,
    end: str = None,
    size: int = 1000,
    timestamp_field: str = _DEFAULT_TIMESTAMP_FIELD,
    index_glob: str = _DEFAULT_INDEX_GLOB,
    tail: bool = False,
    **filters,
):
    """Fetch logs from Elasticsearch.

    Fetch logs from Elasticsearch, and print them in chronological order.

    Requirements:

        Python 3.8 or higher

        pip install python-dateutil==2.8.2 fire==0.4.0 requests==2.26.0

    Set the following environment variables (a tool such as envchain is recommended):

        - ELASTICSEARCH_URL
        - ELASTICSEARCH_USERNAME
        - ELASTICSEARCH_PASSWORD

    Examples:

        Fetch from a specific time period:

            logs --start=2021-10-01T00:00:00 --end=2021-10-01T12:00:00

        Specify the number of records to fetch (default: 1000):

            logs --size=2000

        Filter on certain document fields:

            logs --environment=staging --kubernetes.container.name=myapp

        Tail the logs (caution: this polls Elasticsearch repeatedly, so it can be slow):

            logs --tail

        Pretty-print with jq:

            logs | jq

        Print only a particular field:

            logs | jq '.message'

        Parse a json-encoded field (useful for structured logs which aren't stored as such):

            logs | jq '.message = (.message | fromjson)'
    """
    with handle_broken_pipe():
        for hit in iter_hits(
            start=start,
            end=end,
            size=size,
            tail=tail,
            index_glob=index_glob,
            timestamp_field=timestamp_field,
            **filters,
        ):
            print(json.dumps(hit, sort_keys=True))

def iter_hits(
    start: str = None,
    end: str = None,
    size: int = 1000,
    tail: bool = False,
    index_glob: str = _DEFAULT_INDEX_GLOB,
    timestamp_field: str = _DEFAULT_TIMESTAMP_FIELD,
    **filters,
):
    seen: dict = {}

    def seen_add(value):
        # Like a maxlen deque with O(1) membership checks: dicts preserve
        # insertion order, so the first key is always the oldest entry.
        if len(seen) > size * 2:
            oldest = next(iter(seen))  # O(1)
            del seen[oldest]  # O(1)
        seen[value] = value

    while True:
        result = fetch(
            body=build_query(start=start, end=end, size=size, timestamp_field=timestamp_field, **filters),
            index_glob=index_glob,
        )
        batch_seen: set = set()
        for hit in result["hits"]["hits"]:
            if hit["_id"] in seen:
                # When paginating, there may be some overlap
                continue
            yield hit["_source"]
            batch_seen.add(hit["_id"])
        if not tail or not batch_seen:
            # If we aren't tailing the logs, or there were no new results, stop fetching pages
            return
        for hit_id in batch_seen:
            seen_add(hit_id)
        # Results are sorted ascending, so resume the next poll from the newest hit
        start = hit["_source"][timestamp_field]

def fetch(body: dict, index_glob: str = _DEFAULT_INDEX_GLOB) -> dict:
    username = os.environ["ELASTICSEARCH_USERNAME"]
    password = os.environ["ELASTICSEARCH_PASSWORD"]
    elasticsearch_url = os.environ["ELASTICSEARCH_URL"]
    # Elasticsearch accepts a request body on GET _search
    response = requests.get(
        elasticsearch_url.rstrip("/") + f"/{index_glob}/_search/",
        headers={"Authorization": f"Basic {base64.b64encode(f'{username}:{password}'.encode()).decode()}"},
        json=body,
    )
    assert response.status_code in range(200, 300), f"{response.status_code}: {response.text}"
    return response.json()

def build_query(
    start: str = None, end: str = None, size: int = 1000, timestamp_field: str = _DEFAULT_TIMESTAMP_FIELD, **filters
) -> dict:
    end_time = parse_datetime(end) if end else datetime.utcnow()
    start_time = parse_datetime(start) if start else end_time - timedelta(hours=1)
    return {
        "query": {
            "bool": {
                "must": [
                    {"range": {timestamp_field: {"gte": start_time.isoformat(), "lte": end_time.isoformat()}}},
                    *filter_queries(**filters),
                ]
            }
        },
        "sort": {timestamp_field: {"order": "asc"}},
        "size": size,
    }

def filter_queries(**filters) -> List[dict]:
    # Each CLI filter (e.g. --environment=staging) becomes an exact-match term query
    return [{"term": {field: value}} for field, value in filters.items()]

@contextmanager
def handle_broken_pipe():
    """We want this to be compatible with tools like `head`.

    https://docs.python.org/3/library/signal.html#note-on-sigpipe
    """
    try:
        yield
    except BrokenPipeError:
        # Python flushes standard streams on exit; redirect remaining output
        # to devnull to avoid another BrokenPipeError at shutdown
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(0)  # Python exits with error code 1 on EPIPE

# Override fire behaviour to not treat --help as a filter:
def extract_args(args):
    try:
        index = args.index("--help")
        args = args[:index]
        return args, ["--help"]
    except ValueError:
        return args, []


core.parser.SeparateFlagArgs = extract_args

if __name__ == "__main__":
    Fire(main)
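
For reference, here is a minimal sketch of inspecting the query body the script sends, assuming the file above is saved as logs.py (a hypothetical filename) so that build_query is importable:

import json

from logs import build_query  # hypothetical module name for the script above

# Build the same body the CLI would send for:
#   logs --start=2021-10-01T00:00:00 --end=2021-10-01T12:00:00 --size=10 --kubernetes.container.name=myapp
body = build_query(
    start="2021-10-01T00:00:00",
    end="2021-10-01T12:00:00",
    size=10,
    **{"kubernetes.container.name": "myapp"},
)
print(json.dumps(body, indent=2))
# The result is a bool query with a range clause on @timestamp plus one
# term clause per filter, sorted ascending and capped at `size`.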