Skip to content

Instantly share code, notes, and snippets.

@patrick-entinux
Last active May 9, 2019 03:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save patrick-entinux/91024248b2260f0fc4b928a7549da874 to your computer and use it in GitHub Desktop.
Save patrick-entinux/91024248b2260f0fc4b928a7549da874 to your computer and use it in GitHub Desktop.
ElasticRx (RxJS Observables for @elastic/elasticsearch)
import { ApiResponse } from '@elastic/elasticsearch';
import {
TransportRequestCallback,
TransportRequestOptions,
} from '@elastic/elasticsearch/lib/Transport';
import { Observable } from 'rxjs';
/**
* Use ElasticSearch API with RxJS observables that can be
* cancelled. Observables are cold -- they must be subscribed to make
* any request to the server.
*
* Usage:
* ```
* import { Client } from '@elastic/elasticsearch';
* import { ElasticRx as er } from './elastic-rx';
*
* const client = new Client({
* node: 'http://localhost:9200/',
* });
*
* // Get request as Observable<ApiResponse>
* const request = er.from(client.index, {
* index: 'text',
* body: { message: 'hello world' },
* });
* ```
*/
export const ElasticRx = {
from,
};
function from<TApiMethod>(
apiMethod: TApiMethod,
params?: ParamsOfApiMethod<TApiMethod>,
options?: TransportRequestOptions
) {
return new Observable<ApiResponse<BodyOfApiMethod<TApiMethod>>>(observer => {
const requestCallback: callbackFn<any> = (error, result) => {
if (error) {
return observer.error(error);
}
observer.next(result);
observer.complete();
};
const requestParams = [];
if (params) {
requestParams.push(params);
if (options) {
requestParams.push(options);
}
}
const request = (apiMethod as any)(
...requestParams,
requestCallback
) as TransportRequestCallback;
return () => request.abort();
});
}
// Copied from @elastic/elasticsearch. If exported, would look like:
// import { callbackFn } from '@elastic/elasticsearch';
declare type callbackFn<T> = (
err: Error | null,
result: ApiResponse<T>
) => void;
// Copied from @elastic/elasticsearch. If exported, would look like:
// import { ApiMethod } from '@elastic/elasticsearch';
interface ApiMethod<TParams, TBody = any> {
// Promise API
(): Promise<ApiResponse<TBody>>;
(params: TParams): Promise<ApiResponse<TBody>>;
(params: TParams, options: TransportRequestOptions): Promise<
ApiResponse<TBody>
>;
// Callback API
(callback: callbackFn<TBody>): TransportRequestCallback;
(params: TParams, callback: callbackFn<TBody>): TransportRequestCallback;
(
params: TParams,
options: TransportRequestOptions,
callback: callbackFn<TBody>
): TransportRequestCallback;
}
// `infer` feature requires TypeScript 2.8.
// https://www.typescriptlang.org/docs/handbook/release-notes/typescript-2-8.html
type ParamsOfApiMethod<T> = T extends ApiMethod<infer TParams> ? TParams : any;
type BodyOfApiMethod<T> = T extends ApiMethod<any, infer TBody> ? TBody : any;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment