Skip to content

Instantly share code, notes, and snippets.

@andris9
Last active February 25, 2021 03:42
Show Gist options
  • Save andris9/179f3c685ddb161856a5b2ebc090fc14 to your computer and use it in GitHub Desktop.
Save andris9/179f3c685ddb161856a5b2ebc090fc14 to your computer and use it in GitHub Desktop.
'use strict';
const { Transform } = require('stream');
const { Headers } = require('mailsplit');
const { SMTPServer } = require('smtp-server');
const MAX_ALLOWED_SIZE = 10 * 1024 * 1024;
/**
* MessageSplitter instance is a transform stream that separates message headers
* from the rest of the body. Headers are emitted with the 'headers' event. Message
* body is passed on as the resulting stream.
*/
class MessageSplitter extends Transform {
constructor(options) {
super(options);
this.lastBytes = Buffer.alloc(4);
this.headersParsed = false;
this.headerBytes = 0;
this.headerChunks = [];
this.rawHeaders = false;
this.bodySize = 0;
}
/**
* Keeps count of the last 4 bytes in order to detect line breaks on chunk boundaries
*
* @param {Buffer} data Next data chunk from the stream
*/
_updateLastBytes(data) {
const lblen = this.lastBytes.length;
const nblen = Math.min(data.length, lblen);
// shift existing bytes
for (let i = 0, len = lblen - nblen; i < len; i++) {
this.lastBytes[i] = this.lastBytes[i + nblen];
}
// add new bytes
for (let i = 1; i <= nblen; i++) {
this.lastBytes[lblen - i] = data[data.length - i];
}
}
/**
* Finds and removes message headers from the remaining body. We want to keep
* headers separated until final delivery to be able to modify these
*
* @param {Buffer} data Next chunk of data
* @return {Boolean} Returns true if headers are already found or false otherwise
*/
_checkHeaders(data) {
if (this.headersParsed) {
return true;
}
const lblen = this.lastBytes.length;
let headerPos = 0;
this.curLinePos = 0;
for (
let i = 0, len = this.lastBytes.length + data.length;
i < len;
i++
) {
let chr;
if (i < lblen) {
chr = this.lastBytes[i];
} else {
chr = data[i - lblen];
}
if (chr === 0x0a && i) {
const pr1 =
i - 1 < lblen ? this.lastBytes[i - 1] : data[i - 1 - lblen];
const pr2 =
i > 1
? i - 2 < lblen
? this.lastBytes[i - 2]
: data[i - 2 - lblen]
: false;
if (pr1 === 0x0a) {
this.headersParsed = true;
headerPos = i - lblen + 1;
this.headerBytes += headerPos;
break;
} else if (pr1 === 0x0d && pr2 === 0x0a) {
this.headersParsed = true;
headerPos = i - lblen + 1;
this.headerBytes += headerPos;
break;
}
}
}
if (this.headersParsed) {
this.headerChunks.push(data.slice(0, headerPos));
this.rawHeaders = Buffer.concat(
this.headerChunks,
this.headerBytes
);
this.headerChunks = null;
this.headers = new Headers(this.rawHeaders);
this.emit('headers', this.headers);
if (data.length - 1 > headerPos) {
const chunk = data.slice(headerPos);
this.bodySize += chunk.length;
// this would be the first chunk of data sent downstream
// from now on we keep header and body separated until final delivery
setImmediate(() => this.push(chunk));
}
return false;
}
this.headerBytes += data.length;
this.headerChunks.push(data);
// store last 4 bytes to catch header break
this._updateLastBytes(data);
return false;
}
_transform(chunk, encoding, callback) {
if (!chunk || chunk.length === 0) {
return callback();
}
if (typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding);
}
let headersFound;
try {
headersFound = this._checkHeaders(chunk);
} catch (err) {
return callback(err);
}
if (headersFound) {
this.bodySize += chunk.length;
this.push(chunk);
}
setImmediate(callback);
}
_flush(callback) {
if (this.headerChunks) {
// all chunks are checked but we did not find where the body starts
// so emit all we got as headers and push empty line as body
this.headersParsed = true;
// add header terminator
this.headerChunks.push(Buffer.from('\r\n\r\n'));
this.headerBytes += 4;
// join all chunks into a header block
this.rawHeaders = Buffer.concat(
this.headerChunks,
this.headerBytes
);
this.headers = new Headers(this.rawHeaders);
this.emit('headers', this.headers);
this.headerChunks = null;
// this is our body
this.push(Buffer.from('\r\n'));
}
callback();
}
}
// Setup server
const server = new SMTPServer({
// log to console
logger: true,
// not required but nice-to-have
banner: 'Welcome to My Awesome SMTP Server',
// disable STARTTLS to allow authentication in clear text mode
disabledCommands: ['AUTH', 'STARTTLS'],
// Accept messages up to 10 MB
size: MAX_ALLOWED_SIZE,
// Handle message stream
onData(stream, session, callback) {
let chunks = [];
let chunklen = 0;
let sizeExceeded = false;
let messageSplitter = new MessageSplitter();
messageSplitter.on('readable', () => {
let chunk;
while ((chunk = messageSplitter.read()) !== null) {
if (
!sizeExceeded &&
chunklen + chunk.length > MAX_ALLOWED_SIZE
) {
sizeExceeded = true;
console.log('Input to large!');
}
if (!sizeExceeded) {
chunks.push(chunk);
chunklen += chunk.length;
}
}
});
messageSplitter.on('end', () => {
let err;
if (sizeExceeded) {
err = new Error(
'Error: message exceeds fixed maximum message size 10 MB'
);
err.responseCode = 552;
return callback(err);
}
callback(null, `received ${chunklen} bytes`);
});
// handle errors that occur from parsing stream
messageSplitter.once('error', err => {
console.error(err);
// we still need to consume the stream
stream.unpipe(messageSplitter);
stream.on('readable', () => {
let chunk;
while ((chunk = stream.read()) !== null) {
chunklen += chunk.length;
}
});
stream.once('end', () => {
callback(err);
});
});
stream.on('error', err => {
callback(err);
});
stream.pipe(messageSplitter);
}
});
server.on('error', err => {
console.log('Error occurred');
console.log(err);
});
// start listening
server.listen(2500);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment