Skip to content

Instantly share code, notes, and snippets.

@kuzaxak
Last active June 8, 2024 17:57
Show Gist options
  • Save kuzaxak/ce6178c185073f69f9836c30e0ea08d9 to your computer and use it in GitHub Desktop.
Save kuzaxak/ce6178c185073f69f9836c30e0ea08d9 to your computer and use it in GitHub Desktop.
Proxy app that makes the Quickwit API compatible with OpenSearch Dashboards
import json
import logging
import os
import sys

import requests
import sentry_sdk
from flask import Flask, jsonify, request, make_response
# Sentry is initialised with no explicit options.
sentry_sdk.init()
app = Flask(__name__)
# Base URL of the Quickwit searcher that every route proxies to. The default
# keeps the original `quickwit-searcher` host; set QUICKWIT_URL to point the
# proxy elsewhere. A trailing slash is stripped because routes append paths
# that already start with "/".
TARGET_URL = os.environ.get("QUICKWIT_URL", "http://quickwit-searcher:7280").rstrip("/")
class HealthzFilter(logging.Filter):
    """Logging filter that suppresses records mentioning the ``/healthz`` path."""

    def filter(self, record):
        # A False result tells the logger to discard the record.
        rendered = record.getMessage()
        return rendered.find('/healthz') < 0
class FieldCapsProxyException(Exception):
    """Raised when Quickwit answers a ``_field_caps`` call with an error status.

    The upstream status code and body text are kept as attributes so the
    route handler can echo them back to the client.
    """

    def __init__(self, status_code, response_text):
        super().__init__(f"Proxy Error: {status_code}, {response_text}")
        self.status_code = status_code
        self.response_text = response_text
class SearchProxyException(Exception):
    """Raised when Quickwit answers a ``_search`` call with an error status.

    Besides the upstream status code and body text, the (already filtered)
    request body that was sent is kept in ``data`` for diagnostics.
    """

    def __init__(self, status_code, response_text, data):
        super().__init__(f"Proxy Error: {status_code}, {response_text}, {data}")
        self.status_code = status_code
        self.response_text = response_text
        self.data = data
@app.route('/healthz', methods=['GET'])
def healthz():
    """Health probe endpoint: always answers 200 with the body ``OK``."""
    body, status = 'OK', 200
    return body, status
@app.route('/', methods=['GET'])
def home():
    """Root endpoint; answers 200 with a banner naming this proxy."""
    banner = 'QuickWit Proxy'
    return banner, 200
# Not yet merged. PR: https://github.com/quickwit-oss/quickwit/pull/5096
# @app.route('/_resolve/index/<index_pattern>')
# def resolve_index(index_pattern):
# # TODO: Convert `*:*` into `*`
# target_url = f'{TARGET_URL}/api/v1/indexes?index_id_patterns={index_pattern}'
# response = requests.get(target_url)
# data = response.json()
#
# if response.status_code >= 400:
# return make_response(jsonify(response.json()), response.status_code)
#
# indices = []
# for item in data:
# index_id = item['index_config']['index_id']
# indices.append({
# 'name': index_id,
# 'attributes': ['open']
# })
#
# result = {
# 'indices': indices
# }
#
# return make_response(jsonify(result), response.status_code)
@app.route('/_resolve/index/<index_pattern>')
def resolve_index(index_pattern):
    """Proxy a ``_resolve/index`` call to Quickwit's ``_elastic`` API.

    On a JSON answer, Quickwit's body and status code are relayed unchanged.
    If Quickwit cannot be reached, times out, or answers with a body that is
    not JSON, the error is printed to stderr, reported to Sentry and returned
    as a 502 JSON error instead of an unhandled exception.
    """
    # TODO: Convert `*:*` into `*`
    target_url = f"{TARGET_URL}/api/v1/_elastic/_resolve/index/{index_pattern}"
    try:
        # Bounded wait so a hung searcher cannot pin this worker forever.
        response = requests.get(target_url, timeout=120)
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on older `requests`
        # releases; newer ones raise a RequestException subclass.
        print(e, file=sys.stderr)
        sentry_sdk.capture_exception(e)
        return make_response(jsonify({
            "error": type(e).__name__,
            "status_code": 502,
            "response_text": str(e),
        }), 502)
    return make_response(jsonify(payload), response.status_code)
# Keys kept inside each field of a `range` query; anything else
# (e.g. `format`, `time_zone`) is dropped.
_RANGE_ALLOWED_KEYS = frozenset({"gt", "gte", "lt", "lte", "boost"})
# Keys kept inside a `multi_match` query.
_MULTI_MATCH_ALLOWED_KEYS = frozenset({"query", "type", "fields"})
# `multi_match` types forwarded as-is; any other (or missing) type is
# rewritten to `most_fields`.
_MULTI_MATCH_KEPT_TYPES = frozenset({"phrase", "phrase_prefix"})
# Keys stripped at every nesting level.
_REMOVED_KEYS = frozenset({"minimum_should_match"})


def filter_range_fields(obj):
    """Return a copy of an OpenSearch query fragment rewritten for Quickwit.

    Recurses through dicts and lists; any other value is returned unchanged.
    At every dict level it:

    * keeps only ``gt``/``gte``/``lt``/``lte``/``boost`` in each field of a
      ``range`` query. Entries that are not dicts, such as the ``field`` and
      ``ranges`` of a ``range`` *aggregation*, are left alone.
    * forces ``multi_match.type`` to ``most_fields`` unless it is ``phrase``
      or ``phrase_prefix`` (a missing type counts as "other"), defaults
      ``fields`` to ``["log"]`` and drops all keys except
      ``query``/``type``/``fields``.
    * renames ``date_histogram.calendar_interval`` to ``fixed_interval``.
    * removes ``minimum_should_match``.

    The input object is never mutated.
    """
    if isinstance(obj, list):
        return [filter_range_fields(item) for item in obj]
    if not isinstance(obj, dict):
        return obj

    # Shallow copy so the rewrites below never touch the caller's data.
    obj = dict(obj)

    if isinstance(obj.get("range"), dict):
        obj["range"] = {
            field: (
                {k: v for k, v in bounds.items() if k in _RANGE_ALLOWED_KEYS}
                if isinstance(bounds, dict)
                else bounds
            )
            for field, bounds in obj["range"].items()
        }

    if isinstance(obj.get("multi_match"), dict):
        multi_match = dict(obj["multi_match"])
        if multi_match.get("type") not in _MULTI_MATCH_KEPT_TYPES:
            multi_match["type"] = "most_fields"
        if "fields" not in multi_match:
            multi_match["fields"] = ["log"]  # TODO: Replace with default fields
        obj["multi_match"] = {
            k: v for k, v in multi_match.items() if k in _MULTI_MATCH_ALLOWED_KEYS
        }

    # In some cases OpenSearch Dashboards requests `calendar_interval`
    # instead of `fixed_interval` when the interval is <= 1m.
    if isinstance(obj.get("date_histogram"), dict):
        histogram = dict(obj["date_histogram"])
        if "calendar_interval" in histogram:
            histogram["fixed_interval"] = histogram.pop("calendar_interval")
        obj["date_histogram"] = histogram

    return {k: filter_range_fields(v) for k, v in obj.items() if k not in _REMOVED_KEYS}
@app.route('/<index_id>/_search', methods=['POST'])
def search(index_id):
    """Forward an OpenSearch Dashboards ``_search`` request to Quickwit.

    Only the top-level keys listed in ``allowed_fields`` are forwarded, each
    passed through ``filter_range_fields``. On success Quickwit's JSON body and
    status code are relayed. Otherwise a JSON error object is returned:

    * request body that is not a JSON object -> 400;
    * upstream status >= 400 -> ``SearchProxyException`` payload carrying that
      status (printed to stderr and reported to Sentry);
    * upstream unreachable, timed out, or a non-JSON success body -> 502
      (printed to stderr and reported to Sentry).
    """
    target_url = f"{TARGET_URL}/api/v1/_elastic/{index_id}/_search"
    allowed_fields = ["from", "size", "query", "sort", "aggs",
                      "track_total_hits", "stored_fields", "search_after"]
    data = request.json
    print(json.dumps(data))
    # A JSON body such as `null` or `[]` has no `.items()`; reject it
    # explicitly instead of failing with an AttributeError (HTTP 500).
    if not isinstance(data, dict):
        return make_response(jsonify({
            "error": "InvalidSearchBody",
            "status_code": 400,
            "response_text": "Search body must be a JSON object",
        }), 400)
    data = {k: filter_range_fields(v)
            for k, v in data.items() if k in allowed_fields}
    try:
        # Bounded wait so a hung searcher cannot pin this worker forever.
        response = requests.post(target_url, json=data, timeout=120)
        if response.status_code >= 400:
            raise SearchProxyException(response.status_code, response.text, data)
        payload = response.json()
    except SearchProxyException as e:
        print(e, file=sys.stderr)
        sentry_sdk.capture_exception(e)
        return make_response(jsonify({
            "error": "SearchProxyException",
            "status_code": e.status_code,
            "response_text": e.response_text,
            "data": e.data
        }), e.status_code)
    except (requests.RequestException, ValueError) as e:
        # Connection errors, timeouts and undecodable bodies: Quickwit gave
        # no usable answer, so report a bad gateway. ValueError covers JSON
        # decoding failures on older `requests` releases.
        print(e, file=sys.stderr)
        sentry_sdk.capture_exception(e)
        return make_response(jsonify({
            "error": type(e).__name__,
            "status_code": 502,
            "response_text": str(e),
            "data": data
        }), 502)
    return make_response(jsonify(payload), response.status_code)
@app.route("/<index_pattern>/_field_caps", methods=["GET", "POST"])
def field_caps(index_pattern):
    """Proxy a ``_field_caps`` request to Quickwit's ``_elastic`` API.

    Both GET and POST from the client are sent upstream as a POST with no body
    or query string. On success Quickwit's JSON body and status are relayed.
    An upstream status >= 400 is printed to stderr, reported to Sentry and
    echoed back as a ``FieldCapsProxyException`` JSON payload. A transport
    failure, timeout or non-JSON success body is handled the same way but
    returned with status 502.
    """
    target_url = f"{TARGET_URL}/api/v1/_elastic/{index_pattern}/_field_caps"
    try:
        # Bounded wait so a hung searcher cannot pin this worker forever.
        response = requests.post(target_url, timeout=120)
        if response.status_code >= 400:
            raise FieldCapsProxyException(response.status_code, response.text)
        payload = response.json()
    except FieldCapsProxyException as e:
        print(e, file=sys.stderr)
        sentry_sdk.capture_exception(e)
        return make_response(jsonify({
            "error": "FieldCapsProxyException",
            "status_code": e.status_code,
            "response_text": e.response_text,
        }), e.status_code)
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on older `requests`
        # releases; newer ones raise a RequestException subclass.
        print(e, file=sys.stderr)
        sentry_sdk.capture_exception(e)
        return make_response(jsonify({
            "error": type(e).__name__,
            "status_code": 502,
            "response_text": str(e),
        }), 502)
    return make_response(jsonify(payload), response.status_code)
@app.before_request
def before_request():
    """Per-request hook; currently a placeholder.

    It returns ``None`` for every path (``/healthz`` included), so Flask always
    continues on to the matched view.
    """
    return None
if __name__ == '__main__':
    # Hide /healthz probe lines from Werkzeug's access log, then start Flask's
    # built-in server listening on all interfaces.
    logging.getLogger('werkzeug').addFilter(HealthzFilter())
    app.run(host='0.0.0.0')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment