Skip to content

Instantly share code, notes, and snippets.

@svalo
Created January 27, 2022 17:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save svalo/0553e595c6ef4dfd46355524e2811b8b to your computer and use it in GitHub Desktop.
Save svalo/0553e595c6ef4dfd46355524e2811b8b to your computer and use it in GitHub Desktop.
Download OpenSearch reports by name from the CLI
#!/usr/bin/env python3
import requests
import json
import sys
from datetime import datetime
import base64
import os

# Connection settings for the reporting API.
#
# ES_URL, USER and PASSWORD can be overridden through the environment
# variables ES_URL, ES_USER and ES_PASSWORD, so real credentials do not have
# to be committed with the script. The literals below are only fallbacks.
# The variable for USER is ES_USER (not USER) because USER is normally
# already set by the login shell.

# Base URL that the endpoint paths below are appended to.
ES_URL = os.environ.get("ES_URL", "https://kibanaurl.example.com")
# Path that returns the list of existing reports.
REPORT_LIST_ENDPOINT = "/api/reporting/reports"
# Path prefix used to fetch one report; the report id is appended to it.
REPORT_GEN_ENDPOINT = "/api/reporting/generateReport"
# HTTP basic-auth credentials sent with every request.
USER = os.environ.get("ES_USER", "report_download")
PASSWORD = os.environ.get("ES_PASSWORD", "surelylongandcomplex")
def get_report_list():
    """Fetch the list of reports from the reporting endpoint.

    Sends an authenticated GET to ``ES_URL + REPORT_LIST_ENDPOINT`` and
    returns whatever is stored under the ``"data"`` key of the JSON response.

    Returns:
        The value of the response's ``"data"`` key, or an empty list when the
        request fails, the server answers with an HTTP error status, the body
        is not valid JSON, or the ``"data"`` key is missing. Returning an
        empty list (instead of implicitly returning ``None``) keeps callers
        that iterate or call ``len()`` on the result from crashing.
    """
    try:
        response = requests.get(
            ES_URL + REPORT_LIST_ENDPOINT,
            auth=(USER, PASSWORD),
            # NOTE(review): TLS certificate verification is disabled, so the
            # credentials can be intercepted by a man-in-the-middle. Point
            # `verify` at a CA bundle if the server certificate can be trusted.
            verify=False,
            # Without a timeout, requests would wait forever on a stalled server.
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["data"]
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # ValueError: body is not JSON; KeyError/TypeError: unexpected shape.
        print('Errors retrieving reports: {}'.format(e))
        return []
def get_last_report(report_name=None, report_list=None):
    """Return a summary of the newest report whose name equals *report_name*.

    "Newest" means the greatest ``time_created`` value among the matching
    entries; when several entries share that value the first one wins.

    Args:
        report_name: Name to match against each entry's
            ``_source.report_definition.report_params.report_name``.
        report_list: Optional list of report entries to search. When omitted
            (``None``) the list is fetched with ``get_report_list()``.

    Returns:
        A dict with the keys ``id``, ``name``, ``time_from``, ``time_to`` and
        ``time_created`` describing the selected report, or an empty dict
        when no entry matches.

    Raises:
        KeyError: If a report entry lacks one of the fields read here.
    """
    if report_list is None:
        report_list = get_report_list()

    last_report = {}
    # `or []` tolerates a fetch helper that returns None on failure.
    for report in report_list or []:
        source = report["_source"]
        definition = source["report_definition"]
        if definition["report_params"]["report_name"] != report_name:
            continue

        current_report = {
            "id": report["_id"],
            "name": definition["report_params"]["report_name"],
            "time_from": source["time_from"],
            "time_to": source["time_to"],
            "time_created": definition["time_created"],
        }
        # Replace the candidate only when there is none yet or this one is
        # strictly newer. The previous if/else assigned in both branches, so
        # the last match in list order always won regardless of creation time.
        if not last_report or current_report["time_created"] > last_report["time_created"]:
            last_report = current_report
    return last_report
def download_report(report_id=None, timezone="Europe/Rome", timeout=300):
    """Download one report and write it to the current working directory.

    Requests ``ES_URL + REPORT_GEN_ENDPOINT + "/" + report_id`` and expects a
    JSON object carrying a base64-encoded ``"data"`` field and a
    ``"filename"`` field. The decoded bytes are written to a file with that
    name. If the response has no ``"data"`` key it is printed instead, as it
    then presumably carries the server's error message.

    Args:
        report_id: Identifier of the report to download.
        timezone: Value sent as the ``timezone`` query parameter.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        The name of the file written, or ``None`` when nothing was written.
    """
    import os  # local import so this function does not depend on file-level imports

    if not report_id:
        print('Error downloading export: no report id given')
        return None

    url = "{}{}/{}?timezone={}".format(ES_URL, REPORT_GEN_ENDPOINT, report_id, timezone)
    try:
        # The context manager closes the session's pooled connections.
        with requests.Session() as s:
            # Hit the base URL first within the same session; presumably this
            # obtains a session cookie needed by the report request -- TODO confirm.
            # NOTE(review): verify=False disables TLS certificate checks.
            s.get(ES_URL, auth=(USER, PASSWORD), verify=False, timeout=timeout)
            report_data = s.get(
                url,
                auth=(USER, PASSWORD),
                verify=False,
                timeout=timeout,
            ).json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        print('Error downloading export: {}'.format(e))
        return None

    if not isinstance(report_data, dict) or "data" not in report_data:
        print(report_data)
        return None

    try:
        # Keep only the final path component so a server-supplied name cannot
        # make us write outside the current directory.
        filename = os.path.basename(report_data["filename"])
        # Decode before opening the file so a bad payload leaves no empty file.
        content = base64.b64decode(report_data["data"])
        with open(filename, 'wb') as report_pdf:
            report_pdf.write(content)
    except (KeyError, TypeError, ValueError, OSError) as e:
        print('Error downloading export: {}'.format(e))
        return None
    return filename
if __name__ == "__main__":
    # Command-line entry point: expects exactly one argument, the report name,
    # and downloads the most recently created report with that name.
    # Exit status: 0 when a report was found and a download was attempted,
    # 1 when no report matched, 2 on a usage error.
    if len(sys.argv) < 2:
        print("Not enough args: pass report name as argument")
        sys.exit(2)
    if len(sys.argv) > 2:
        print("Too many args: pass a single report name (quote it if it contains spaces)")
        sys.exit(2)

    report_name = sys.argv[1]
    print("Download of latest report named \"{}\" requested".format(report_name))
    report = get_last_report(report_name=report_name)
    # An empty dict means no report with that name was found.
    if "id" not in report:
        print("No report found")
        sys.exit(1)
    download_report(report_id=report["id"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment