Skip to content

Instantly share code, notes, and snippets.

@mhart
Last active April 14, 2019 15:38
Show Gist options
  • Save mhart/4c84c567ecdec6bcec43 to your computer and use it in GitHub Desktop.
Save mhart/4c84c567ecdec6bcec43 to your computer and use it in GitHub Desktop.
node_modules/awslambda
#!/usr/bin/env node
var path = require('path')
var fs = require('fs')
var lib = path.join(path.dirname(fs.realpathSync(__filename)), '../lib');
require(lib + '/awslambda.js').start_runtime();
//
// Node.js runtime.cc
// Lambda
//
// Copyright (c) 2013 Amazon. All rights reserved.
//
#include <node.h>
#include <v8.h>
#include <uv.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/un.h>
extern "C" {
#include "runtime.h"
#include "util.h"
}
#define TYPE_ERROR(msg) ThrowException(Exception::TypeError(String::New(msg)));
namespace awslambda {
using namespace node;
using namespace v8;
static Handle<Value> ReceiveStart()
{
sb_start_request request;
HandleScope scope;
if (runtime_recv_start(__runtime, &request)) {
return scope.Close(ThrowException(ErrnoException(errno, "receive_start")));
}
Handle<Object> credentials = Object::New();
credentials->Set(String::New("key"), String::New(request.credentials.key));
credentials->Set(String::New("secret"), String::New(request.credentials.secret));
credentials->Set(String::New("session"), String::New(request.credentials.session));
Handle<Object> object = Object::New();
object->Set(String::New("invokeid"), String::New(request.invokeid));
object->Set(String::New("handler"), String::New(request.handler));
object->Set(String::New("mode"), String::New(ENUM_STRING(lambda_runtime_mode, request.mode)));
object->Set(String::New("credentials"), credentials);
object->Set(String::New("suppress_init"), Boolean::New(request.suppress_user_init_function));
return scope.Close(object);
}
static Handle<Value> ReportRunning(const Arguments& args)
{
HandleScope scope;
if (!__runtime) {
return ThrowException(Exception::Error(String::New(RUNTIME_ERROR_UNINITIALIZED)));
}
String::Utf8Value invokeid(args[0]);
if (runtime_report_running(__runtime, *invokeid)) {
return ThrowException(ErrnoException(errno, "report_running"));
}
return scope.Close(v8::Null());
}
static Handle<Value> ReportDone(const Arguments& args)
{
HandleScope scope;
if (!__runtime) {
return ThrowException(Exception::Error(String::New(RUNTIME_ERROR_UNINITIALIZED)));
}
if (args.Length() < 1) {
return TYPE_ERROR(RUNTIME_ERROR_INVALID_ARGS);
}
String::Utf8Value invokeid(args[0]);
String::Utf8Value errorAsString(args[1]);
String::Utf8Value messageAsString(args[2]);
char const *error = NULL;
if (!((*args[1])->IsNull() || (*args[1])->IsUndefined())) {
error = *errorAsString;
}
char const *message = NULL;
if (!((*args[2])->IsNull() || (*args[2])->IsUndefined())) {
message = *messageAsString;
}
if (runtime_report_done(__runtime, *invokeid, error, message)) {
return ThrowException(ErrnoException(errno, "report_done"));
}
return scope.Close(v8::Null());
}
static Handle<Value> ReportFault(const Arguments& args)
{
HandleScope scope;
if (args.Length() > 4) {
return TYPE_ERROR(RUNTIME_ERROR_INVALID_ARGS);
}
String::Utf8Value invokeid(args[0]);
String::Utf8Value msg(args[1]);
String::Utf8Value exceptionAsString(args[2]);
String::Utf8Value traceAsString(args[3]);
char const *exception = NULL;
if (!((*args[2])->IsNull() || (*args[2])->IsUndefined())) {
exception = *exceptionAsString;
}
char const *trace = NULL;
if (!((*args[3])->IsNull() || (*args[3])->IsUndefined())) {
trace = *traceAsString;
}
if (runtime_report_fault(__runtime, *invokeid, *msg, exception, trace)) {
return ThrowException(ErrnoException(errno, "report_fault"));
}
return scope.Close(v8::Null());
}
/**
* This struct contains all the data necessary to perform an wait_for_invoke request
* using the uv_queue_worker API, which allows blocking code to be scheduled to be run on a separate
* thread.
*/
typedef struct wait_for_invoke_work {
//input values
uv_work_t req; //this is used by libuv
Persistent<Function> callback; //this is the node.js javascript callback to invoke when the worker is done
//return values
int rc; // if non-zero, the initialization failed
int _errno; //set to something meaningful when rc is non-zero
int runtime_uninitialized;
int data_sock;
char invokeid[INVOKE_ID_SIZE];
awscredentials creds;
char json_event_body[LAMBDA_EVENT_BODY_SIZE];
} wait_for_invoke_work;
static void wait_for_invoke_do(uv_work_t* req)
{
int rc = 0;
wait_for_invoke_work *work = (wait_for_invoke_work *)req->data;
//TODO clean up by dis-entangling runtime initialization from receive_start
if(!__runtime) {
work->rc = -1;
work->_errno = ENOENT;
return;
}
rc = runtime_recv_invoke(__runtime, work->invokeid, &work->creds, &work->data_sock, work->json_event_body, sizeof(work->json_event_body));
if(rc) {
work->rc = -1;
work->_errno = errno;
work->runtime_uninitialized = 1;
} else {
work->rc = 0;
work->_errno = 0;
work->runtime_uninitialized = 0;
}
}
/**
* this function gets called by uv in a separate thread, after init_runtime_do has been called.
*/
static void post_wait_for_invoke_do(uv_work_t* req, int status) {
HandleScope scope;
wait_for_invoke_work *work = (wait_for_invoke_work *)req->data;
Handle<Value> argv[1] = {Undefined()};
if(work->rc) {
if(work->runtime_uninitialized) {
argv[0] = Exception::Error(String::New(RUNTIME_ERROR_UNINITIALIZED));
} else {
argv[0] = ErrnoException(work->_errno, "WaitForInvokeNb");
}
} else {
Handle<Object> credentials = Object::New();
credentials->Set(String::New("key"), String::New(work->creds.key));
credentials->Set(String::New("secret"), String::New(work->creds.secret));
credentials->Set(String::New("session"), String::New(work->creds.session));
Handle<Object> object = Object::New();
object->Set(String::New("invokeid"), String::New(work->invokeid));
object->Set(String::New("sockfd"), Integer::New(work->data_sock));
object->Set(String::New("credentials"), credentials);
object->Set(String::New("eventbody"), String::New(work->json_event_body));
argv[0] = object;
}
//invoke the javascript callback function, either with an exception or with the receive_start data
node::MakeCallback(Context::GetCurrent()->Global(),
work->callback,
1,
argv);
//TODO do we need to worry about C++ exceptions here?
work->callback.Dispose();
work->callback.Clear();
free(work);
}
/**
* wait for invoke asynchronously.
* Arguments are:
* the control socket (int)
* a callback to invoke when function finishes (function)
*/
static Handle<Value> WaitForInvokeNb(const Arguments& args)
{
HandleScope scope;
wait_for_invoke_work *work;
if(!args[0]->IsFunction()) {
return TYPE_ERROR(RUNTIME_ERROR_INVALID_ARGS);
}
work = (wait_for_invoke_work *)calloc(1, sizeof(work[0]));
work->req.data = work;
work->callback = Persistent<Function>::New(args[0].As<Function>());
uv_queue_work(uv_default_loop(), &work->req, wait_for_invoke_do, post_wait_for_invoke_do);
return Undefined();
}
/**
* Sends the console logs to the logger process.
*/
static Handle<Value> SendConsoleLogs(const Arguments& args)
{
if (args.Length() < 1) {
return TYPE_ERROR(RUNTIME_ERROR_INVALID_ARGS);
}
String::Utf8Value console_log(args[0]);
runtime_send_console_message(__runtime, *console_log);
return Undefined();
}
static Handle<Value> InitRuntime(const Arguments& args)
{
HandleScope scope;
Handle<Value> startData;
startData = ReceiveStart();
return scope.Close(startData);
}
void Initialize(v8::Handle<v8::Object> exports)
{
if(runtime_init()) {
exit(-1);
}
exports->Set(String::NewSymbol("report_running"),
FunctionTemplate::New(ReportRunning)->GetFunction());
exports->Set(String::NewSymbol("report_done"),
FunctionTemplate::New(ReportDone)->GetFunction());
exports->Set(String::NewSymbol("report_fault"),
FunctionTemplate::New(ReportFault)->GetFunction());
exports->Set(String::NewSymbol("init_runtime"),
FunctionTemplate::New(InitRuntime)->GetFunction());
exports->Set(String::NewSymbol("wait_for_invoke_nb"),
FunctionTemplate::New(WaitForInvokeNb)->GetFunction());
exports->Set(String::NewSymbol("send_console_logs"),
FunctionTemplate::New(SendConsoleLogs)->GetFunction());
}
} // namespace awslambda
NODE_MODULE(awslambda, awslambda::Initialize)
var net = require("net");
var repl = require("repl");
var http = require("http");
var util = require("util");
awslambda = require("../build/Release/awslambda");
// Logging helpers
function rt_console_log(message) {
console.log("[nodejs] " + message);
}
function rt_console_trace(message) {
console.trace("[nodejs] " + message);
}
function rt_console_error(message) {
console.error("[nodejs] " + message);
}
// Filter out from stack traces awslambda.js and all frames below it
function customPrepareStackTrace(error, stack) {
var idx = stack.length;
for(var i = 0; i < stack.length; i++) {
if(stack[i].getFileName() == __filename) {
idx = i;
break;
}
}
var lines = new Array();
lines[0] = error;
for (var i = 0; i < idx; i++) {
var frame = stack[i];
var line;
try {
line = frame.toString();
} catch (e) {
try {
line = "<error: " + e + ">";
} catch (ee) {
line = "<error>";
}
}
lines[i+1] = " at " + line;
}
return lines.join("\n");
}
// node.js stack traces have the error message on the first line.
// Since we already report the error message in another field, strip it from the stack to avoid redundancy.
function stripMessageFromStack(stack) {
if(Error.prepareStackTrace != customPrepareStackTrace || (typeof stack === 'undefined') || stack == null) {
return null;
} else {
return stack.slice(stack.indexOf("\n") + 1);
}
}
function wrap_user_handler(user_handler, mode) {
// Dispatch to handler
switch(mode) {
case "http":
return wrap_http(user_handler);
case "json":
return wrap_json(user_handler);
case "event":
return wrap_event_invoke(user_handler);
default:
return function(invokeid, sock) {
sock.destroy();
awslambda.report_fault(invokeid, "invalid mode specified: " + mode, null, null);
awslambda.report_done(invokeid);
}
}
}
function wrap_event_invoke(user_handler) {
return function(invokeid, json_string, event_response, postDone) {
try {
var args = JSON.parse(json_string);
}
catch(err) {
awslambda.report_fault(invokeid, "Unable to parse input as json");
postDone();
return;
}
user_handler(args, event_response);
}
}
function wrap_http(user_handler) {
var handler_with_invokeid = function(request, response) {
request._aws_invokeid = request.socket._aws_invokeid;
user_handler(request, response);
};
var server = http.createServer(handler_with_invokeid);
// Catches all errors originating from the client connection, including:
// invalid HTTP request
// socket errors
server.on('clientError', function(exception, sock) {
awslambda.report_fault(sock._aws_invoke_id, "Unable to parse HTTP request", exception, stripMessageFromStack(exception.stack));
sock.destroy();
});
return function(invokeid, sock) {
sock._aws_invokeid = invokeid;
server.emit('connection', sock);
}
}
function wrap_json(handler) {
return function(invokeid, sock) {
sock.on('data', function(data) {
try {
var args = JSON.parse(data);
}
catch(err) {
awslambda.report_fault(invokeid, "Unable to parse input as json", err, null);
sock.destroy();
return;
}
handler(args, function(response) {
try {
var output = JSON.stringify(response, null);
}
catch(err) {
awslambda.report_fault(invokeid, "Unable to dump output as json", err, null);
sock.destroy();
return;
}
sock.write(output);
sock.end();
});
});
}
}
function set_creds(credentials) {
if(credentials === undefined) {
return;
}
if (credentials['key']) {
process.env['AWS_ACCESS_KEY_ID'] = credentials['key'];
}
if (credentials['secret']) {
process.env['AWS_SECRET_ACCESS_KEY'] = credentials['secret'];
}
if (credentials['session']) {
process.env['AWS_SESSION_TOKEN'] = credentials['session'];
}
}
function get_handlers(handler_string, mode, suppress_init) {
if(suppress_init) {
return get_handlers_delayed(handler_string, mode);
} else {
return get_handlers_immediate(handler_string, mode);
}
}
/**
* delay loading the user's code until an invoke occurs, to ensure we don't crash the runtime.
*/
function get_handlers_delayed(handler_string, mode) {
var modules_loaded = false;
var real_request_handler = undefined;
var request_handler = function(invokeid, sock) {
if(modules_loaded) {
return real_request_handler(invokeid, sock);
} else {
try {
var handlers = get_handlers_immediate(handler_string, mode);
var init_handler = handlers[0];
real_request_handler = handlers[1];
/*
* We can't call the user's init function here.
* Nodejs has an amazing amount of quirks, bugs, and weird behavior.
* In this case, if the user's init function does something asynchronous,
* nodejs by default reads data from the socket as it becomes available and stuffs it
* in an in-memory buffer. The HTTP parser that eventually gets attached to the socket
* ignores this buffer, so it misses part or all of the HTTP request.
*/
/*
init_handler(function() {
return real_request_handler(invokeid, sock);
});*/
return real_request_handler(invokeid, sock);
} finally {
modules_loaded = true;
}
}
};
return [function(done) { done(); }, request_handler];
}
function get_handlers_immediate(handler_string, mode) {
var app_parts = handler_string.split(".");
var init_handler = function(done) { done(); }
var request_handler;
var finisher;
if(mode == 'event') {
finisher = function(invokeid, json_string, event_response, postDone) {
event_response.done();
};
} else {
finisher = function(invokeid, sock) {
sock.destroy();
}
}
if(app_parts.length != 2) {
request_handler = function() {
awslambda.report_fault(arguments[0], "Bad handler " + handler_string);
finisher.apply(this, arguments);
}
} else {
var module_path = app_parts[0];
var handler_name = app_parts[1];
var init_handler_name = "init";
try {
var app = require(module_path);
init_handler = app[init_handler_name] || init_handler;
var user_handler = app[handler_name];
if (user_handler === undefined) {
request_handler = function() {
awslambda.report_fault(arguments[0], "Handler '" + handler_name + "' missing on module '" + module_path + "'", null, null);
finisher.apply(this, arguments);
};
} else {
request_handler = wrap_user_handler(user_handler, mode);
}
}
catch (e) {
if (e.code == "MODULE_NOT_FOUND") {
request_handler = function() {
awslambda.report_fault(arguments[0], "Unable to import module '" + module_path + "'", e, stripMessageFromStack(e.stack));
finisher.apply(this, arguments);
};
} else if (e instanceof SyntaxError) {
request_handler = function() {
awslambda.report_fault(arguments[0], "Syntax error in module '" + module_path + "'", e, stripMessageFromStack(e.stack));
finisher.apply(this, arguments);
};
} else {
request_handler = function() {
awslambda.report_fault(arguments[0], "module initialization error", e, stripMessageFromStack(e.stack));
finisher.apply(this, arguments);
}
}
}
}
if(init_handler === undefined) {
init_handler = function(on_done) { on_done(); }
}
return [init_handler, request_handler];
}
function do_init(options) {
var init_invokeid = options['invokeid'];
awslambda.report_running(init_invokeid);
global_invokeid = init_invokeid;
set_creds(options['credentials']);
//monkey patching to change console.log behavior
var old_console_log = console.log;
function pretty_console_log() {
var dateString = new Date().toISOString();
//This is how we used to print before
//util.print(dateString + " RequestID: " + global_invokeid + " ");
//old_console_log.apply(console, arguments);
var message = dateString + "\t" + global_invokeid + "\t" + util.format.apply(this, arguments) + "\n";
awslambda.send_console_logs(message);
}
console.log = console.error = console.warn = console.info = pretty_console_log;
var handler_string = options['handler'];
var handlers = get_handlers(handler_string, options['mode'], options['suppress_init']);
var init_handler = handlers[0];
var request_handler = handlers[1];
var callback = function(options) {
return invoke_callback(request_handler, options);
};
function on_init_done() {
awslambda.report_done(init_invokeid);
awslambda.wait_for_invoke_nb(callback);
};
try {
init_handler(on_init_done);
} catch(e) {
awslambda.report_fault(init_invokeid, "init handler error", e, stripMessageFromStack(e.stack));
on_init_done();
}
}
function finish_invoke(request_handler, invokeid, error, message) {
awslambda.report_done(invokeid, error, message);
//when the task is complete, listen for a new task
function callback(options) {
return invoke_callback(request_handler, options);
};
awslambda.wait_for_invoke_nb(callback);
}
function invoke_callback(request_handler, options) {
var invokeid = options['invokeid'];
var event_body = options['eventbody'];
var sockfd = options['sockfd'];
global_invokeid = invokeid;
set_creds(options['credentials']);
if(event_body && sockfd < 0) {
var doneStatus = false;
var postDone = function(error, message) {
finish_invoke(request_handler, invokeid, error, message);
};
function LambdaEventResponse(invokeid) {
this.invokeid = invokeid;
}
LambdaEventResponse.prototype.done = function(err, message) {
if(doneStatus) {
return;
}
doneStatus = true;
var error = null;
if(!(typeof err == "undefined" || (typeof err == "object" && !err))) {
error = util.format(err);
console.log(error);
}
/*
* use a timeout to perform the operation once the user gives up control of the event thread
* This is how HTTP handler works right now
*/
setTimeout(function() {
postDone(error, message);
}, 0);
}
event_response = new LambdaEventResponse(invokeid);
request_handler(invokeid, event_body, event_response, postDone);
} else if(!event_body && sockfd >= 0) {
var sockopts = {
fd: sockfd,
allowHalfOpen: true,
readable: true,
writable: true
};
global_data_sock = new net.Socket(sockopts);
global_data_sock.on('close', function() { finish_invoke(request_handler, invokeid); });
request_handler(invokeid, global_data_sock);
} else {
awslambda.report_fault(invokeid, "invalid args - eventbody = " + event_body + " socket =" + sockfd ,null, null);
finish_invoke(request_handler, invokeid);
}
}
var global_data_sock = undefined;
var global_invokeid = undefined;
exports.start_runtime = function() {
Error.prepareStackTrace = customPrepareStackTrace;
// Load native runtime
try {
//TODO define native functions for logging locally instead of to cloudwatch
//rt_console_log("Loading runtime");
} catch (e) {
if (e.code == "MODULE_NOT_FOUND") {
//rt_console_log("Lambda runtime not found");
return;
} else {
throw e;
}
}
// Init runtime
//rt_console_log('Initializing runtime');
var options = awslambda.init_runtime();
do_init(options);
}
process.on('uncaughtException', function (e) {
awslambda.report_fault(global_invokeid, "Failure while running task", e, stripMessageFromStack(e.stack));
if(global_data_sock !== undefined) {
global_data_sock.destroy();
}
});
@Vadorequest
Copy link

Vadorequest commented Apr 14, 2019

For what version of the nodejs runtime is that?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment