Last active
June 8, 2024 17:57
-
-
Save kuzaxak/ce6178c185073f69f9836c30e0ea08d9 to your computer and use it in GitHub Desktop.
Proxy app that makes the Quickwit API compatible with OpenSearch Dashboards
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
import os
import sys

import requests
import sentry_sdk
from flask import Flask, jsonify, request, make_response
# NOTE(review): called with no arguments, so the Sentry DSN presumably comes
# from the environment (SENTRY_DSN) - confirm for each deployment.
sentry_sdk.init()

app = Flask(__name__)

# Base URL of the Quickwit searcher that every route forwards to. It can be
# overridden with the QUICKWIT_URL environment variable. A trailing slash is
# stripped because the routes append paths that already start with "/".
TARGET_URL = os.environ.get("QUICKWIT_URL", "http://quickwit-searcher:7280").rstrip("/")
class HealthzFilter(logging.Filter):
    """Logging filter that suppresses any record whose message mentions ``/healthz``."""

    def filter(self, record):
        # A falsy return drops the record; a truthy one lets it through.
        rendered = record.getMessage()
        return rendered.find('/healthz') < 0
class FieldCapsProxyException(Exception):
    """Error raised by the ``_field_caps`` proxy when Quickwit reports a failure.

    Attributes:
        status_code: HTTP status to report to the client.
        response_text: Raw upstream response body (or error description).
    """

    def __init__(self, status_code, response_text):
        super().__init__("Proxy Error: {}, {}".format(status_code, response_text))
        self.status_code = status_code
        self.response_text = response_text
class SearchProxyException(Exception):
    """Error raised by the ``_search`` proxy when Quickwit reports a failure.

    Attributes:
        status_code: HTTP status to report to the client.
        response_text: Raw upstream response body (or error description).
        data: The (filtered) request body that was sent upstream.
    """

    def __init__(self, status_code, response_text, data):
        super().__init__("Proxy Error: {}, {}, {}".format(status_code, response_text, data))
        self.status_code = status_code
        self.response_text = response_text
        self.data = data
@app.route('/healthz', methods=['GET'])
def healthz():
    """Health-check endpoint: always answers ``'OK'`` with HTTP 200."""
    status = 200
    return 'OK', status
@app.route('/', methods=['GET'])
def home():
    """Root endpoint: returns a fixed banner naming this service, with HTTP 200."""
    banner = 'QuickWit Proxy'
    return banner, 200
# Not yet merged. PR: https://github.com/quickwit-oss/quickwit/pull/5096
# @app.route('/_resolve/index/<index_pattern>')
# def resolve_index(index_pattern):
#     # TODO: Convert `*:*` into `*`
#     target_url = f'{TARGET_URL}/api/v1/indexes?index_id_patterns={index_pattern}'
#     response = requests.get(target_url)
#     data = response.json()
#
#     if response.status_code >= 400:
#         return make_response(jsonify(response.json()), response.status_code)
#
#     indices = []
#     for item in data:
#         index_id = item['index_config']['index_id']
#         indices.append({
#             'name': index_id,
#             'attributes': ['open']
#         })
#
#     result = {
#         'indices': indices
#     }
#
#     return make_response(jsonify(result), response.status_code)
@app.route('/_resolve/index/<index_pattern>')
def resolve_index(index_pattern):
    """Proxy ``_resolve/index`` to Quickwit's Elasticsearch-compatible endpoint.

    Returns Quickwit's JSON body with Quickwit's status code. If the upstream
    body is not valid JSON (for example a plain-text or HTML error page), it is
    passed through verbatim with the same status instead of failing with a
    JSON decode error.
    """
    # TODO: Convert `*:*` into `*`
    target_url = f"{TARGET_URL}/api/v1/_elastic/_resolve/index/{index_pattern}"
    response = requests.get(target_url)
    try:
        payload = response.json()
    except ValueError:
        # requests' JSON decode errors subclass ValueError; keep the upstream
        # status so the client still sees the real failure code.
        return make_response(response.text, response.status_code)
    return make_response(jsonify(payload), response.status_code)
# Parameters a `range` query may keep per field; anything else (e.g. `format`) is dropped.
_ALLOWED_RANGE_KEYS = frozenset({"gt", "gte", "lt", "lte", "boost"})
# Keys a `multi_match` clause keeps after normalisation.
_ALLOWED_MULTI_MATCH_KEYS = frozenset({"query", "type", "fields"})
# `multi_match` types passed through unchanged; every other type becomes `most_fields`.
_PASSTHROUGH_MULTI_MATCH_TYPES = ("phrase", "phrase_prefix")
# Keys removed wherever they appear in the request tree.
_REMOVED_KEYS = frozenset({"minimum_should_match"})


def filter_range_fields(obj):
    """Return a copy of a search-request fragment rewritten for Quickwit.

    Recursively walks nested dicts and lists and applies these rewrites:

    * ``range``: every per-field parameter dict keeps only
      ``gt``/``gte``/``lt``/``lte``/``boost``. Entries whose value is not a
      dict (e.g. the ``field``/``ranges`` keys of a range aggregation) are
      left untouched instead of crashing.
    * ``multi_match``: any type other than ``phrase``/``phrase_prefix``
      (including a missing type) becomes ``most_fields``. ``fields`` defaults
      to ``["log"]``, and only ``query``/``type``/``fields`` are kept.
    * ``date_histogram``: ``calendar_interval`` is renamed to
      ``fixed_interval``.
    * ``minimum_should_match`` keys are dropped at every level.

    The input is never modified. Values that are neither dicts nor lists are
    returned as-is.
    """
    if isinstance(obj, list):
        return [filter_range_fields(item) for item in obj]
    if not isinstance(obj, dict):
        return obj

    node = dict(obj)  # shallow copy: rewrites below must not touch the caller's data

    ranges = node.get("range")
    if isinstance(ranges, dict):
        node["range"] = {
            field: (
                {k: v for k, v in params.items() if k in _ALLOWED_RANGE_KEYS}
                if isinstance(params, dict)
                else params
            )
            for field, params in ranges.items()
        }

    multi_match = node.get("multi_match")
    if isinstance(multi_match, dict):
        multi_match = dict(multi_match)
        # .get(): a clause without an explicit type is normalised too, not a KeyError.
        if multi_match.get("type") not in _PASSTHROUGH_MULTI_MATCH_TYPES:
            multi_match["type"] = "most_fields"
        if "fields" not in multi_match:
            multi_match["fields"] = ["log"]  # TODO: Replace with default fields
        node["multi_match"] = {
            k: v for k, v in multi_match.items() if k in _ALLOWED_MULTI_MATCH_KEYS
        }

    # In some cases, OpenSearch Dashboards requests `calendar_interval`
    # instead of `fixed_interval` if the interval is <= 1m.
    histogram = node.get("date_histogram")
    if isinstance(histogram, dict) and "calendar_interval" in histogram:
        histogram = dict(histogram)
        histogram["fixed_interval"] = histogram.pop("calendar_interval")
        node["date_histogram"] = histogram

    return {k: filter_range_fields(v) for k, v in node.items() if k not in _REMOVED_KEYS}
@app.route('/<index_id>/_search', methods=['POST'])
def search(index_id):
    """Forward an OpenSearch ``_search`` request for ``index_id`` to Quickwit.

    The raw incoming JSON body is printed to stdout. Only the top-level keys
    listed in ``allowed_fields`` are forwarded, each rewritten by
    ``filter_range_fields``.

    Returns Quickwit's JSON body and status on success. Two failure cases are
    printed to stderr, sent to Sentry, and returned as a JSON error body whose
    ``data`` field carries the filtered request:

    * Quickwit answers with a status >= 400; that status is returned.
    * Quickwit cannot be reached at all; reported as 502.
    """
    target_url = f"{TARGET_URL}/api/v1/_elastic/{index_id}/_search"
    allowed_fields = ["from", "size", "query", "sort", "aggs",
                      "track_total_hits", "stored_fields", "search_after"]
    try:
        data = request.json
        print(json.dumps(data))
        data = {k: filter_range_fields(v)
                for k, v in data.items() if k in allowed_fields}
        try:
            response = requests.post(target_url, json=data)
        except requests.RequestException as exc:
            # Transport-level failure (connection refused, DNS, reset, ...):
            # surface it through the same JSON error shape as upstream errors
            # instead of letting it escape as a generic 500.
            raise SearchProxyException(502, str(exc), data) from exc
        if response.status_code >= 400:
            raise SearchProxyException(response.status_code, response.text, data)
        return make_response(jsonify(response.json()), response.status_code)
    except SearchProxyException as e:
        print(e, file=sys.stderr)
        sentry_sdk.capture_exception(e)
        return make_response(jsonify({
            "error": "SearchProxyException",
            "status_code": e.status_code,
            "response_text": e.response_text,
            "data": e.data
        }), e.status_code)
@app.route("/<index_pattern>/_field_caps", methods=["GET", "POST"])
def field_caps(index_pattern):
    """Forward a ``_field_caps`` request for ``index_pattern`` to Quickwit.

    Whatever the incoming method, Quickwit is queried with a bare POST. The
    incoming query string and body are not forwarded.

    Returns Quickwit's JSON body and status on success. Two failure cases are
    printed to stderr, sent to Sentry, and returned as a JSON error body:

    * Quickwit answers with a status >= 400; that status is returned.
    * Quickwit cannot be reached at all; reported as 502.
    """
    target_url = f"{TARGET_URL}/api/v1/_elastic/{index_pattern}/_field_caps"
    try:
        try:
            response = requests.post(target_url)
        except requests.RequestException as exc:
            # Transport-level failure: report it via the handler's JSON error
            # shape instead of letting it escape as a generic 500.
            raise FieldCapsProxyException(502, str(exc)) from exc
        if response.status_code >= 400:
            raise FieldCapsProxyException(response.status_code, response.text)
        return make_response(jsonify(response.json()), response.status_code)
    except FieldCapsProxyException as e:
        print(e, file=sys.stderr)
        sentry_sdk.capture_exception(e)
        return make_response(jsonify({
            "error": "FieldCapsProxyException",
            "status_code": e.status_code,
            "response_text": e.response_text,
        }), e.status_code)
@app.before_request
def before_request():
    """Pre-request hook registered with Flask; currently a no-op.

    Returning ``None`` lets Flask continue to the matched view.
    """
    # NOTE(review): the original special-cased '/healthz' but returned None on
    # every path, so the hook has no effect; it looks like a leftover placeholder.
    return None
if __name__ == '__main__':
    # Keep werkzeug's request log free of /healthz probe noise.
    logging.getLogger('werkzeug').addFilter(HealthzFilter())
    # Bind to all interfaces so the proxy is reachable from other hosts/containers.
    app.run(host='0.0.0.0')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment