Skip to content

Instantly share code, notes, and snippets.

@dylanowen
Last active May 14, 2021 19:19
Show Gist options
  • Save dylanowen/bbd70b493544d8a30b8202ddf1f2cd14 to your computer and use it in GitHub Desktop.
Save dylanowen/bbd70b493544d8a30b8202ddf1f2cd14 to your computer and use it in GitHub Desktop.
Apollo Attachments Plugin
const {GraphQLList, GraphQLNonNull} = require("graphql/type/definition");
const express = require('express');
const {ApolloServer, gql} = require('apollo-server-express');
const Busboy = require('busboy');
const createError = require('http-errors');
const {initTracer} = require("jaeger-client");
const {WriteStream} = require('fs-capacitor');
const {createHash} = require('crypto');
const { printSchema, parse, visit } = require('graphql');
// Jaeger tracer shared by every span created in this file.
// The configuration and options are passed to jaeger-client unchanged.
const tracerConfig = {
    serviceName: 'prototype',
    reporter: {
        collectorEndpoint: 'http://localhost:14268/api/traces',
        logSpans: true,
    },
    sampler: {
        type: 'const',
        param: 1
    }
};
// Tracer diagnostics are written through the console.
const tracerOptions = {logger: console};
const tracer = initTracer(tracerConfig, tracerOptions);
// GraphQL schema for the prototype: a custom `Upload` scalar, a placeholder
// `_dummy` query, and an `upload` mutation that accepts a non-null list of
// non-null uploads and returns a string.
const typeDefs = gql`
scalar Upload
type Query {
_dummy: Boolean
}
type Mutation {
upload(files: [Upload!]!): String
}
`;
/**
 * Hashes the contents of one uploaded file and describes it as text.
 *
 * @param {Promise<{filename: string, createReadStream: Function}>|object} file
 *     The upload (or a promise for it) whose contents should be hashed.
 * @param {object} parentSpan Tracing span the hashing span is attached to.
 * @returns {Promise<string>} `"<filename>: <sha1 hex digest>"`.
 * @throws Rejects with the read stream's error if reading the file fails.
 */
async function fileInfo(file, parentSpan) {
    const {filename, createReadStream} = await file;
    const readStream = createReadStream();
    const span = tracer.startSpan(`hashing_file:${filename}`, {
        childOf: parentSpan
    });
    try {
        const hasher = createHash('sha1');
        const hash = await new Promise((resolve, reject) => {
            // Feed the hash directly instead of piping into it: the digest is
            // then computed exactly when the source ends, without depending
            // on the order in which 'end' listeners run.
            readStream.on('data', (chunk) => hasher.update(chunk));
            readStream.on('end', () => resolve(hasher.digest('hex')));
            readStream.on('error', reject);
        });
        return `${filename}: ${hash}`;
    } finally {
        // Always close the span, including when the stream fails.
        span.finish();
    }
}
// Resolver map for the schema.
const resolvers = {
    Query: {
        // Placeholder query; always resolves to false.
        _dummy: () => false,
    },
    Mutation: {
        /**
         * Describes every uploaded file.
         *
         * Files are processed one at a time, in argument order. Each
         * description is followed by a single space (including the last one).
         *
         * @returns {Promise<string>} Concatenated `fileInfo` descriptions.
         * @throws Rejects with the first `fileInfo` failure.
         */
        async upload(parent, {files}, context, info) {
            const span = tracer.startSpan("upload_resolver", {
                childOf: context.span
            });
            try {
                let result = "";
                for (const file of files) {
                    result += await fileInfo(file, span) + " ";
                }
                return result;
            } finally {
                // Close the span even when one of the files fails.
                span.finish();
            }
        }
    }
};
/**
 * A deferred upload: exposes a promise together with the functions that
 * settle it.
 *
 * - `resolve(file)` records the file on `this.file` and fulfils the promise.
 * - `reject(error)` rejects the promise.
 * - `promise` is what consumers await.
 */
class Upload {
    constructor() {
        let fulfil;
        let fail;
        const deferred = new Promise((onFulfil, onFail) => {
            fulfil = onFulfil;
            fail = onFail;
        });

        this.resolve = (file) => {
            this.file = file;
            fulfil(file);
        };
        this.reject = fail;
        this.promise = deferred;

        // Attach a no-op handler so a rejection nobody awaits is not
        // reported as an unhandled rejection.
        deferred.catch(() => {
        });
    }
}
/**
 * Registry of `Upload` entries keyed by multipart field name / file id.
 *
 * Entries are created on first access, whether that access comes from the
 * HTTP parser delivering a file or from GraphQL asking for an id.
 */
class Uploads {
    constructor() {
        this.parsed = false;
        this.uploadsMap = new Map();
    }

    /** Fulfils the entry registered under the file's field name. */
    setUpload(file) {
        const entry = this.getUpload(file.fieldName);
        entry.resolve(file);
    }

    /** Returns the entry for `id`, creating it if it does not exist yet. */
    getUpload(id) {
        if (!this.uploadsMap.has(id)) {
            this.uploadsMap.set(id, new Upload());
        }
        const entry = this.uploadsMap.get(id);
        // Once parsing has finished no further files can arrive, so the
        // entry is rejected as an invalid file id.
        if (this.parsed) {
            entry.reject(createError(400, `No valid file submitted for id: ${id}`));
        }
        return entry;
    }

    /** Called when the HTTP body has been fully parsed. */
    httpParsingComplete() {
        // Entries without a file were requested by GraphQL but never
        // submitted, so reject them.
        this.uploadsMap.forEach((entry, id) => {
            if (!entry.file) {
                entry.reject(createError(400, `No valid file submitted for id: ${id}`));
            }
        });
        // From here on getUpload rejects ids it is asked for.
        this.parsed = true;
    }

    /** Iterator over every registered entry. */
    uploads() {
        return this.uploadsMap.values();
    }
}
/**
 * Parses a multipart request that carries one GraphQL request field plus any
 * number of file parts.
 *
 * The returned promise settles as soon as the GraphQL field has been parsed;
 * file parts keep streaming afterwards and are delivered through the returned
 * `uploads` registry, keyed by multipart field name.
 *
 * @param {object} request Incoming HTTP request (readable stream with `headers`).
 * @param {object} response HTTP response; its `finish`/`close` events release
 *     the buffered files.
 * @param {object} [options]
 * @param {number} [options.maxFieldSize=1000000] Byte limit for the GraphQL field.
 * @param {number} [options.maxFileSize=Infinity] Byte limit per file.
 * @param {number} [options.maxFiles=Infinity] Maximum number of files.
 * @returns {Promise<{graphQLRequest: object, uploads: Uploads}>}
 * @throws Rejects with an HTTP error: 400 for invalid JSON or a missing
 *     GraphQL field, 413 for a truncated field, otherwise the parser's own
 *     error (or a generic 500 when none is available).
 */
function processRequest(
    request,
    response,
    {
        maxFieldSize = 1000000, // 1 MB
        maxFileSize = Infinity,
        maxFiles = Infinity,
    } = {}
) {
    return new Promise((resolve, reject) => {
        let released;
        let exitError;
        let currentStream;
        let graphQLRequest;
        const uploads = new Uploads();

        const parser = new Busboy({
            headers: request.headers,
            limits: {
                fieldSize: maxFieldSize,
                fields: 1, // Only the graphql query
                fileSize: maxFileSize,
                files: maxFiles,
            },
        });

        // Aborts parsing. Only the first call has any effect.
        const exit = (error) => {
            if (exitError) return;
            // Keep the caller's error so the status and message reach the
            // client; fall back to the generic 500 when none was given.
            exitError = error || createError(500, `error`);
            reject(exitError);
            parser.destroy();
            if (currentStream) currentStream.destroy(exitError);
            // No more files will arrive: fail every upload that is still
            // pending so nothing awaiting it waits forever.
            for (const upload of uploads.uploads()) {
                if (!upload.file) upload.reject(exitError);
            }
            request.unpipe(parser);
            // Drain the rest of the request body.
            setImmediate(() => {
                request.resume();
            });
        };

        // Releases the buffered files. Only the first call has any effect.
        const release = () => {
            if (released) return;
            released = true;
            for (const upload of uploads.uploads()) {
                if (upload.file) {
                    upload.file.capacitor.release();
                }
            }
        };

        parser.on(
            'field',
            (fieldName, value, fieldNameTruncated, valueTruncated) => {
                if (valueTruncated) {
                    return exit(createError(
                        413,
                        `The '${fieldName}' multipart field value exceeds the ${maxFieldSize} byte size limit.`
                    ));
                }
                try {
                    graphQLRequest = JSON.parse(value);
                } catch (error) {
                    return exit(createError(
                        400,
                        `Invalid JSON in the '${fieldName}' multipart field.`
                    ));
                }
                resolve({graphQLRequest, uploads});
            }
        );

        parser.on('file', (fieldName, stream, filename, encoding, mimetype) => {
            const span = tracer.startSpan(`reading_file:${fieldName}`, {
                childOf: request.span
            });
            let spanFinished = false;
            // The span must be closed exactly once, on either 'end' or 'error'.
            const finishSpan = () => {
                if (spanFinished) return;
                spanFinished = true;
                span.finish();
            };

            currentStream = stream;
            stream.on('end', () => {
                finishSpan();
                currentStream = null;
            });

            let fileError;
            const capacitor = new WriteStream();

            capacitor.on('error', () => {
                // Stop feeding the buffer but keep draining the part so the
                // parser can move on.
                stream.unpipe();
                stream.resume();
            });

            // busboy signals an exceeded fileSize limit on the file stream and
            // truncates the data; surface that instead of serving a cut file.
            stream.on('limit', () => {
                fileError = createError(
                    413,
                    `File truncated as it exceeds the ${maxFileSize} byte size limit.`
                );
                stream.unpipe();
                capacitor.destroy(fileError);
            });

            stream.on('error', (error) => {
                fileError = error;
                finishSpan();
                stream.unpipe();
                // Hand the actual failure to the buffer rather than
                // `exitError`, which is undefined unless exit() ran first.
                capacitor.destroy(error);
            });

            const file = {
                fieldName,
                filename,
                mimetype,
                encoding,
                createReadStream(options) {
                    const error = fileError || (released ? exitError : null);
                    if (error) throw error;
                    return capacitor.createReadStream(options);
                },
            };
            // Non-enumerable, so the buffer is reachable for release() only.
            Object.defineProperty(file, 'capacitor', {value: capacitor});

            stream.pipe(capacitor);
            uploads.setUpload(file);
        });

        parser.once('finish', () => {
            request.unpipe(parser);
            request.resume();
            // reject any invalid file ids
            uploads.httpParsingComplete();
            if (!graphQLRequest) {
                return exit(createError(400, 'Missing multipart GraphQL request field.'));
            }
        });

        parser.once('error', exit);
        response.once('finish', release);
        response.once('close', release);
        request.pipe(parser);
    });
}
const app = express();

// Opens one `http_request` span per request, exposes it as `req.span`, and
// closes it when the response finishes.
app.use((req, res, next) => {
    const span = tracer.startSpan('http_request');
    req.span = span;
    res.once('finish', () => span.finish());
    next();
});

// For multipart requests, parses the GraphQL request and the uploads and puts
// them on `request.body` / `request.graphQLUploads`. Other requests pass
// through untouched.
app.use(async (request, response, next) => {
    if (!request.is('multipart/form-data')) return next();
    let parsed;
    try {
        parsed = await processRequest(request, response);
    } catch (error) {
        // A rejected promise from an async middleware is not handled by
        // Express here, so forward the failure explicitly; otherwise the
        // request would never get a response.
        return next(error);
    }
    request.body = parsed.graphQLRequest;
    request.graphQLUploads = parsed.uploads;
    return next();
});
/**
 * Replaces every `Upload` scalar id inside an argument value with the promise
 * of the matching upload.
 *
 * Doesn't handle InputObjects but it could, it also probably should be some
 * form of AstVisitor.
 *
 * @param {*} value Argument value (or a nested element of it).
 * @param {object} graphQLType GraphQL type that `value` was declared with.
 * @param {object} uploads Registry exposing `getUpload(id).promise`.
 * @returns {*} The value with upload ids swapped for promises; null and
 *     undefined are returned unchanged.
 */
function processUploadScalars(value, graphQLType, uploads) {
    // A nullable argument that is null or was omitted has nothing to replace.
    // Without this guard a null list would fail on `.map`.
    if (value === null || value === undefined) {
        return value;
    }
    if (graphQLType instanceof GraphQLNonNull) {
        return processUploadScalars(value, graphQLType.ofType, uploads);
    } else if (graphQLType instanceof GraphQLList) {
        return value.map((v) => processUploadScalars(v, graphQLType.ofType, uploads));
    } else if (value && graphQLType.name === "Upload") {
        return uploads.getUpload(value).promise;
    } else {
        return value;
    }
}
// Apollo server wired to the custom multipart handling in this file.
const server = new ApolloServer({
    // NOTE(review): presumably turns off Apollo's built-in upload handling so
    // it does not compete with the multipart middleware - confirm against the
    // apollo-server docs.
    uploads: false,
    typeDefs, resolvers,
    context: ({req}) => {
        let uploads = req.graphQLUploads;
        if (!uploads) {
            // Not a multipart request: there are no files. Use an empty
            // registry that is already marked as parsed, so any upload id the
            // query refers to is rejected by the registry instead of crashing
            // on a missing `getUpload` method.
            uploads = new Uploads();
            uploads.httpParsingComplete();
        }
        return {
            uploads,
            span: req.span,
        };
    },
    plugins: [
        {
            requestDidStart(context) {
                return {
                    executionDidStart() {
                        const span = tracer.startSpan("graphql_execution", {
                            childOf: context.context.span
                        });
                        // Spans started during execution attach to this one.
                        context.context.span = span;
                        return {
                            willResolveField({source, args, context, info}) {
                                const fieldArgs = info.parentType.getFields()[info.fieldName].args;
                                for (const {name, type} of fieldArgs) {
                                    // check for our Attachment scalars
                                    args[name] = processUploadScalars(args[name], type, context.uploads);
                                }
                            },
                            executionDidEnd() {
                                span.finish();
                            }
                        };
                    }
                };
            }
        }
    ]
});
// Starts Apollo, mounts it on the Express app, and listens on port 4000.
(async function () {
    await server.start();
    server.applyMiddleware({app});
    await new Promise((resolve, reject) => {
        const httpServer = app.listen({port: 4000}, resolve);
        // Listen failures (for example a port already in use) are emitted as
        // an 'error' event; route them into the startup failure path.
        httpServer.once('error', reject);
    });
    console.log(`🚀 Server ready at http://localhost:4000${server.graphqlPath}`);
}()).catch((error) => {
    // Startup failed: report it and stop rather than leaving a rejected
    // promise unhandled.
    console.error(error);
    process.exit(1);
});
{
"name": "apollo-attachments",
"version": "0.0.1",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"apollo-server-express": "^2.24.0",
"busboy": "^0.3.1",
"express": "^4.17.1",
"fs-capacitor": "^6.2.0",
"graphql": "^15.5.0",
"jaeger-client": "^3.15.0",
"opentracing": "^0.14.4"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment