Skip to content

Instantly share code, notes, and snippets.

@Querela
Last active November 17, 2023 11:08
Show Gist options
  • Save Querela/b3ca66d2cde785100c0e84caca5f3944 to your computer and use it in GitHub Desktop.
Save Querela/b3ca66d2cde785100c0e84caca5f3944 to your computer and use it in GitHub Desktop.
Simple static FCS endpoint
import logging
import os.path
from dataclasses import dataclass
from typing import Dict, List, Optional
from clarin.sru.constants import SRUDiagnostics, SRUResultCountPrecision, SRUVersion
from clarin.sru.diagnostic import SRUDiagnostic, SRUDiagnosticList
from clarin.sru.exception import SRUException
from clarin.sru.fcs.constants import FCS_NS, FCSQueryType
from clarin.sru.fcs.queryparser import FCSQuery
from clarin.sru.fcs.server.search import (
DataView,
EndpointDescription,
ResourceInfo,
SimpleEndpointDescription,
SimpleEndpointSearchEngineBase,
)
from clarin.sru.fcs.xml.writer import (
AdvancedDataViewWriter,
FCSRecordXMLStreamWriter,
SpanOffsetUnit,
)
from clarin.sru.queryparser import CQLQuery, SRUQuery, SRUQueryParserRegistry
from clarin.sru.server.config import SRUServerConfig, SRUServerConfigKey
from clarin.sru.server.request import SRURequest
from clarin.sru.server.result import SRUSearchResultSet
from clarin.sru.server.wsgi import SRUServerApp
from clarin.sru.xml.writer import SRUXMLStreamWriter
from clarin.sru.fcs.server.search import Layer
# ---------------------------------------------------------------------------
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Parameter key under which an external endpoint description location can be
# passed; in the visible part of this file it is only used in commented-out code.
RESOURCE_INVENTORY_URL_KEY = "my_fcs_endpoint.resourceInventoryURL"
# File name of a bundled endpoint description; not referenced by the visible
# code of this file.
ENDPOINTDESCRIPTION_FILENAME = "endpoint-description.xml"
# ---------------------------------------------------------------------------
class MyMinimalEmptyFCSSRUSearchResultSet(SRUSearchResultSet):
    """Smallest possible result set: it never yields or writes a record.

    Useful as a placeholder while wiring up an endpoint, because every
    method returns a constant and ``write_record`` does nothing.
    """

    def __init__(self, request: SRURequest, diagnostics: SRUDiagnosticList) -> None:
        """Remember the request and hand the diagnostics list to the base class."""
        super().__init__(diagnostics)
        self.request = request

    def get_total_record_count(self) -> int:
        """Return ``-1`` as the total number of matches."""
        # NOTE(review): -1 presumably means "unknown" to the SRU server;
        # confirm against the SRUSearchResultSet documentation.
        return -1

    def get_record_count(self) -> int:
        """Return ``0``: this result set holds no records."""
        return 0

    def get_record_identifier(self) -> str | None:
        """Return ``None``: no record identifier is provided."""
        return None

    def get_record_schema_identifier(self) -> str:
        """Return the record schema requested by the client, else ``FCS_NS``."""
        requested = (
            self.request.get_record_schema_identifier() if self.request else None
        )
        # CLARIN_FCS_RECORD_SCHEMA is the fallback when nothing was requested
        return requested if requested else FCS_NS

    def next_record(self) -> bool:
        """Return ``False``: there is never a next record."""
        return False

    def write_record(self, writer: SRUXMLStreamWriter) -> None:
        """Write nothing; ``next_record`` never advances to a record."""
        return None
@dataclass
class SingleLeftHitRightResult:
    """One keyword-in-context (KWIC) result split around a single hit."""

    # text before the hit
    left: str
    # the matching text itself
    hit: str
    # text after the hit
    right: str
class MyHITSFCSSRUSearchResultSet(SRUSearchResultSet):
    """Result set that writes every result as a HITS (KWIC) data view.

    Each entry of ``kwic_results`` becomes one record: a resource containing
    one resource fragment with a single left/hit/right HITS data view.
    """

    def __init__(
        self,
        pid: str,
        total: int,
        kwic_results: List[SingleLeftHitRightResult],
        request: SRURequest,
        diagnostics: SRUDiagnosticList,
    ) -> None:
        """Store the results and the request they answer.

        Args:
            pid: Persistent identifier of the resource the results belong to.
            total: Total number of matches, reported by
                ``get_total_record_count``.
            kwic_results: The results available for writing.
            request: The search request; supplies the maximum number of
                records and the requested record schema.
            diagnostics: Diagnostics list handed to the base class.
        """
        super().__init__(diagnostics)
        self.request = request
        self.pid = pid
        self.total = total
        self.results = kwic_results
        self.results_cursor = -1
        # we use this to limit our returned results to the maximum number requested
        # just in case our local search endpoint returns more records
        self.maximum_records = request.get_maximum_records()

    def _record_limit(self) -> int:
        """Return how many of the held results will actually be written."""
        # never negative, so the count we report and the iteration agree
        return max(0, min(len(self.results), self.maximum_records))

    def get_record_count(self) -> int:
        """Return the number of records ``next_record`` will step through."""
        # Same limit as `next_record`; reporting `len(self.results)` would
        # overstate the count whenever more results than requested are held.
        return self._record_limit()

    def get_total_record_count(self) -> int:
        """Return the total number of matches given at construction."""
        return self.total

    def get_result_count_precision(self) -> SRUResultCountPrecision | None:
        """Declare the total record count as exact."""
        return SRUResultCountPrecision.EXACT

    def get_record_identifier(self) -> str | None:
        """Return ``None``: no record identifier is provided."""
        return None

    # boilerplate
    def get_record_schema_identifier(self) -> str:
        """Return the record schema requested by the client, else ``FCS_NS``."""
        if self.request:
            rsid = self.request.get_record_schema_identifier()
            if rsid:
                return rsid
        return FCS_NS  # CLARIN_FCS_RECORD_SCHEMA

    # boilerplate
    def get_surrogate_diagnostic(self) -> Optional[SRUDiagnostic]:
        """Return a diagnostic if the requested record schema is unsupported.

        Returns:
            A ``RECORD_NOT_AVAILABLE_IN_THIS_SCHEMA`` diagnostic when the
            effective record schema is not ``FCS_NS``, otherwise ``None``.
        """
        schema_id = self.get_record_schema_identifier()
        if schema_id and FCS_NS != schema_id:
            # The diagnostic is the return value of this method (see the
            # annotation); it must be returned, not raised.
            return SRUDiagnostic(
                SRUDiagnostics.RECORD_NOT_AVAILABLE_IN_THIS_SCHEMA,
                schema_id,
                message=f'Record is not available in record schema "{schema_id}".',
            )
        return None

    def next_record(self) -> bool:
        """Advance the cursor; return ``False`` once the limit is reached."""
        if (self.results_cursor + 1) >= self._record_limit():
            return False
        self.results_cursor += 1
        return True

    def write_record(self, writer: SRUXMLStreamWriter) -> None:
        """Write the result at the current cursor position to ``writer``."""
        cur_result = self.results[self.results_cursor]

        FCSRecordXMLStreamWriter.startResource(writer, pid=self.pid, ref=None)

        # PIDs/Refs are generated dynamically,
        # they might also be stored on the `cur_result` object
        FCSRecordXMLStreamWriter.startResourceFragment(
            writer,
            pid=f"{self.pid}#frag{self.results_cursor}",
            ref=f"http://my-resource/#frag{self.results_cursor}",
        )

        # we use the variant to write left, hit, right
        FCSRecordXMLStreamWriter.writeSingleHitHitsDataView(
            writer, cur_result.left, cur_result.hit, cur_result.right
        )
        # instead of text with hits ranges
        # note that we require at least one hit range tuple
        # FCSRecordXMLStreamWriter.writeHitsDataView(
        #     writer,
        #     " ".join([cur_result.left, cur_result.hit, cur_result.right]),
        #     [(len(cur_result.left) + 1, len(cur_result.hit))],
        #     second_is_length=True,
        # )

        FCSRecordXMLStreamWriter.endResourceFragment(writer)
        FCSRecordXMLStreamWriter.endResource(writer)
@dataclass
class SingleAdvancedResult:
    """One advanced-search result: parallel token annotations on several layers."""

    # maps a layer id to that layer's URI
    layers: Dict[str, str]
    # maps a layer id to that layer's tokens (one value per token position)
    spans: Dict[str, List[str]]
    # indices of the tokens that are hits
    hits: List[int]
class MyADVFCSSRUSearchResultSet(SRUSearchResultSet):
    """Result set that writes HITS and, for FCS queries, advanced data views.

    Each entry of ``kwic_results`` becomes one record: a resource containing
    one resource fragment. The fragment always gets a HITS data view built
    from the text layer; the advanced data view with all layers is added
    when the request is an FCS query (or when no request is available).
    """

    def __init__(
        self,
        pid: str,
        total: int,
        kwic_results: List[SingleAdvancedResult],
        request: SRURequest,
        diagnostics: SRUDiagnosticList,
    ) -> None:
        """Store the results and the request they answer.

        Args:
            pid: Persistent identifier of the resource the results belong to.
            total: Total number of matches, reported by
                ``get_total_record_count``.
            kwic_results: The results available for writing.
            request: The search request; supplies the maximum number of
                records, the query type and the requested record schema.
            diagnostics: Diagnostics list handed to the base class.
        """
        super().__init__(diagnostics)
        self.request = request
        self.pid = pid
        self.total = total
        self.results = kwic_results
        self.results_cursor = -1
        # we use this to limit our returned results to the maximum number requested
        # just in case our local search endpoint returns more records
        self.maximum_records = request.get_maximum_records()

    def _record_limit(self) -> int:
        """Return how many of the held results will actually be written."""
        # never negative, so the count we report and the iteration agree
        return max(0, min(len(self.results), self.maximum_records))

    def get_record_count(self) -> int:
        """Return the number of records ``next_record`` will step through."""
        # Same limit as `next_record`; reporting `len(self.results)` would
        # overstate the count whenever more results than requested are held.
        return self._record_limit()

    def get_total_record_count(self) -> int:
        """Return the total number of matches given at construction."""
        return self.total

    def get_result_count_precision(self) -> SRUResultCountPrecision | None:
        """Declare the total record count as exact."""
        return SRUResultCountPrecision.EXACT

    def get_record_identifier(self) -> str | None:
        """Return ``None``: no record identifier is provided."""
        return None

    def get_record_schema_identifier(self) -> str:
        """Return the record schema requested by the client, else ``FCS_NS``."""
        if self.request:
            rsid = self.request.get_record_schema_identifier()
            if rsid:
                return rsid
        return FCS_NS  # CLARIN_FCS_RECORD_SCHEMA

    def get_surrogate_diagnostic(self) -> Optional[SRUDiagnostic]:
        """Return a diagnostic if the requested record schema is unsupported.

        Returns:
            A ``RECORD_NOT_AVAILABLE_IN_THIS_SCHEMA`` diagnostic when the
            effective record schema is not ``FCS_NS``, otherwise ``None``.
        """
        schema_id = self.get_record_schema_identifier()
        if schema_id and FCS_NS != schema_id:
            # The diagnostic is the return value of this method (see the
            # annotation); it must be returned, not raised.
            return SRUDiagnostic(
                SRUDiagnostics.RECORD_NOT_AVAILABLE_IN_THIS_SCHEMA,
                schema_id,
                message=f'Record is not available in record schema "{schema_id}".',
            )
        return None

    def next_record(self) -> bool:
        """Advance the cursor; return ``False`` once the limit is reached."""
        if (self.results_cursor + 1) >= self._record_limit():
            return False
        self.results_cursor += 1
        return True

    def write_record(self, writer: SRUXMLStreamWriter) -> None:
        """Write the result at the current cursor position to ``writer``.

        Raises:
            SRUException: With ``GENERAL_SYSTEM_ERROR`` if the current result
                has no text layer (``LAYER_ID_TEXT``) or no tokens for it.
        """
        cur_result = self.results[self.results_cursor]

        # Validate before anything is written, so a broken result does not
        # leave a half-written record behind.
        text_spans = cur_result.spans.get(LAYER_ID_TEXT)
        if LAYER_ID_TEXT not in cur_result.layers or text_spans is None:
            raise SRUException(
                SRUDiagnostics.GENERAL_SYSTEM_ERROR,
                f"Current result does not seem to have a '{LAYER_ID_TEXT}' layer!",
            )

        # helper for writing advanced dataviews
        helper = AdvancedDataViewWriter(SpanOffsetUnit.ITEM)

        FCSRecordXMLStreamWriter.startResource(writer, pid=self.pid, ref=None)

        # PIDs/Refs are generated dynamically,
        # they might also be stored on the `cur_result` object
        FCSRecordXMLStreamWriter.startResourceFragment(
            writer,
            pid=f"{self.pid}#frag{self.results_cursor}",
            ref=f"http://my-resource/#frag{self.results_cursor}",
        )

        # we need to know our segmentation,
        # for this we use the "LAYER_ID_TEXT" layer and single whitespace for separation
        ranges = []
        # we need to compute our own offsets for span positions in text
        offset = 1  # not python offsets but FCS stream positions
        for span in text_spans:
            ranges.append((offset, offset + len(span)))
            offset += len(span)  # length of content
            offset += 1  # whitespace separator

        # we will go over all layers and all spans and add them;
        # spans of every layer are positioned by the text layer's token ranges
        hit_indices = set(cur_result.hits)
        for layer_id, layer_uri in cur_result.layers.items():
            # a layer without any spans simply contributes nothing
            for idx, span in enumerate(cur_result.spans.get(layer_id, ())):
                start, end = ranges[idx]
                helper.addSpan(
                    layer_uri,
                    start,
                    end,
                    span,
                    highlight=1 if idx in hit_indices else 0,
                )

        # default result presentation, plain text with hit markers
        helper.writeHitsDataView(writer, cur_result.layers[LAYER_ID_TEXT])

        # we just assume that if we use this class and forgot to provide a request
        # that it still is an ADV search request and we write the dataview
        if self.request is None or self.request.is_query_type(FCSQueryType.FCS):
            helper.writeAdvancedDataView(writer)

        FCSRecordXMLStreamWriter.endResourceFragment(writer)
        FCSRecordXMLStreamWriter.endResource(writer)
# ---------------------------------------------------------------------------
# a single resource PID; used both in the endpoint descriptions and as the
# PID of the resource that every simulated search result is written for
RESOURCE_PID = "my:corpus-123"
class MyFulltextonlyFCSSRUEndpointSearchEngine(SimpleEndpointSearchEngineBase):
    """Search engine for basic (CQL) search only, answering with fixed HITS results."""

    def _build_EndpointDescription(self) -> EndpointDescription:
        """Build the endpoint description in code: one resource, HITS data view only."""
        hits_dataview = DataView(
            identifier="hits",
            mimetype="application/x-clarin-fcs-hits+xml",
            deliveryPolicy=DataView.DeliveryPolicy.SEND_BY_DEFAULT,
        )
        dataviews = [hits_dataview]

        corpus = ResourceInfo(
            pid=RESOURCE_PID,
            title={"en": "My Text Resource"},
            description={"en": "An example corpus.", "de": "Beispielkorpus"},
            landing_page_uri="http://example.de/corpus-123",
            languages=["deu"],
            available_DataViews=dataviews,
        )

        return SimpleEndpointDescription(
            version=2,
            capabilities=["http://clarin.eu/fcs/capability/basic-search"],
            supported_DataViews=dataviews,
            supported_Layers=[],
            resources=[corpus],
            pid_case_sensitive=False,
        )

    # required to implement
    def create_EndpointDescription(
        self,
        config: SRUServerConfig,
        query_parser_registry_builder: SRUQueryParserRegistry.Builder,
        params: Dict[str, str],
    ) -> EndpointDescription:
        """Return the endpoint description; here it is always built on-the-fly."""
        LOGGER.debug("Create Endpoint Description ...")

        # Alternatives: load a bundled endpoint description or parse one from an URL
        # riu = params.get(RESOURCE_INVENTORY_URL_KEY)
        # if riu is None or riu.isspace():
        #     LOGGER.debug("Using bundled 'endpoint-description.xml' file")
        #     return self._load_bundled_EndpointDescription()
        # else:
        #     LOGGER.debug("Using external file '%s'", riu)
        #     return SimpleEndpointDescriptionParser.parse(riu)

        description = self._build_EndpointDescription()
        return description

    # required to implement
    def do_init(
        self,
        config: SRUServerConfig,
        query_parser_registry_builder: SRUQueryParserRegistry.Builder,
        params: Dict[str, str],
    ) -> None:
        """Initialization hook; only logs, nothing needs to be set up."""
        LOGGER.debug("Initialize Endpoint ...")

    # required to implement
    def search(
        self,
        config: SRUServerConfig,
        request: SRURequest,
        diagnostics: SRUDiagnosticList,
    ) -> SRUSearchResultSet:
        """Answer a CQL query with two fixed KWIC results.

        Raises:
            SRUException: With ``CANNOT_PROCESS_QUERY_REASON_UNKNOWN`` for
                every query type other than CQL.
        """
        LOGGER.debug("Search Request: %s", request)

        if not request.is_query_type(FCSQueryType.CQL):
            # Got something we don't support. Send error ...
            raise SRUException(
                SRUDiagnostics.CANNOT_PROCESS_QUERY_REASON_UNKNOWN,
                f"Queries with queryType '{request.get_query_type()}' are not supported by this CLARIN-FCS Endpoint.",
            )

        # Got a CQL query (either SRU 1.1 or higher).
        # A real endpoint would translate it to its backend's query language here.
        query_in: SRUQuery = request.get_query()
        assert isinstance(query_in, CQLQuery)
        query: str = query_in.raw_query  # unused by this simulation
        LOGGER.debug("query: %s -> %s", query_in.raw_query, query_in.parsed_query)

        # NOTE: we just assume `RESOURCE_PID` is the only resource

        # to simulate no results (minimal implementation) use instead:
        # return MyMinimalEmptyFCSSRUSearchResultSet(request, diagnostics)

        # simulate HITS (KWIC) result for BASIC search
        total_matches = 100
        simulated_hits = [
            SingleLeftHitRightResult("left", "hit", "right"),
            SingleLeftHitRightResult("left 2", "better hit", "right 4"),
        ]
        return MyHITSFCSSRUSearchResultSet(
            RESOURCE_PID,
            total_matches,
            simulated_hits,
            request,
            diagnostics,
        )
# Layer ids used on our side: they identify the layers in the endpoint
# description and key the `layers`/`spans` mappings of advanced results.
LAYER_ID_TEXT = "word"
LAYER_ID_POS = "pos"
class MyFCSSRUEndpointSearchEngine(SimpleEndpointSearchEngineBase):
    """Search engine for basic (CQL) and advanced (FCS-QL) search with fixed results.

    CQL queries are answered with HITS results, FCS-QL queries with a result
    carrying a text and a part-of-speech layer.
    """

    def _build_EndpointDescription(self) -> EndpointDescription:
        """Build the endpoint description in code.

        Declares one resource, the HITS and advanced data views, and a text
        and a part-of-speech layer.
        """
        dataviews = [
            DataView(
                identifier="hits",
                mimetype="application/x-clarin-fcs-hits+xml",
                deliveryPolicy=DataView.DeliveryPolicy.SEND_BY_DEFAULT,
            ),
            DataView(
                identifier="adv",
                mimetype="application/x-clarin-fcs-adv+xml",
                deliveryPolicy=DataView.DeliveryPolicy.SEND_BY_DEFAULT,
            ),
        ]
        layers = [
            Layer(
                id=LAYER_ID_TEXT,  # id to refer on my side
                result_id="http://fcs/word",
                type="text",  # clarin layer type
                encoding=Layer.ContentEncoding.VALUE,
            ),
            Layer(
                id=LAYER_ID_POS,
                result_id="http://fcs/pos",
                type="pos",
                encoding=Layer.ContentEncoding.VALUE,
            ),
        ]
        resources = [
            ResourceInfo(
                pid=RESOURCE_PID,
                title={"en": "My Text Resource"},
                description={"en": "An example corpus.", "de": "Beispielkorpus"},
                landing_page_uri="http://example.de/corpus-123",
                languages=["deu"],
                available_DataViews=dataviews,
                available_Layers=layers,
            )
        ]
        return SimpleEndpointDescription(
            version=2,
            # This endpoint answers FCS-QL queries and declares layers plus
            # the advanced data view, so it has to advertise advanced search
            # in addition to basic search.
            capabilities=[
                "http://clarin.eu/fcs/capability/basic-search",
                "http://clarin.eu/fcs/capability/advanced-search",
            ],
            supported_DataViews=dataviews,
            supported_Layers=layers,
            resources=resources,
            pid_case_sensitive=False,
        )

    # required to implement
    def create_EndpointDescription(
        self,
        config: SRUServerConfig,
        query_parser_registry_builder: SRUQueryParserRegistry.Builder,
        params: Dict[str, str],
    ) -> EndpointDescription:
        """Return the endpoint description; here it is always built on-the-fly."""
        LOGGER.debug("Create Endpoint Description ...")

        # we can dynamically load a bundled endpoint description or parse one from an URL
        # riu = params.get(RESOURCE_INVENTORY_URL_KEY)
        # if riu is None or riu.isspace():
        #     LOGGER.debug("Using bundled 'endpoint-description.xml' file")
        #     return self._load_bundled_EndpointDescription()
        # else:
        #     LOGGER.debug("Using external file '%s'", riu)
        #     return SimpleEndpointDescriptionParser.parse(riu)

        # or we build one on-the-fly ...
        return self._build_EndpointDescription()

    # required to implement
    def do_init(
        self,
        config: SRUServerConfig,
        query_parser_registry_builder: SRUQueryParserRegistry.Builder,
        params: Dict[str, str],
    ) -> None:
        """Initialization hook; only logs, nothing needs to be set up."""
        LOGGER.debug("Initialize Endpoint ...")
        # can be empty if nothing needs to be initialized

    # required to implement
    def search(
        self,
        config: SRUServerConfig,
        request: SRURequest,
        diagnostics: SRUDiagnosticList,
    ) -> SRUSearchResultSet:
        """Answer CQL queries with HITS results and FCS-QL queries with ADV results.

        Every path either returns a result set or raises, so the method can
        no longer fall off the end and return ``None``.

        Raises:
            SRUException: With ``CANNOT_PROCESS_QUERY_REASON_UNKNOWN`` for
                query types other than CQL and FCS.
        """
        LOGGER.debug("Search Request: %s", request)

        # NOTE: we just assume `RESOURCE_PID` is the only resource

        # to simulate no results (minimal implementation) return instead:
        # MyMinimalEmptyFCSSRUSearchResultSet(request, diagnostics)

        query: str
        query_in: SRUQuery

        if request.is_query_type(FCSQueryType.CQL):
            # Got a CQL query (either SRU 1.1 or higher).
            # Translate to a proper CQP query ...
            query_in = request.get_query()
            assert isinstance(query_in, CQLQuery)
            LOGGER.debug(
                "query (cql): %s -> %s", query_in.raw_query, query_in.parsed_query
            )
            query = query_in.raw_query  # unused by this simulation

            # simulate HITS (KWIC) result for BASIC search
            return MyHITSFCSSRUSearchResultSet(
                RESOURCE_PID,
                100,  # total number of results
                [
                    SingleLeftHitRightResult("left", "hit", "right"),
                    SingleLeftHitRightResult("left 2", "better hit", "right 4"),
                ],
                request,
                diagnostics,
            )

        if request.is_query_type(FCSQueryType.FCS):
            # Got a FCS query (SRU 2.0).
            # Translate to a proper CQP query
            query_in = request.get_query()
            assert isinstance(query_in, FCSQuery)
            LOGGER.debug(
                "query (fcs-ql): %s -> %s", query_in.raw_query, query_in.parsed_query
            )
            query = query_in.raw_query  # unused by this simulation

            # simulate ADV + HITS result for ADVANCED search
            return MyADVFCSSRUSearchResultSet(
                RESOURCE_PID,
                1,
                [
                    SingleAdvancedResult(
                        layers={
                            LAYER_ID_TEXT: "http://fcs/word",
                            LAYER_ID_POS: "http://fcs/pos",
                        },
                        spans={
                            LAYER_ID_TEXT: ["one", "token", "at", "each", "position"],
                            LAYER_ID_POS: ["NUM", "NOUN", "???", "UNK", "NOUN"],
                        },
                        hits=[2, 3],
                    )
                ],
                request,
                diagnostics,
            )

        # Got something else we don't support. Send error ...
        raise SRUException(
            SRUDiagnostics.CANNOT_PROCESS_QUERY_REASON_UNKNOWN,
            f"Queries with queryType '{request.get_query_type()}' are not supported by this CLARIN-FCS Endpoint.",
        )

    def _check_pids(self):
        """Placeholder that does nothing; not called in the visible code."""
        # NOTE(review): presumably meant for validating requested resource
        # PIDs — confirm intent before implementing.
        pass

    def _check_dataviews(self):
        """Placeholder that does nothing; not called in the visible code."""
        # NOTE(review): presumably meant for validating requested data views
        # — confirm intent before implementing.
        pass
# ---------------------------------------------------------------------------
def make_app():
    """Create the SRU server WSGI app for the selected search engine.

    The server configuration file ``sru-server-config.xml`` is looked up in
    the directory of this module.
    """
    base_dir = os.path.dirname(__file__)
    config_file = os.path.join(base_dir, "sru-server-config.xml")
    # only needed when RESOURCE_INVENTORY_URL_KEY is enabled below
    ed_file = os.path.join(base_dir, "endpoint-description.xml")

    # can be a class if the constructor has no parameters or only defaults,
    # or can be an instantiated search engine; pick one variant:
    # searchengine_clazz_or_instance = MyFulltextonlyFCSSRUEndpointSearchEngine
    searchengine_clazz_or_instance = MyFCSSRUEndpointSearchEngine

    server_params = {
        # for bundled
        # RESOURCE_INVENTORY_URL_KEY: ed_file,  # comment out to use bundled
        #
        # e.g. to provide dynamically configured API URLs
        # API_BASE_URL_KEY: API_BASE_URL,
        #
        # SRU optional
        # SRUServerConfigKey.SRU_TRANSPORT: "http",
        # SRUServerConfigKey.SRU_HOST: "127.0.0.1",
        # SRUServerConfigKey.SRU_PORT: "8080",
        # SRU required
        SRUServerConfigKey.SRU_DATABASE: "demo",
        #
        SRUServerConfigKey.SRU_ECHO_REQUESTS: "true",
        SRUServerConfigKey.SRU_NUMBER_OF_RECORDS: 10,
        SRUServerConfigKey.SRU_MAXIMUM_RECORDS: 100,
        SRUServerConfigKey.SRU_ALLOW_OVERRIDE_MAXIMUM_RECORDS: "true",
        SRUServerConfigKey.SRU_ALLOW_OVERRIDE_INDENT_RESPONSE: "true",
        # To enable SRU 2.0 for FCS 2.0
        SRUServerConfigKey.SRU_SUPPORTED_VERSION_MAX: SRUVersion.VERSION_2_0,
        # SRUServerConfigKey.SRU_SUPPORTED_VERSION_DEFAULT: SRUVersion.VERSION_2_0,
        SRUServerConfigKey.SRU_LEGACY_NAMESPACE_MODE: "loc",
    }

    return SRUServerApp(
        searchengine_clazz_or_instance,
        config_file,
        server_params,
        develop=True,
    )
def make_gunicorn_app():
    """Setup logging to display on stdout with gunicorn logging level."""
    import logging

    # reuse the level of gunicorn's own error logger, see
    # https://trstringer.com/logging-flask-gunicorn-the-manageable-way/
    gunicorn_level = logging.getLogger("gunicorn.error").level
    log_format = "[%(levelname).1s][%(name)s:%(lineno)s] %(message)s"
    logging.basicConfig(level=gunicorn_level, format=log_format)

    return make_app()
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # development entry point: serve the app with werkzeug's built-in server
    from werkzeug.serving import run_simple

    # verbose logging for everything, except the rather chatty CQL parser
    logging.basicConfig(level=logging.DEBUG)
    cql_parser_logger = logging.getLogger("cql.parser")
    cql_parser_logger.setLevel(logging.INFO)

    app = make_app()

    # "0.0.0.0" for global, "localhost" if only on localhost
    run_simple("0.0.0.0", 8080, app, use_reloader=True)
# run with:
# python endpoint.py
# choose variants for
# make_app --> searchengine_clazz_or_instance
# MyFulltextonlyFCSSRUEndpointSearchEngine
# MyFCSSRUEndpointSearchEngine
# SimpleEndpointSearchEngineBase.search --> search result set
# MyMinimalEmptyFCSSRUSearchResultSet
# MyHITSFCSSRUSearchResultSet
# MyADVFCSSRUSearchResultSet
# test:
# http://localhost:8080/?x-fcs-endpoint-description=true
# http://localhost:8080/?query=test
# http://localhost:8080/?query=[word=%22test%22]&queryType=fcs
# ?x-indent-response=1 for indentation
# ---------------------------------------------------------------------------
Werkzeug==3.0.1
fcs-simple-endpoint==1.0.4
<?xml version="1.0" encoding="UTF-8"?>
<endpoint-config xmlns="http://www.clarin.eu/sru-server/1.0/">
<databaseInfo>
<title xml:lang="de">Mein FCS Endpoint</title>
<title xml:lang="en" primary="true">My FCS Endpoint</title>
<description xml:lang="de">Der FCS Endpunkt von mir.</description>
<description xml:lang="en" primary="true">Search in the My FCS Endpoint corpora.</description>
<author xml:lang="en">I</author>
<author xml:lang="de" primary="true">ich</author>
</databaseInfo>
<indexInfo>
<set name="fcs" identifier="http://clarin.eu/fcs/resource">
<title xml:lang="en" primary="true">CLARIN Content Search</title>
</set>
<index search="true" scan="false" sort="false">
<title xml:lang="en" primary="true">Words</title>
<map primary="true">
<name set="fcs">words</name>
</map>
</index>
</indexInfo>
<schemaInfo>
<schema identifier="http://clarin.eu/fcs/resource" name="fcs" sort="false" retrieve="true">
<title xml:lang="en" primary="true">CLARIN Content Search</title>
</schema>
</schemaInfo>
</endpoint-config>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment