Skip to content

Instantly share code, notes, and snippets.

@lukasluecke
Created February 26, 2021 10:57
Show Gist options
  • Save lukasluecke/0c8e2c12a955198926779a8691874c03 to your computer and use it in GitHub Desktop.
Save lukasluecke/0c8e2c12a955198926779a8691874c03 to your computer and use it in GitHub Desktop.
Handling Mercure GraphQL subscriptions (from API Platform) in urql
import { SubscriptionForwarder, SubscriptionOperation } from '@urql/core/dist/types/exchanges/subscription';
import { host } from 'config';
import { DocumentNode, FieldNode, Kind, OperationDefinitionNode, parse } from 'graphql';
import { OperationResult } from 'urql';
import { make, toObservable } from 'wonka';
/**
 * Subscription forwarder for urql's subscription exchange.
 *
 * Builds the fetch-backed source for the given operation and exposes it as an
 * observable, which is the shape the `SubscriptionForwarder` type is assigned here.
 *
 * @param operation - The subscription operation handed over by the exchange.
 * @returns The observable produced by `toObservable` for the operation's source.
 */
export const forwardSubscription: SubscriptionForwarder = operation => {
  const source = createFetchSource(operation);
  return toObservable(source);
};
/**
 * Returns the top-level field selections of the first operation definition
 * found in a parsed GraphQL document.
 *
 * Anonymous operations (`subscription { ... }`) are accepted as well: an
 * operation name is optional in GraphQL, so its absence must not hide the
 * operation. Non-field selections (e.g. fragment spreads) are filtered out.
 *
 * @param query - The parsed GraphQL document.
 * @returns The field nodes selected at the root of the operation, or `null`
 *          when the document contains no operation definition.
 */
const getFieldSelections = (query: DocumentNode): readonly FieldNode[] | null => {
  const operationNode = query.definitions.find(
    (definition): definition is OperationDefinitionNode =>
      definition.kind === Kind.OPERATION_DEFINITION
  );

  if (operationNode === undefined) {
    return null;
  }

  return operationNode.selectionSet.selections.filter(
    (selection): selection is FieldNode => selection.kind === Kind.FIELD
  );
};
/**
 * Creates a wonka source for a subscription operation.
 *
 * The operation is first POSTed as JSON via `executeFetch`. The response is
 * emitted as the initial result; afterwards, for every root field of the
 * operation whose payload carries a `mercureUrl`, an `EventSource` is opened
 * and each incoming message is shallow-merged into that field's data and
 * re-emitted.
 *
 * Results without `data` (transport failures, GraphQL errors) are emitted
 * once and no event stream is opened for them.
 *
 * @param operation - The subscription operation (query string + context).
 * @returns A source whose teardown closes every opened `EventSource` and
 *          aborts the in-flight request when `AbortController` is available.
 * @throws In non-production builds, an `Error` is thrown from the response
 *         handler when a selected field has no `mercureUrl`.
 */
const createFetchSource = (operation: SubscriptionOperation) => {
  return make<OperationResult>(({ next }) => {
    const abortController =
      typeof AbortController !== 'undefined'
        ? new AbortController()
        : undefined;
    const { context } = operation;
    const subscriptions: EventSource[] = [];
    // Set by the teardown so that a response arriving afterwards (e.g. when
    // no AbortController exists) cannot open EventSources nobody will close.
    let closed = false;

    const extraOptions =
      typeof context.fetchOptions === 'function'
        ? context.fetchOptions()
        : context.fetchOptions || {};

    const fetchOptions = {
      body: JSON.stringify(operation),
      method: 'POST',
      ...extraOptions,
      headers: {
        'content-type': 'application/json',
        ...extraOptions.headers
      },
      signal:
        abortController !== undefined ? abortController.signal : undefined
    };

    executeFetch(operation, fetchOptions).then(result => {
      // `undefined` means the request was aborted.
      if (result === undefined || closed) {
        return;
      }

      next(result);

      // Error results carry no data, so there is nothing to subscribe to.
      if (result.data == null) {
        return;
      }

      // Most recent merged result, shared by all message handlers.
      let latest = result;
      const fieldSelections = getFieldSelections(parse(operation.query)) || [];

      fieldSelections.forEach(fieldSelection => {
        // The response is keyed by the alias when the field has one.
        const responseKey = (fieldSelection.alias || fieldSelection.name).value;
        const payload = latest.data[responseKey];
        const mercureUrl = payload != null ? payload.mercureUrl : undefined;

        // TODO: automatically add this to the request set, and strip it in result
        if (!mercureUrl) {
          if (process.env.NODE_ENV !== 'production') {
            throw new Error(
              'Received a subscription response without mercureUrl. This will just return static data.'
            );
          }
          // Production: keep the static data instead of crashing.
          return;
        }

        const mercureSubscription = new EventSource(mercureUrl.replace('http://127.0.0.1', host));
        mercureSubscription.onmessage = ev => {
          const newData = JSON.parse(ev.data);
          latest = {
            ...latest,
            data: {
              ...latest.data,
              [responseKey]: { ...latest.data[responseKey], ...newData }
            }
          };
          next(latest);
        };
        subscriptions.push(mercureSubscription);
      });
    });

    return () => {
      closed = true;
      subscriptions.forEach(it => it.close());
      if (abortController !== undefined) {
        abortController.abort();
      }
    };
  });
};
/**
 * Sends the request for an operation and resolves with the parsed JSON body.
 *
 * Uses `operation.context.fetch` when provided, otherwise the global `fetch`.
 * The returned promise never rejects:
 * - on success it resolves with the decoded JSON response;
 * - on an aborted request (`AbortError`) it resolves with `undefined`;
 * - on any other failure (non-success status, network error, invalid JSON,
 *   or a fetcher that throws synchronously) it resolves with
 *   `{ errors: [error] }`.
 *
 * @param operation - The operation whose context supplies `url` and `fetch`.
 * @param opts - The `RequestInit` passed to the fetcher.
 */
const executeFetch = async (operation: SubscriptionOperation, opts: RequestInit) => {
  const { url, fetch: fetcher } = operation.context;
  // With `redirect: 'manual'` a 3xx answer is an expected outcome, so only
  // statuses from 400 upwards count as failures in that mode.
  const statusRangeEnd = opts.redirect === 'manual' ? 400 : 300;

  try {
    // Calling the fetcher inside `try` also captures synchronous throws.
    const res = await (fetcher || fetch)(url, opts);

    if (res.status < 200 || res.status >= statusRangeEnd) {
      throw new Error(res.statusText);
    }

    return await res.json();
  } catch (err) {
    const errorName = (err as { name?: unknown } | null | undefined)?.name;
    if (errorName === 'AbortError') {
      // Aborts are triggered by teardown and are not reported as errors.
      return undefined;
    }
    return { errors: [err] };
  }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment