@dvirsky
Last active December 10, 2020 17:09

RediSearch Aggregation Engine

Main Idea

RediSearch can already index and retrieve large numbers of documents quickly. This is a proposal to allow aggregations on top of that, i.e. to enable extracting statistics and insights from data stored in Redis using RediSearch.

The idea is to perform a search on the RediSearch index, load properties from the fetched documents, and perform calculations based on them using grouping, sorting, and projection functions. These steps are composed as a pipeline and are reentrant: the same kind of step can appear more than once in a single request.

Internally, the aggregation engine uses the same mechanism that loads normal search results (the result processor chain), only using a different set of result processors, which ultimately build result objects from the pipeline. If a normal search processing pipeline looks like filter -> score -> sort -> load documents -> serialize, an aggregation pipeline would look like: filter -> load properties -> group -> reduce -> project -> sort -> serialize.

To avoid making the search request too complex, we create a new API call for aggregations. Let's call it FT.AGGREGATE. The user provides this function with a chain of operations consisting of:

  • FILTER

    (One per request) A normal search query filter, executed just like any search would, using the exact same syntax.

  • LOAD / SELECT

    (One per request) Which properties to load from each document. Load as few properties as possible to reduce loading time. Preferably, they should come from the document table's sortable field table, which reduces processing time.

  • GROUPBY + GROUPREDUCE

    A grouping rule over one or more properties of each document, accompanied by one or more "group reducers" that reduce each group into one record. Reducers can include count, sum, average, count distinct, and so on.

  • SORTBY

    Sorting the results accumulated upstream.

  • PROJECT

    Take one or more properties from the upstream results, and perform some projection function on them (e.g. a ratio between two sums).

  • LIMIT

    Limit the number of results in the aggregation pipeline, either pre-grouping or post-grouping.

These can be chained and are reentrant: you can have several GROUPBY and SORTBY steps in the same aggregation.
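To make the chaining idea concrete, here is a minimal Python sketch that models each operation as a function from a record stream to a record stream. It is an illustration only, not the engine's implementation: the names `load`, `group_by`, `sort_by`, `limit`, and `run_pipeline` are hypothetical, and records are plain dictionaries.

```python
import itertools
from collections import defaultdict
from typing import Callable, Dict, Iterable, Iterator, List

Record = Dict[str, object]
Stage = Callable[[Iterable[Record]], Iterator[Record]]


def load(fields: List[str]) -> Stage:
    """LOAD / SELECT: keep only the requested properties of each record."""
    def stage(rows):
        for row in rows:
            yield {f: row.get(f) for f in fields}
    return stage


def group_by(keys: List[str],
             reducers: Dict[str, Callable[[List[Record]], object]]) -> Stage:
    """GROUPBY + GROUPREDUCE: emit one record per distinct key tuple."""
    def stage(rows):
        groups = defaultdict(list)
        for row in rows:
            groups[tuple(row[k] for k in keys)].append(row)
        for key, members in groups.items():
            out = dict(zip(keys, key))
            for alias, reducer in reducers.items():
                out[alias] = reducer(members)
            yield out
    return stage


def sort_by(key: str) -> Stage:
    """SORTBY: sort everything accumulated upstream (ascending)."""
    def stage(rows):
        yield from sorted(rows, key=lambda r: r[key])
    return stage


def limit(count: int, offset: int = 0) -> Stage:
    """LIMIT: pass through at most `count` records after skipping `offset`."""
    def stage(rows):
        yield from itertools.islice(rows, offset, offset + count)
    return stage


def run_pipeline(rows: Iterable[Record], stages: List[Stage]) -> List[Record]:
    """Feed the output of each stage into the next one."""
    for stage in stages:
        rows = stage(rows)
    return list(rows)


docs = [
    {"country": "US", "city": "NYC", "age": 30},
    {"country": "US", "city": "NYC", "age": 20},
    {"country": "US", "city": "LA", "age": 25},
    {"country": "IL", "city": "TLV", "age": 40},
]

# Two GROUPBY steps in one request: users per city, then cities per country.
result = run_pipeline(docs, [
    load(["country", "city"]),
    group_by(["country", "city"], {"users": len}),
    group_by(["country"], {"cities": len}),
    sort_by("cities"),
])
print(result)
```

Because every stage has the same shape, a second `group_by` can consume the output of the first, which is what reentrancy amounts to in this sketch.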

Proposed API

This is the API for FT.AGGREGATE:

    FT.AGGREGATE {index}
      FILTER {query}
      SELECT {nargs} {field} ...
      [
        GROUPBY {nargs} {property} ... [AS {alias}]
        GROUPREDUCE {function} {nargs} {arg} ... [AS {alias}]
        ...
      ]
      [SORTBY {nargs} {property} ... ]
      [PROJECT {function} {nargs} {args} [AS {alias}]]
      [LIMIT {count} {offset}]
      ...
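
The `{nargs}` convention above means every variable-length argument list is prefixed by its length. As a rough illustration, a client could assemble the argument list as follows. The helper names (`groupby`, `groupreduce`, `aggregate_args`) are hypothetical, and the command itself is only the syntax proposed here, not an existing client API.

```python
from typing import Iterable, List, Optional


def groupby(*properties: str, alias: Optional[str] = None) -> List[str]:
    """GROUPBY {nargs} {property} ... [AS {alias}]"""
    args = ["GROUPBY", str(len(properties)), *properties]
    if alias:
        args += ["AS", alias]
    return args


def groupreduce(function: str, *fn_args: str,
                alias: Optional[str] = None) -> List[str]:
    """GROUPREDUCE {function} {nargs} {arg} ... [AS {alias}]"""
    args = ["GROUPREDUCE", function, str(len(fn_args)), *fn_args]
    if alias:
        args += ["AS", alias]
    return args


def aggregate_args(index: str, query: str, select: List[str],
                   steps: Iterable[List[str]] = ()) -> List[str]:
    """Flatten the request into one argument list, in pipeline order."""
    args = ["FT.AGGREGATE", index, "FILTER", query,
            "SELECT", str(len(select)), *select]
    for step in steps:
        args.extend(step)
    return args


command = aggregate_args(
    "users", "@age:[18 34]", ["country", "gender", "age"],
    [groupby("country", "gender", alias="grp"),
     groupreduce("count", alias="num")],
)
print(" ".join(command))
```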

Concrete example:

    FT.AGGREGATE users
      FILTER "@age:[18 34]"
      SELECT 3 country gender age
      GROUPBY 2 country gender AS grp
        GROUPREDUCE average 1 age AS avg_age
        GROUPREDUCE count 0 AS num
      SORTBY 1 num
      PROJECT floor 1 avg_age

Which would be akin to SQL:

    SELECT FLOOR(AVG(age)) AS avg_age, COUNT(*) AS num FROM users
       WHERE age BETWEEN 18 AND 34
       GROUP BY country, gender
       ORDER BY num
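
For readers who prefer code to SQL, the same request can be emulated on in-memory data. This is only a sketch of the intended semantics under the assumption that each user is a dictionary with `country`, `gender`, and `age` keys; the function name `aggregate_users` is made up for the illustration.

```python
import math
from collections import defaultdict
from typing import Dict, List


def aggregate_users(users: List[Dict]) -> List[Dict]:
    # FILTER "@age:[18 34]", then GROUPBY country, gender
    groups = defaultdict(list)
    for user in users:
        if 18 <= user["age"] <= 34:
            groups[(user["country"], user["gender"])].append(user["age"])

    # GROUPREDUCE average(age) and count() for each group
    rows = [
        {"country": country, "gender": gender,
         "avg_age": sum(ages) / len(ages), "num": len(ages)}
        for (country, gender), ages in groups.items()
    ]

    # SORTBY num (ascending)
    rows.sort(key=lambda row: row["num"])

    # PROJECT floor(avg_age)
    for row in rows:
        row["avg_age"] = math.floor(row["avg_age"])
    return rows


users = [
    {"country": "US", "gender": "f", "age": 20},
    {"country": "US", "gender": "f", "age": 25},
    {"country": "US", "gender": "m", "age": 30},
    {"country": "US", "gender": "m", "age": 50},  # dropped by the filter
]
print(aggregate_users(users))
```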

Distributed Aggregations

In cluster mode, aggregations will not be fully distributed - only the filtering. While this will hurt performance when processing very large data sets, it will simplify the engine greatly, since we will not have to work out which pipeline stages can be pushed down to the shards for each type of aggregation.

Instead, when the user request hits a coordinator node, it will act as an aggregator: it will take the request, send only the filter and select parts of it to all the index nodes, and receive raw records back.

The rest of the aggregation pipeline will be performed on the aggregator node. In the future we can extend the engine so that more work will be pushed down to the shards and the aggregator will be able to do less work.

Having the index nodes stream raw records to the aggregator nodes, without using too much memory, will require the aggregator to receive the records in chunks, for example, 1000 at a time. The aggregator will merge the streams from all the index nodes into one stream to be processed by the pipeline, and when the stream from a specific node is drained, it will request more records from that node.
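
A minimal sketch of this chunked merge, assuming each index node exposes a hypothetical `fetch(offset, count)` call that returns the next batch of raw records (an empty list once it is exhausted). The real transport and record format are not specified in this proposal; `list_shard` merely simulates a node with an in-memory list.

```python
from typing import Callable, Dict, Iterator, List

Record = Dict[str, object]
# Hypothetical shard interface: fetch(offset, count) -> next batch of records.
FetchChunk = Callable[[int, int], List[Record]]


def list_shard(records: List[Record]) -> FetchChunk:
    """Simulate an index node backed by an in-memory list."""
    def fetch(offset: int, count: int) -> List[Record]:
        return records[offset:offset + count]
    return fetch


def merged_stream(shards: Dict[str, FetchChunk],
                  chunk_size: int = 1000) -> Iterator[Record]:
    """Merge all shard streams into one, holding one chunk at a time."""
    offsets = {name: 0 for name in shards}
    active = list(shards)
    while active:
        for name in list(active):
            # The previous chunk from this shard is drained: ask for more.
            chunk = shards[name](offsets[name], chunk_size)
            if not chunk:
                active.remove(name)  # this shard has no more records
                continue
            offsets[name] += len(chunk)
            yield from chunk


shards = {
    "node-a": list_shard([{"id": 1}, {"id": 2}, {"id": 3}]),
    "node-b": list_shard([{"id": 10}]),
}
ids = [record["id"] for record in merged_stream(shards, chunk_size=2)]
print(ids)  # [1, 2, 10, 3]
```

Since the merged stream is a generator, the downstream pipeline pulls records on demand and the aggregator never holds more than one chunk per request in memory.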

To enable that, we will have to add a cursor API to the engine: the ability to page through search results incrementally, "parking" the query between requests. This is much like what SCAN does now, except that we will have to save state between iterations. The current query processing architecture allows that, as we already park query processing when performing concurrent search. The main gap would be implementing a "cursor cache" that holds pending cursors between processing cycles.
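
One possible shape for such a cursor cache is sketched below. Everything here is an assumption made for illustration: the class name `CursorCache`, the idle timeout, and the SCAN-like convention of returning cursor id 0 when the results are exhausted. A parked query is modeled as a plain Python iterator.

```python
import itertools
import time
from typing import Callable, Dict, Iterator, List, Tuple


class CursorCache:
    """Holds parked result iterators between read requests."""

    def __init__(self, max_idle_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self._cursors: Dict[int, Tuple[Iterator, float]] = {}
        self._ids = itertools.count(1)  # 0 is reserved for "finished"
        self._max_idle = max_idle_seconds
        self._clock = clock

    def park(self, results: Iterator) -> int:
        """Store a paused query and return the cursor id that resumes it."""
        cursor_id = next(self._ids)
        self._cursors[cursor_id] = (results, self._clock())
        return cursor_id

    def read(self, cursor_id: int, count: int) -> Tuple[List, int]:
        """Return up to `count` results and the next cursor id (0 = done)."""
        self.evict_idle()
        if cursor_id not in self._cursors:
            raise KeyError(f"unknown or expired cursor {cursor_id}")
        results, _ = self._cursors[cursor_id]
        chunk = list(itertools.islice(results, count))
        if len(chunk) < count:
            del self._cursors[cursor_id]  # exhausted: free the state
            return chunk, 0
        self._cursors[cursor_id] = (results, self._clock())
        return chunk, cursor_id

    def evict_idle(self) -> None:
        """Drop cursors that no client has read from recently."""
        now = self._clock()
        expired = [cid for cid, (_, last_used) in self._cursors.items()
                   if now - last_used > self._max_idle]
        for cid in expired:
            del self._cursors[cid]


cache = CursorCache()
cursor = cache.park(iter(range(5)))
while cursor:
    chunk, cursor = cache.read(cursor, 2)
    print(chunk)
```

The idle timeout matters because a client may abandon a cursor midway, and without eviction the parked state would accumulate indefinitely.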

Alternatively, we can re-create cursors each time, and seek to the last read docId in them. This will be simpler, but will require re-parsing the queries for each iteration. It might be suitable as an initial implementation of cursors.
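
The stateless alternative can be sketched as follows. The sketch assumes that re-running the query yields matching docIds in ascending order, so a binary search can skip everything already delivered; `run_query` is a hypothetical stand-in for re-parsing and re-executing the query.

```python
from bisect import bisect_right
from typing import Callable, List, Tuple


def read_page(run_query: Callable[[], List[int]], last_doc_id: int,
              count: int) -> Tuple[List[int], int]:
    """Re-create the result set and resume after `last_doc_id`."""
    doc_ids = run_query()  # re-executed on every call; no state is kept
    start = bisect_right(doc_ids, last_doc_id)  # first docId > last_doc_id
    page = doc_ids[start:start + count]
    new_last = page[-1] if page else last_doc_id
    return page, new_last


matching = [3, 7, 9, 12, 20]
last = 0  # assumes real docIds are greater than 0
while True:
    page, last = read_page(lambda: matching, last, 2)
    if not page:
        break
    print(page, last)
```

The only thing the client has to carry between calls is the last docId it received, at the cost of repeating the query setup on every page.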
