Skip to content

Instantly share code, notes, and snippets.

@colinmollenhour
Last active June 13, 2023 23:49
Show Gist options
  • Save colinmollenhour/84d400f65815cd58cb988725750a3b6e to your computer and use it in GitHub Desktop.
Save colinmollenhour/84d400f65815cd58cb988725750a3b6e to your computer and use it in GitHub Desktop.
Watch the spool, delay processing of emails from unrecognized domain names.
/**
* Watch the spool, delay processing of emails from unrecognized domain names.
*
* Installation:
* npm install nedb
* npm install node-windows
* node server.js --install {spool_dir}
* net start SpamDelayer
*
* @Author Colin Mollenhour
* @Copyright Colin Mollenhour (2016)
*/
var fs = require('fs')
,path = require('path')
;
// Windows service name; also the name of the app sub-directory created inside the spool
// and the name given to the Event Log writer below.
var serviceName = 'SpamDelayer';
// Absolute path of this script; node-windows registers it as the service's script.
var scriptPath = fs.realpathSync(process.argv[1]);
// Working paths, filled in by initAppDirectories() once the spool directory is known.
var appDir = null;
var holdDir = null;
var logFile = null; // {Spool}/SpamDelayer/logfile.txt
// Extra logging, switched on by the DEBUG environment variable.
var debug = false;
// Longest time mail from a newly seen domain is held back: 15 minutes in ms.
var delayTime = 15 * 60 * 1000;
var isWindows = process.platform === "win32";
// Windows Event Log writer; only created on Windows, null elsewhere.
var logger = isWindows
  ? new (require('node-windows').EventLogger)(serviceName)
  : null;
// Command-line entry point:
//   node server.js --install {spool_dir}   install the Windows service
//   node server.js --uninstall             uninstall the Windows service
//   node server.js                         watch the spool named by the SPOOL_DIR
//                                          environment variable (installService()
//                                          registers the service with that variable)
// The --install argument installs the Windows service. The last argument should be the spool directory.
if (process.argv.length === 4 && process.argv[2] === '--install') {
  installService(process.argv[3]);
}
// The --uninstall argument uninstalls the Windows service.
else if (process.argv.length === 3 && process.argv[2] === '--uninstall') {
  uninstallService();
}
// Invalid arguments: show the usage and finish with a failure status. Setting
// process.exitCode (instead of calling process.exit()) lets the output flush first.
else if (process.argv.length !== 2) {
  process.stdout.write("Invalid arguments.\r\n");
  process.stdout.write("Usage: node "+path.basename(scriptPath)+" [--install {spool_dir} | --uninstall]\r\n");
  process.exitCode = 1;
}
// Without SPOOL_DIR there is no spool to watch; fail here rather than handing
// undefined to watchSpool().
else if ( ! process.env['SPOOL_DIR']) {
  process.stdout.write("Missing environment variable SPOOL_DIR\r\n");
  process.exitCode = 1;
}
// Otherwise we watch the spool directory
else {
  if (process.env['DEBUG']) {
    debug = true;
    writeLog("info", "Debug mode enabled.");
  }
  watchSpool(process.env['SPOOL_DIR']);
}
/**
 * Record a message. The line goes to the log file once initAppDirectories() has set
 * `logFile`, or to stdout before that. On Windows, "warn" and "error" messages are
 * also sent to the Event Log. Any other severity except "info" is reported there as
 * invalid.
 *
 * @param {string} severity - "info", "warn" or "error".
 * @param {string} message - Text to record.
 */
function writeLog(severity, message) {
  try {
    if (!logFile) {
      process.stdout.write(severity + ": " + message + "\r\n");
    } else {
      var stamp = new Date().toUTCString();
      fs.appendFileSync(logFile, stamp + " (" + severity + ") " + message + "\r\n");
    }
  } catch (writeError) {
    // A failed log write must not crash the service; surface it in the event log instead.
    if (logger) logger.error(writeError.message);
  }

  if (!logger) return;
  switch (severity) {
    case "warn":
      logger.warn(message);
      break;
    case "error":
      logger.error(message);
      break;
    case "info":
      // Informational messages stay in the log file / stdout only.
      break;
    default:
      logger.error("Severity is not valid: " + severity);
  }
}
/**
 * Resolve the service's working paths under the spool directory and make sure they exist:
 *   appDir  = {spoolDir}/SpamDelayer   (logFile = {appDir}/logfile.txt)
 *   holdDir = {spoolDir}/hold
 * Exits the process with status 1 if the spool directory cannot be found. ensureDirectory()
 * exits the same way if a sub-directory cannot be created.
 *
 * @param {string} spoolDir - The mail server's spool directory.
 */
function initAppDirectories(spoolDir) {
  var spoolMissing = ! fs.existsSync(spoolDir);
  if (spoolMissing) {
    process.stdout.write("The spool directory cannot be accessed: " + spoolDir + "\r\n");
    process.exit(1);
  }

  // Application directory: log file (and, in watchSpool, the domain database).
  appDir = path.join(spoolDir, serviceName);
  ensureDirectory(appDir);
  logFile = path.join(appDir, 'logfile.txt');

  // Holding area for delayed messages.
  holdDir = path.join(spoolDir, 'hold');
  ensureDirectory(holdDir);
}
/**
 * Register this script as a Windows service through node-windows. The service receives
 * the spool directory in its SPOOL_DIR environment variable. The process exits once
 * node-windows reports the outcome:
 *   install              -> 0
 *   alreadyinstalled     -> 1
 *   invalidinstallation  -> 0
 *   error                -> 1
 *
 * @param {string} spoolDir - The mail server's spool directory.
 */
function installService(spoolDir) {
  initAppDirectories(spoolDir);
  // Check dependencies: loading nedb here makes a missing package fail at install time.
  require('nedb');

  var svc = new (require('node-windows').Service)({
    name: serviceName,
    description: 'A node.js process to delay processing of potential spam messages.',
    script: scriptPath,
    env: [
      { name: "SPOOL_DIR", value: spoolDir }
    ]
  });

  // Print one line of output and terminate with the given status.
  function finish(text, status) {
    process.stdout.write(text + "\r\n");
    process.exit(status);
  }

  svc.on('install', function() {
    writeLog("info", "Service '" + serviceName + "' installed.");
    finish("Service installed. Run 'net start " + serviceName + "' to start it.", 0);
  });
  svc.on('alreadyinstalled', function() {
    finish("Service '" + serviceName + "' has already been installed.", 1);
  });
  svc.on('invalidinstallation', function() {
    finish("Service installed but some required files were missing.. ??", 0);
  });
  svc.on('error', function(error) {
    finish("Error installing Windows service: " + error, 1);
  });

  svc.install();
}
/**
 * Remove the Windows service registered by installService(). Exits with status 0 once
 * node-windows reports the uninstall, or 1 if it reports an error.
 */
function uninstallService() {
  var svc = new (require('node-windows').Service)({
    name: serviceName,
    script: scriptPath
  });

  svc.on('error', function(error) {
    process.stdout.write("Error uninstalling Windows service: " + error + "\r\n");
    process.exit(1);
  });
  svc.on('uninstall', function() {
    process.stdout.write("Service '" + serviceName + "' has been uninstalled.\r\n");
    process.exit(0);
  });

  svc.uninstall();
}
/**
 * Create `dir` (one level, not recursive) unless something already exists at that path.
 * Exits the process with status 1 if the directory cannot be created.
 *
 * @param {string} dir - Directory to create.
 */
function ensureDirectory(dir) {
  if (fs.existsSync(dir)) return;
  try {
    fs.mkdirSync(dir);
  } catch (mkdirError) {
    process.stdout.write("Unable to create directory: " + dir + " (" + mkdirError + ")\r\n");
    process.exit(1);
  }
}
/**
 * Run the spam delayer.
 *
 * Watches {spoolDir}/proc for message header files (*.hdr). For each "Written" message
 * with local deliveries, the sender's domain is looked up:
 *   - domain first seen less than `delayTime` ms ago -> the message is parked in the
 *     hold directory until that time has passed;
 *   - otherwise -> the message goes straight back to the spool directory.
 * Domains and the time they were first seen are stored in an nedb datastore at
 * {appDir}/domains.db.
 *
 * Changes the process working directory to the spool directory; the 'proc' and 'hold'
 * paths below are relative to it. Exits the process with status 1 if leftover files
 * cannot be handled at startup or the proc directory cannot be watched.
 *
 * @param {string} spoolDir - The mail server's spool directory.
 */
function watchSpool(spoolDir) {
  // Resolve before process.chdir(): a relative spool path would otherwise be re-joined
  // relative to the new cwd when files are moved back to the spool or into hold.
  var spoolPath = typeof spoolDir === 'string' ? path.resolve(spoolDir) : spoolDir;
  initAppDirectories(spoolPath);
  process.chdir(spoolPath);
  writeLog("info", "Starting "+serviceName+" in "+process.cwd());

  // Instantiate database
  var Datastore = require('nedb');
  var db = new Datastore({ filename: path.join(appDir, 'domains.db'), autoload: true });
  db.persistence.setAutocompactionInterval(4*60*60*1000); // Compact every 4 hours

  // Files left by a previous run:
  //   - re-process anything still in proc;
  //   - release everything in hold, because the in-memory timers that would have
  //     released it are gone.
  try {
    fs.readdirSync('proc').forEach(function(file) {
      if (isHeaderFile(file)) {
        if (debug) writeLog("info", "Processing old file: "+file);
        processProcFile(path.join('proc', file));
      }
    });
    fs.readdirSync('hold').forEach(function(file) {
      if (isHeaderFile(file)) {
        moveBackToSpool(path.join('hold', file));
      }
    });
  } catch (e) {
    // "error" is a severity writeLog() accepts, so the message also reaches the event log.
    writeLog("error", "Could not process existing files in proc and hold directories: "+e);
    process.exit(1);
  }

  // Start watcher
  writeLog("info", "Watching proc directory");
  // Header files that already have a processing run scheduled. fs.watch can report
  // several 'change' events for one file; only the first schedules processProcFile().
  var queue = {};
  var watcher;
  try {
    watcher = fs.watch('proc', function(event, filename) {
      if (debug) writeLog("info", "Received "+event+" on "+filename);
      if (event != 'change' || ! filename || ! isHeaderFile(filename)) return;
      var procFile = path.join('proc', filename);
      if (queue[procFile]) return;
      queue[procFile] = true;
      setTimeout(function() { delete queue[procFile]; processProcFile(procFile); }, 100);
    });
  } catch (e) {
    writeLog("error", "Error watching proc directory: "+e);
    process.exit(1);
  }
  watcher.on('error', function(error) {
    writeLog("error", "Error watching "+spoolPath+": "+error);
  });

  // True for message header files (*.hdr). Each has a companion *.eml file that is
  // moved together with it.
  function isHeaderFile(name) {
    return /\.hdr$/.test(name);
  }

  // Read a header file from proc and decide what happens to its message. This is the
  // header layout the checks below assume; it is inferred from the code, not from a
  // mail server spec (TODO confirm):
  //   - line 0 is a status such as "Written…" or "Failed…";
  //   - line 1 presumably holds the sender address;
  //   - a "containsLocalDeliveries: True" line marks mail for local mailboxes.
  function processProcFile(procFile) {
    fs.readFile(procFile, {encoding: 'utf8'}, function(err, data) {
      if (err) {
        writeLog("error", "Unable to read file data: "+procFile+" ("+err+")");
        return;
      }
      if (debug) writeLog("info", "Read "+data.length+" bytes from "+procFile);
      if (data.charCodeAt(0) === 0xFEFF) { // Strip BOM from string
        data = data.slice(1);
      }
      var lines = data.split("\r\n");
      if (debug) writeLog("info", lines.join('|'));

      // Failed messages are deleted (header and body) and never respooled.
      if (/^Failed/.test(lines[0])) {
        var emlFile = procFile.replace(/(.*)\.hdr$/, '$1.eml');
        if (debug) writeLog("info", "Failed message in "+procFile+": "+lines);
        fs.unlink(procFile, function(err) { if (err) writeLog("error", "Error deleting "+procFile+": "+err); });
        fs.unlink(emlFile, function(err) { if (err) writeLog("error", "Error deleting "+emlFile+": "+err); });
        return;
      }
      if ( ! /^Written/.test(lines[0])) {
        if (debug) writeLog("info", "Email is not 'Written': "+lines[0]);
        return;
      }

      var key = '';
      if (lines.length > 5 &&
          lines[1].indexOf('@') !== -1 &&
          lines.some(function(line) { return line == 'containsLocalDeliveries: True'; })
      ) {
        key = domainLookupKey(lines[1].split(/@/)[1]);
      }
      if (key) {
        if (debug) writeLog("info", "Looking up domain "+key+" for file "+procFile);
        lookupDomain(key, procFile);
      } else {
        if (debug) writeLog("info", "File does not meet delay criteria");
        setTimeout(function() { moveBackToSpool(procFile); }, 200);
      }
    });
  }

  // Record `domain` as first seen now, or read when it was first seen. Then either hold
  // the message (domain seen less than delayTime ms ago) or respool it immediately.
  // Database errors fall back to respooling the message immediately.
  function lookupDomain(domain, procFile) {
    var now = new Date();
    // The insert doubles as the existence check: _id is unique, so a known domain
    // fails with 'uniqueViolated'.
    db.insert({ _id: domain, firstSeen: now }, function(err) {
      if (err && err.errorType == 'uniqueViolated') {
        if (debug) writeLog("info", "Key already exists in database: "+domain);
        db.findOne({ _id: domain }, function(err2, existing) {
          if (err2) {
            writeLog("error", "Error searching for "+domain+": "+err2);
            setTimeout(function() { moveBackToSpool(procFile); }, 100);
          }
          else if ( ! existing) {
            writeLog("error", "Domain cannot be inserted but cannot be found either!? "+domain);
            setTimeout(function() { moveBackToSpool(procFile); }, 100);
          }
          // Record is older than delayTime - no delay
          else if (now.getTime() > existing.firstSeen.getTime() + delayTime) {
            if (debug) writeLog("info", domain+" is old news: "+existing.firstSeen);
            setTimeout(function() { moveBackToSpool(procFile); }, 100);
          }
          // Record is younger than delayTime - delay for the remainder of delayTime
          else {
            var waitTime = delayTime - (now.getTime() - existing.firstSeen.getTime());
            if (debug) writeLog("info", domain+" is still new. waiting for "+waitTime);
            setTimeout(function() { delayRespool(procFile, waitTime); }, 100);
          }
        });
        // TODO - update with lastSeen and count?
      }
      // Unexpected insert error
      else if (err) {
        writeLog("error", "Error inserting "+domain+": "+err.errorType+" - "+err.message);
        setTimeout(function() { moveBackToSpool(procFile); }, 100);
      }
      // First sighting of this domain - delay for the full delayTime
      else {
        setTimeout(function() { delayRespool(procFile, delayTime); }, 100);
      }
    });
  }

  // Rename the .eml and .hdr files of message `basename` from fromDir to toDir. The two
  // renames are issued independently; each failure is logged as
  // "Error moving {file} {description}: {error}".
  function moveMessageFiles(fromDir, toDir, basename, description) {
    ['.eml', '.hdr'].forEach(function(ext) {
      var source = path.join(fromDir, basename) + ext;
      fs.rename(source, path.join(toDir, basename) + ext, function(err) {
        if (err) {
          writeLog("error", "Error moving "+source+" "+description+": "+err);
        }
      });
    });
  }

  // Move a message to the hold directory, then back to the spool after waitTime ms.
  function delayRespool(file, waitTime) {
    if (debug) writeLog("info", "Moving "+file+" to holding location.");
    var basename = path.basename(file, '.hdr');
    moveMessageFiles(path.dirname(file), holdDir, basename, "to hold directory");
    setTimeout(function() { moveBackToSpool(path.join(holdDir, basename)+'.hdr'); }, waitTime);
  }

  // Move a message (given by its .hdr path in proc or hold) back to the spool directory.
  function moveBackToSpool(file) {
    if (debug) writeLog("info", "Moving "+file+" back to spool.");
    moveMessageFiles(path.dirname(file), spoolPath, path.basename(file, '.hdr'), "back to spool");
  }
}

/**
 * Reduce a sender host name to the key stored in the domain database.
 *
 * Normally the key is the last two labels ("mail.example.com" -> "example.com"). When
 * those two labels look like a public suffix, the last three are used instead:
 *   - ending in .uk ("co.uk");
 *   - com/co/org/net/gov/edu/biz + a two-letter code ("com.au");
 *   - a two-letter code + gov/us ("ca.us").
 *
 * The name is trimmed and lower-cased first, because DNS names are case-insensitive and
 * the suffix patterns only match lowercase.
 *
 * @param {string} domain - Host name taken from the sender address.
 * @returns {string} Lookup key; '' for an empty domain.
 */
function domainLookupKey(domain) {
  var labels = domain.trim().toLowerCase().split('.');
  var key = labels.slice(-2).join('.');
  if (/\.uk$/.test(key) ||
      /^(com?|org|net|gov|edu|biz)\.[a-z]{2}$/.test(key) ||
      /^[a-z]{2}\.(gov|us)$/.test(key)
  ) {
    key = labels.slice(-3).join('.');
  }
  return key;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment