Skip to content

Instantly share code, notes, and snippets.

@kuzaxak
Created April 4, 2024 19:09
Show Gist options
  • Save kuzaxak/a67ec1507714150c8d0245d16c7e675f to your computer and use it in GitHub Desktop.
Save kuzaxak/a67ec1507714150c8d0245d16c7e675f to your computer and use it in GitHub Desktop.
Proxy that makes OpenSearch Dashboards compatible with Quickwit
import json
import os

import requests
from flask import Flask, jsonify, request
app = Flask(__name__)

# Base URL of the Quickwit REST API that every route forwards to. It can be
# overridden with the QUICKWIT_URL environment variable; the trailing slash is
# stripped so the paths appended to it never start with "//".
TARGET_URL = os.environ.get("QUICKWIT_URL", "http://quickwit-control-plane:7280").rstrip("/")
@app.route('/_resolve/index/<index_pattern>')
def resolve_index(index_pattern):
    """Answer ``_resolve/index`` requests from Quickwit's index listing.

    Args:
        index_pattern: Index id pattern taken from the request path. It is
            forwarded as the ``index_id_patterns`` query parameter of the
            upstream ``/api/v1/indexes`` call.

    Returns:
        A JSON response shaped like
        ``{"indices": [{"name": <index_id>, "attributes": ["open"]}, ...]}``.
        If the upstream call returns an error status, its JSON body and
        status code are relayed instead.
    """
    response = requests.get(
        f'{TARGET_URL}/api/v1/indexes',
        # Let requests encode the pattern so characters such as "&" or "#"
        # cannot change the shape of the upstream query string.
        params={'index_id_patterns': index_pattern},
        # Do not block a worker forever on an unresponsive upstream.
        timeout=30,
    )
    data = response.json()
    if not response.ok:
        # An error body is not a list of indexes; relay it rather than
        # failing while trying to read index metadata out of it.
        return jsonify(data), response.status_code

    indices = [
        {'name': item['index_config']['index_id'], 'attributes': ['open']}
        for item in data
    ]
    return jsonify({'indices': indices})
def filter_range_fields(obj):
    """Recursively strip query-DSL options that the proxy does not forward.

    The structure is walked depth-first and a cleaned copy is returned; the
    argument itself is left untouched.

    * Keys named ``minimum_should_match`` are dropped from every dict.
    * Under a ``range`` key, each per-field dict keeps only ``gt``, ``gte``,
      ``lt``, ``lte`` and ``boost``. Entries whose value is not a dict (for
      example the ``field`` / ``ranges`` entries of a ``range`` aggregation)
      are passed through instead of raising.
    * Under a ``multi_match`` key, only ``query``, ``type`` and ``fields``
      are kept; ``type`` becomes ``"most_fields"`` unless it is ``"phrase"``
      or ``"phrase_prefix"`` (a missing ``type`` is treated the same way),
      and ``fields`` defaults to ``["log"]``.

    Args:
        obj: Any JSON-like value (dict, list or scalar).

    Returns:
        The cleaned copy of ``obj``; scalars are returned as-is.
    """
    allowed_range_fields = ("gt", "gte", "lt", "lte", "boost")
    allowed_multi_match_fields = ("query", "type", "fields")
    remove_list = ("minimum_should_match",)

    if isinstance(obj, list):
        return [filter_range_fields(item) for item in obj]
    if not isinstance(obj, dict):
        return obj

    cleaned = {}
    for key, value in obj.items():
        if key in remove_list:
            continue
        if key == "range" and isinstance(value, dict):
            value = {
                field_name: (
                    {k: v for k, v in bounds.items() if k in allowed_range_fields}
                    if isinstance(bounds, dict)
                    else bounds
                )
                for field_name, bounds in value.items()
            }
        elif key == "multi_match" and isinstance(value, dict):
            multi_match = {k: v for k, v in value.items() if k in allowed_multi_match_fields}
            # .get() so that a multi_match without an explicit type is
            # rewritten too instead of raising KeyError.
            if multi_match.get("type") not in ("phrase", "phrase_prefix"):
                multi_match["type"] = "most_fields"
            multi_match.setdefault("fields", ["log"])  # TODO: Replace with default fields
            value = multi_match
        # Recurse after the rewrite so nested clauses are cleaned as well.
        cleaned[key] = filter_range_fields(value)
    return cleaned
@app.route('/<index_id>/_search', methods=['POST'])
def search(index_id):
    """Forward a search request to the upstream ``_elastic`` search endpoint.

    Only the top-level keys listed in ``allowed_fields`` are forwarded, and
    each forwarded value is passed through ``filter_range_fields`` first.

    Args:
        index_id: Index id taken from the request path and inserted into the
            upstream ``/api/v1/_elastic/<index_id>/_search`` URL.

    Returns:
        The upstream JSON body together with the upstream status code, or a
        400 response when the request body is not a JSON object.
    """
    target_url = f"{TARGET_URL}/api/v1/_elastic/{index_id}/_search"
    allowed_fields = ["from", "size", "query", "sort", "aggs", "track_total_hits", "stored_fields", "search_after"]

    data = request.json
    if not isinstance(data, dict):
        # A JSON array or scalar has no .items(); reject it explicitly
        # instead of failing with an internal error.
        return jsonify({"error": "search body must be a JSON object"}), 400

    # Debug trace of the incoming body, printed before any filtering.
    print(json.dumps(data))
    data = {k: filter_range_fields(v) for k, v in data.items() if k in allowed_fields}

    # Bounded wait so an unresponsive upstream cannot hang the worker.
    response = requests.post(target_url, json=data, timeout=60)
    # Relay the upstream status so errors are not reported to the client as 200.
    return jsonify(response.json()), response.status_code
def transform_field(field, timestamp_field, index_id):
    """Convert one field mapping into a field-capabilities entry.

    Args:
        field: Field mapping dict. ``name`` and ``type`` are required;
            ``tokenizer`` and ``indexed`` are read when present.
        timestamp_field: Name of the index's timestamp field, or ``None``.
        index_id: Id of the index the field belongs to.

    Returns:
        A ``(capabilities, field_type, field_name)`` tuple. ``field_type`` is
        the mapping's type, except that ``datetime`` is reported as ``date``
        and ``text`` with the ``lowercase`` tokenizer as ``keyword``.
        ``capabilities`` holds ``metadata_field``, ``searchable``,
        ``aggregatable`` and ``indices`` (a one-element list with
        ``index_id``).

    Raises:
        KeyError: If ``field`` has no ``name`` or ``type``.
    """
    field_name = field["name"]
    field_type = field["type"]

    if field_type == "datetime":
        field_type = "date"
    elif field_type == "text" and field.get("tokenizer") == "lowercase":
        field_type = "keyword"

    capabilities = {
        # Fields whose name starts with "kubernetes" are flagged as metadata.
        "metadata_field": field_name.startswith("kubernetes"),
        "searchable": field.get("indexed", False),
        # Only the timestamp field is reported as aggregatable.
        "aggregatable": timestamp_field == field_name,
        "indices": [index_id],
    }
    return capabilities, field_type, field_name
def _collect_field(field_mappings, field, prefix, timestamp_field, index_id):
    """Register ``field`` under ``prefix`` and recurse into object sub-fields."""
    # NOTE(review): transform_field only sees the leaf name, so for nested
    # fields its timestamp / "kubernetes" checks compare against the leaf
    # name rather than the dotted path - confirm this is intended.
    field_map, field_type, field_name = transform_field(field, timestamp_field, index_id)
    full_name = f"{prefix}{field_name}"

    by_type = field_mappings.setdefault(full_name, {})
    if field_type in by_type:
        # Same name and type already seen: just record the extra index.
        by_type[field_type]["indices"].append(index_id)
    else:
        by_type[field_type] = field_map

    # Only a top-level field is marked as the time-series dimension.
    if not prefix and timestamp_field == field_name:
        by_type[field_type]["time_series_dimension"] = True

    if field_type == "object":
        # Descend to any depth; an object without sub-mappings adds nothing.
        for sub_field in field.get("field_mappings", []):
            _collect_field(field_mappings, sub_field, f"{full_name}.", timestamp_field, index_id)


def transform_index_to_fields(data):
    """Build the ``fields`` section of a field-capabilities response.

    Args:
        data: Iterable of index metadata dicts, each providing
            ``index_config.index_id`` and ``index_config.doc_mapping`` (with
            ``field_mappings`` and an optional ``timestamp_field``).

    Returns:
        A dict mapping each field name to ``{field_type: capabilities}``.
        Sub-fields of ``object`` fields are included at every nesting depth
        under dotted names such as ``parent.child``. When several indexes
        share a field name and type, their ids accumulate in that entry's
        ``indices`` list.
    """
    field_mappings = {}
    for item in data:
        index_config = item["index_config"]
        doc_mapping = index_config["doc_mapping"]
        timestamp_field = doc_mapping.get("timestamp_field")
        for field in doc_mapping["field_mappings"]:
            _collect_field(field_mappings, field, "", timestamp_field, index_config["index_id"])
    return field_mappings
@app.route("/<index_pattern>/_field_caps", methods=["GET", "POST"])
def field_caps(index_pattern):
    """Answer ``_field_caps`` requests from the upstream index metadata.

    Args:
        index_pattern: Index id pattern taken from the request path. It is
            forwarded as the ``index_id_patterns`` query parameter of the
            upstream ``/api/v1/indexes`` call.

    Returns:
        A JSON response with ``indices`` (the matching index ids) and
        ``fields`` (the output of ``transform_index_to_fields``). If the
        upstream call returns an error status, its JSON body and status code
        are relayed instead.
    """
    response = requests.get(
        f"{TARGET_URL}/api/v1/indexes",
        # Let requests encode the pattern so characters such as "&" or "#"
        # cannot change the shape of the upstream query string.
        params={"index_id_patterns": index_pattern},
        # Do not block a worker forever on an unresponsive upstream.
        timeout=30,
    )
    data = response.json()
    if not response.ok:
        # An error body is not a list of indexes; relay it rather than
        # failing while trying to read index metadata out of it.
        return jsonify(data), response.status_code

    indices = [item["index_config"]["index_id"] for item in data]
    return jsonify({"indices": indices, "fields": transform_index_to_fields(data)})
if __name__ == "__main__":
    # Executed directly as a script: start the Flask application's built-in
    # server, bound to every network interface.
    app.run(host="0.0.0.0")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment