Skip to content

Instantly share code, notes, and snippets.

@fabiand
Last active July 14, 2022 11:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fabiand/6ab55588487a2ce00320cc560b74d136 to your computer and use it in GitHub Desktop.
Save fabiand/6ab55588487a2ce00320cc560b74d136 to your computer and use it in GitHub Desktop.
# Index pod/container logs into a Loki instance running locally.
# Loki is available from:
# https://github.com/grafana/loki/tree/v2.6.0
# run:
# $ pwd
# <somewhere>/ocp-must-gather.local.614777<...>openshift-origin-must-gather-sha256-<...>77c9c15e76eca
# $ python3 mgloki.py
# namespaces/openshift-apiserver-operator/pods/openshift-apiserver-operator-76bd66d74b-v22tc/openshift-apiserver-operator-55bdd9d74b-v63tc.yaml
# …
#
# query:
# $ ./logcli-linux-amd64 query 'rate({host_name =~ ".+"}[5m] |= "client-side")'
import yaml
from pprint import pprint
from glob import glob
import os.path
import json
import re
import requests
import datetime
import timestring
import time
def yffile(name):
    """Parse the YAML file at path ``name`` and return the loaded object.

    The file is opened in text mode and handed to ``yaml.safe_load``.
    """
    with open(name, "r") as stream:
        document = yaml.safe_load(stream)
    return document
class Container():
    """A single container of a pod together with the path of its log file.

    Instances are created empty; the creator (``Pod.from_file`` in this file)
    assigns ``yaml`` and ``logsfn`` afterwards.
    """

    # Container name; not assigned anywhere in the visible code.
    name = None
    # Container entry taken from the pod spec; assigned by the creator.
    yaml = None
    # Path of the container's log file; assigned by the creator.
    logsfn = None

    def logs(self):
        """Yield the lines of the log file at ``self.logsfn`` one at a time.

        Lines are yielded exactly as read, including the trailing newline.
        """
        with open(self.logsfn, "r") as logstream:
            # Iterate the file object directly so large logs are streamed
            # instead of being loaded into memory at once by ``readlines()``.
            yield from logstream
class Pod():
    """A pod loaded from a pod YAML file, together with its containers.

    ``yaml`` holds the parsed pod manifest and ``containers`` the
    ``Container`` objects built from ``spec.containers``.
    """

    # Parsed pod manifest; set by ``from_file``.
    yaml = None
    # Separator used to split a log line into whitespace-delimited fields.
    _field_sep = re.compile(r"\s+")

    def __init__(self):
        # Must be a per-instance list: a class-level ``containers = []`` is
        # shared by every Pod, so each pod would also replay the containers
        # (and logs) of all pods that were loaded before it.
        self.containers = []

    @staticmethod
    def from_file(fn):
        """Build a ``Pod`` from the pod YAML file ``fn``.

        For every entry in ``spec.containers`` a ``Container`` is created
        whose log file is expected at
        ``<dir of fn>/<container>/<container>/logs/current.log``.
        The file name is printed as a progress indication.

        Raises:
            AssertionError: if a container's log file does not exist.
        """
        p = Pod()
        p.yaml = yffile(fn)
        print(fn)
        pod_dir = os.path.dirname(fn)
        for cy in p.yaml["spec"]["containers"]:
            c = Container()
            c.yaml = cy
            cname = cy["name"]
            c.logsfn = os.path.join(pod_dir, cname, cname, "logs", "current.log")
            assert os.path.exists(c.logsfn), f"missing container log: {c.logsfn}"
            p.containers.append(c)
        return p

    def structured_logs(self):
        """Yield one dict per log line of every container of this pod.

        Each dict carries the first whitespace-separated field of the line
        as ``@timestamp``, the remaining fields joined by single spaces as
        ``message``, plus pod/host/container metadata from the manifest.

        Raises:
            AssertionError: if the pod has no containers.
        """
        assert len(self.containers) > 0, "pod has no containers"
        for c in self.containers:
            for line in c.logs():
                # NOTE: the line format may differ per container; this split
                # assumes "<timestamp> <message...>". A trailing newline
                # produces an empty last field, hence a trailing space in
                # the joined message.
                fields = self._field_sep.split(line)
                # Originally for ES, then in LokiIngester mangled to match loki
                yield {"@timestamp": fields[0],
                       "message": " ".join(fields[1:]),
                       "host.name": self.yaml["spec"]["nodeName"],
                       "host.ip": self.yaml["status"]["hostIP"],
                       "pod.uid": self.yaml["metadata"]["uid"],
                       "pod.name": self.yaml["metadata"]["name"],
                       "pod.namespace": self.yaml["metadata"]["namespace"],
                       "container.name": c.yaml["name"],
                       # "pod.ownerReferences": self.yaml["metadata"]["ownerReferences"],
                       }
class Pods():
    """All pods whose YAML files match the ``glob`` pattern.

    The pattern is relative, so it is resolved against the current
    working directory.
    """

    glob = "namespaces/*/pods/*/*.yaml"

    def structured_logs(self):
        """Yield the structured log records of every matching pod in turn."""
        for manifest_path in glob(self.glob):
            yield from Pod.from_file(manifest_path).structured_logs()
class LokiIngester():
    """Push structured log records to a Loki push endpoint over HTTP.

    Args:
        url: push endpoint to post to; defaults to the local Loki at
            ``http://localhost:3100/loki/api/v1/push``.
    """

    # Default push endpoint, used unless another URL is given to __init__.
    url = "http://localhost:3100/loki/api/v1/push"

    def __init__(self, url=None):
        if url is not None:
            self.url = url
        # One session for all requests so the connection can be reused.
        self.session = requests.Session()

    def post(self, json):
        """POST the payload ``json`` to the push endpoint and return the response.

        Raises:
            AssertionError: if the response status is not 204.
        """
        resp = self.session.post(self.url, json=json)
        assert resp.status_code == 204, (
            f"unexpected status {resp.status_code} from Loki: {resp.text}")
        return resp

    def ingestOne(self, line):
        """Send one structured log record as a single-entry stream.

        Every key of ``line`` except ``message`` and ``@timestamp`` becomes
        a stream label, with ``.`` replaced by ``_`` in the label name.
        ``line["message"]`` is the log text.

        Raises:
            KeyError: if ``line`` has no ``message`` key.
        """
        labels = {k.replace(".", "_"): v
                  for k, v in line.items()
                  if k not in ("message", "@timestamp")}
        msg = line["message"]
        # FIXME this must be parsed from the logfile
        # How do we circumvent that loki just accepts "recent" logs?
        # Until then the time of ingestion is used, not line["@timestamp"].
        ts = time.time_ns()
        streams = {"streams": [{
            "stream": labels,
            "values": [[str(ts), msg]]
        }]}
        self.post(json=streams)

    def ingest(self, structuredLogs):
        """Send every record of the iterable ``structuredLogs``, one by one."""
        for line in structuredLogs:
            # Pass the record itself (previously ``self`` was passed by mistake).
            self.ingestOne(line)
# Script body: push every structured log record found by Pods to Loki.
loki = LokiIngester()
# Manual check of the push endpoint with a hand-written stream:
#loki.post({"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ str(int(time.time() * 1e9)), "fizzbuzz" ]]}]})
for line in Pods().structured_logs():
    # One HTTP request per log record.
    loki.ingestOne(line)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment