Skip to content

Instantly share code, notes, and snippets.

@yus-ham
Created May 26, 2024 13:45
Show Gist options
  • Save yus-ham/36f75733f26f832efe071664cf638054 to your computer and use it in GitHub Desktop.
Save yus-ham/36f75733f26f832efe071664cf638054 to your computer and use it in GitHub Desktop.
Bun.serve() workaround for emulated CPU
import os from 'os';
import http from 'http';
import { HTTPParser } from 'http-parser-js';
import { styleText } from 'util';
import type { ServeOptions } from 'bun';
const runningOnEmulatedCPUEnv = os.cpus().find(cpu => cpu.model.includes('KVM'))
if (runningOnEmulatedCPUEnv) {
try {
Bun.serve({ fetch() {} }).stop(true)
} catch (err) {
const req_buffer = []
/**
*
* @param {*} input
* @param {import("bun").TCPSocket} socket
* @param {*} onRequest
*/
async function parseRequest(input, socket, onRequest) {
const log = (...args) => `${Bun.env.APP_ENV}`.startsWith('dev') && console.info(styleText(`blue`, `[req handler]`), ...args)
const parser = new HTTPParser(HTTPParser.REQUEST);
let complete = false;
let shouldKeepAlive;
let upgrade;
let method;
let url;
let versionMajor;
let versionMinor;
let headers_ = [];
let trailers = [];
let bodyChunks = [];
parser[HTTPParser.kOnHeadersComplete] = function (req) {
shouldKeepAlive = req.shouldKeepAlive;
upgrade = req.upgrade;
method = HTTPParser.methods[req.method];
url = req.url;
versionMajor = req.versionMajor;
versionMinor = req.versionMinor;
headers_ = req.headers;
};
parser[HTTPParser.kOnBody] = function (chunk, offset, length) {
bodyChunks.push(chunk.slice(offset, offset + length));
};
// This is actually the event for trailers, go figure.
parser[HTTPParser.kOnHeaders] = function (t) {
trailers = t;
};
parser[HTTPParser.kOnMessageComplete] = function () {
complete = true;
};
// Since we are sending the entire Buffer at once here all callbacks above happen synchronously.
// The parser does not do _anything_ asynchronous.
// However, you can of course call execute() multiple times with multiple chunks, e.g. from a stream.
// But then you have to refactor the entire logic to be async (e.g. resolve a Promise in kOnMessageComplete and add timeout logic).
parser.execute(input);
parser.finish();
if (!complete) {
throw new Error('Could not parse request');
}
const body = Buffer.concat(bodyChunks);
console.info({
shouldKeepAlive,
upgrade,
method,
url,
versionMajor,
versionMinor,
headers_,
body,
trailers,
// socket
});
const headers = new Headers()
for (let i = 0; i < headers_.length; i += 2) {
headers.set(headers_[i].toLowerCase(), headers_[i + 1])
}
/** @type {Response} */
const req = new Request(`http://${headers.get('host')}`, { headers, signal: socket.ctrl.signal })
/** @type {Response} */
const res = await onRequest(req)
const statusText = res.statusText || http.STATUS_CODES[res.status]
socket.write(`HTTP/${versionMajor}.${versionMinor} ${res.status} ${statusText}\r\n`)
const headers2 = await extendIterator(res.headers.entries())
for await (const [key, value] of headers2) {
log(`writeHeader:`, key, value)
socket.write(`${key}: ${value}\r\n` + (headers2.hasNext() ? `` : `\r\n`))
}
if (method === 'OPTIONS') {
log('stop on OPTIONS req')
return
}
log('res_body_reader ready')
let chunk, res_body_reader = res.body.getReader()
while ((chunk = await res_body_reader.read()).value) {
log('respon from user req handler:', styleText(`red`, `|`) + Buffer.from(chunk.value.subarray(0, 50)).toString() + styleText(`red`, `|`))
socket.write(chunk.value)
}
}
Bun.serve = function (opts: ServeOptions) {
if (typeof opts !== 'object') {
const err = new TypeError('Bun.serve expects an object')
err.code = 'ERR_INVALID_ARG_TYPE'
throw err
}
if (typeof opts.fetch !== 'function') {
const err = new TypeError('Expected fetch() to be a function')
err.code = 'ERR_INVALID_ARG_TYPE'
throw err
}
const log = (...args) => console.info(styleText(`cyan`, `[Bun.serve]`), ...args)
const port = opts.port || Bun.env.PORT || 3000
try {
const tcp = Bun.listen({
port,
hostname: opts.hostname || '0.0.0.0',
socket: {
open(socket) {
log(`socket client open ...`)
socket.ctrl = new AbortController()
// socket.startTime = performance.now()
// incrementalWrite(socket)
},
drain() {
log(`drain`)
},
data(socket, data) {
log(`received:`, styleText(`red`, `|`) + data.toString().slice(0, 50) + styleText(`red`, `|`))
req_buffer.push(data)
if (data.subarray(-4).equals(Buffer.from('\r\n\r\n'))) {
parseRequest(Buffer.concat(req_buffer), socket, opts.fetch)
}
},
end(socket) {
log(`end`)
socket.end()
},
close(socket) {
log(`close`)
log(`socket.ctrl.abort()`)
socket.ctrl.abort()
},
error(socket, error) {
log(`error: ` + error)
},
connectError(socket, error) {
log(`connectError: ` + error)
},
}
})
return {
address: {
address: tcp.hostname,
family: tcp.hostname.includes(':') ? 'IPv6' : 'IPv4',
port: tcp.port,
},
url: new URL(`http://${tcp.hostname}:${tcp.port}/`),
}
} catch (err) {
if (err.message.includes('Failed to listen')) {
err.message = `Failed to start server. Is port ${port} in use?`
err.name = 'EADDRINUSE'
}
throw err
}
}
}
}
async function extendIterator<T> (iterator: AsyncIterableIterator<T>): Promise<ExtendedAsyncIterableIterator<T>> {
let curr = await iterator.next()
let next = await iterator.next()
let hasNext = !next.done
async function * innerIterator(): AsyncIterableIterator<T> {
while (!curr.done) {
yield curr.value
curr = next
next = await iterator.next()
hasNext = !next.done
}
}
const wrapper = innerIterator()
Object.defineProperty(wrapper, 'hasNext', { writable: false, value: () => hasNext })
return wrapper as ExtendedAsyncIterableIterator<T>
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment