Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 510202c7..64669273 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -48,6 +48,7 @@ option(FLB_SQLDB "Enable SQL embedded DB" No)
option(FLB_HTTP_SERVER "Enable HTTP Server" No)
option(FLB_BACKTRACE "Enable stacktrace support" Yes)
option(FLB_LUAJIT "Enable Lua Scripting support" Yes)
+option(FLB_MRUBY "Enable mruby Scripting support" Yes)
# Metrics: Experimental Feature, disabled by default on 0.12 series
# but enabled in the upcoming 0.13 release. Note that development
@@ -121,6 +122,7 @@ option(FLB_FILTER_THROTTLE "Enable throttle filter" Yes)
option(FLB_FILTER_RECORD_MODIFIER "Enable record_modifier filter" Yes)
option(FLB_FILTER_NEST "Enable nest filter" Yes)
option(FLB_FILTER_LUA "Enable Lua scripting filter" Yes)
+option(FLB_FILTER_MRUBY "Enable Lua scripting filter" Yes)
# Enable all features
if(FLB_ALL)
@@ -190,6 +192,7 @@ include_directories(
lib/sha1
lib/msgpack-2.1.3/include
lib/luajit-2.0.5/src/
+ lib/mruby/src/
${MONKEY_INCLUDE_DIR}
)
@@ -441,6 +444,20 @@ if(FLB_LUAJIT)
FLB_DEFINITION(FLB_HAVE_LUAJIT)
endif()
+if(FLB_MRUBY)
+ set(MRUBY_PATH ${CMAKE_CURRENT_SOURCE_DIR}/lib/mruby)
+ ExternalProject_Add(mruby
+ SOURCE_DIR ${MRUBY_PATH}
+ CONFIGURE_COMMAND pwd
+ BUILD_COMMAND cd ${MRUBY_PATH} && rake all
+ INSTALL_COMMAND cp ${MRUBY_PATH}/build/host/lib/libmruby.a "${CMAKE_CURRENT_BINARY_DIR}/lib/libmruby.a")
+ add_library(libmruby STATIC IMPORTED GLOBAL)
+ set_target_properties(libmruby PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/lib/libmruby.a")
+ add_dependencies(libmruby mruby)
+ include_directories("${CMAKE_CURRENT_SOURCE_DIR}/lib/mruby/include/")
+ FLB_DEFINITION(FLB_HAVE_MRUBY)
+endif()
+
# Pthread Local Storage
# =====================
# By default we expect the compiler already support thread local storage
diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt
index 5aeda7fc..96089a55 100644
--- a/plugins/CMakeLists.txt
+++ b/plugins/CMakeLists.txt
@@ -159,6 +159,10 @@ if(FLB_LUAJIT)
REGISTER_FILTER_PLUGIN("filter_lua")
endif()
+if(FLB_MRUBY)
+ REGISTER_FILTER_PLUGIN("filter_mruby")
+endif()
+
REGISTER_FILTER_PLUGIN("filter_record_modifier")
REGISTER_FILTER_PLUGIN("filter_nest")
REGISTER_FILTER_PLUGIN("filter_modify")
diff --git a/plugins/filter_mruby/CMakeLists.txt b/plugins/filter_mruby/CMakeLists.txt
new file mode 100644
index 00000000..8aeb9303
--- /dev/null
+++ b/plugins/filter_mruby/CMakeLists.txt
@@ -0,0 +1,5 @@
+set(src
+ mruby_config.c
+ mruby.c)
+
+FLB_PLUGIN(filter_mruby "${src}" "m")
diff --git a/plugins/filter_mruby/mruby.c b/plugins/filter_mruby/mruby.c
new file mode 100644
index 00000000..e17d656a
--- /dev/null
+++ b/plugins/filter_mruby/mruby.c
@@ -0,0 +1,177 @@
+#include <fluent-bit/flb_config.h>
+#include <msgpack.h>
+#include <fluent-bit.h>
+#include <fluent-bit/flb_time.h>
+#include <mruby/hash.h>
+#include <mruby/include/mruby/variable.h>
+#include <mruby/include/mruby/array.h>
+
+#include "mruby_config.h"
+
+#define FLB_FILTER_MODIFIED 1
+#define FLB_FILTER_NOTOUCH 2
+
+void mrb_tommsgpack(mrb_state *state, mrb_value value, msgpack_packer *pck)
+{
+ enum mrb_vtype type = mrb_type(value);
+
+ if (mrb_undef_p(value) || mrb_nil_p(value)) {
+ printf("undef or nil");
+ }
+
+ switch (type) {
+ case MRB_TT_STRING: {
+ char *c = RSTRING_PTR(value);
+ msgpack_pack_str(pck, strlen(c));
+ msgpack_pack_str_body(pck, c, strlen(c));
+ break;
+ }
+ case MRB_TT_HASH: {
+ mrb_value keys = mrb_hash_keys(state, value);
+ int len = RARRAY_LEN(keys);
+ msgpack_pack_map(pck, len);
+ for (int i = 0; i < len; i++) {
+ mrb_value key = mrb_ary_ref(state, keys, i);
+ mrb_tommsgpack(state, key, pck);
+ mrb_tommsgpack(state, mrb_hash_get(state, value, key), pck);
+ }
+ break;
+ }
+ }
+}
+
+mrb_value msgpack_obj_to_mrb_value(mrb_state *mrb, msgpack_object *record)
+{
+ int size, i;
+ char *s;
+ mrb_value mrb_v;
+
+ switch(record->type) {
+ case MSGPACK_OBJECT_STR:
+ s = flb_malloc(record->via.str.size);
+ strncpy(s, record->via.str.ptr, record->via.str.size);
+ s[record->via.str.size] = '\0';
+ mrb_v = mrb_str_new_cstr(mrb, s);
+ break;
+ case MSGPACK_OBJECT_MAP:
+ size = record->via.map.size;
+ if (size != 0) {
+ msgpack_object_kv *p = record->via.map.ptr;
+ for (i = 0; i < size; i++) {
+ msgpack_object *key = &(p+i)->key;
+ msgpack_object *val = &(p+i)->val;
+ mrb_v = mrb_hash_new(mrb);
+ mrb_hash_set(mrb, mrb_v, msgpack_obj_to_mrb_value(mrb, key), msgpack_obj_to_mrb_value(mrb, val));
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ return mrb_v;
+}
+
+static int cb_mruby_init(struct flb_filter_instance *f_ins,
+ struct flb_config *config,
+ void *data)
+{
+ struct mruby_filter *ctx;
+ struct mf_t *mf;
+ mrb_value obj;
+
+ // Create mrb_state
+ mf = flb_calloc(1, sizeof(struct mf_t));
+ mf->mrb_state = mrb_open();
+ mf->mrb_state->ud = mf;
+
+ // Create context
+ ctx = flb_calloc(1, sizeof(struct mruby_filter));
+ ctx->mf = mf;
+ ctx->call = flb_filter_get_property("call", f_ins);
+
+ // Load mruby script
+ FILE* fp = fopen(flb_filter_get_property("script", f_ins), "r");
+ obj = mrb_load_file(mf->mrb_state, fp);
+ ctx->mf->obj = obj;
+ fclose(fp);
+
+ // Set context
+ flb_filter_set_context(f_ins, ctx);
+
+ return 0;
+}
+
+static int cb_mruby_filter(void *data, size_t bytes,
+ char *tag, int tag_len,
+ void **out_buf, size_t *out_bytes,
+ struct flb_filter_instance *f_ins,
+ void *filter_context,
+ struct flb_config *config)
+{
+ struct mruby_filter *ctx = filter_context;
+
+ size_t off = 0;
+ double ts;
+ struct flb_time t;
+ msgpack_object *p;
+ msgpack_sbuffer tmp_sbuf;
+ msgpack_packer tmp_pck;
+ msgpack_unpacked result;
+
+ msgpack_sbuffer_init(&tmp_sbuf);
+ msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
+
+ msgpack_unpacked_init(&result);
+ while (msgpack_unpack_next(&result, data, bytes, &off)) {
+ msgpack_packer data_pck;
+ msgpack_sbuffer data_sbuf;
+ mrb_state *mrb_state;
+ mrb_value value;
+
+ mrb_state = ctx->mf->mrb_state;
+
+ msgpack_sbuffer_init(&data_sbuf);
+ msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write);
+
+ flb_time_pop_from_msgpack(&t, &result, &p);
+ ts = flb_time_to_double(&t);
+
+ ctx->mf->ts = ts;
+ ctx->mf->tag = tag;
+ ctx->mf->record = p;
+
+ value = mrb_funcall(mrb_state, ctx->mf->obj, ctx->call, 3, mrb_str_new_cstr(mrb_state, tag), mrb_float_value(mrb_state, ts), msgpack_obj_to_mrb_value(mrb_state, p));
+
+ msgpack_pack_array(&tmp_pck, 2);
+ flb_time_from_double(&t, ts);
+ flb_time_append_to_msgpack(&t, &tmp_pck, 0);
+ mrb_tommsgpack(mrb_state, value, &tmp_pck);
+
+ msgpack_sbuffer_destroy(&data_sbuf);
+ }
+ msgpack_unpacked_destroy(&result);
+
+ *out_buf = tmp_sbuf.data;
+ *out_bytes = tmp_sbuf.size;
+
+ return FLB_FILTER_MODIFIED;
+}
+
+static int cb_mruby_exit(void *data, struct flb_config *config)
+{
+ struct mruby_filter *ctx;
+
+ ctx = data;
+ mrb_close(ctx->mf->mrb_state);
+ free(ctx->mf);
+ return 0;
+}
+
+struct flb_filter_plugin filter_mruby_plugin = {
+ .name = "mruby",
+ .description = "mruby Scriptiong Filter",
+ .cb_init = cb_mruby_init,
+ .cb_filter = cb_mruby_filter,
+ .cb_exit = cb_mruby_exit,
+ .flags = 0
+};
\ No newline at end of file
diff --git a/plugins/filter_mruby/mruby_config.c b/plugins/filter_mruby/mruby_config.c
new file mode 100644
index 00000000..d5e24c6c
--- /dev/null
+++ b/plugins/filter_mruby/mruby_config.c
@@ -0,0 +1 @@
+#include "mruby_config.h"
\ No newline at end of file
diff --git a/plugins/filter_mruby/mruby_config.h b/plugins/filter_mruby/mruby_config.h
new file mode 100644
index 00000000..02ef4280
--- /dev/null
+++ b/plugins/filter_mruby/mruby_config.h
@@ -0,0 +1,25 @@
+#include <fluent-bit/flb_info.h>
+#include <fluent-bit/flb_filter.h>
+#include <fluent-bit/flb_luajit.h>
+#include <fluent-bit/flb_sds.h>
+
+#include "mruby.h"
+#include "mruby/compile.h"
+#include "mruby/string.h"
+
+typedef struct mf_t {
+ double ts;
+ char *tag;
+ msgpack_object *record;
+ mrb_value obj;
+ struct mrb_state *mrb_state;
+
+} mf;
+
+struct mruby_filter {
+ flb_sds_t call;
+ struct mf_t *mf;
+};
+
+MRB_API mrb_value mrb_str_new_cstr(mrb_state*, const char*);
+mrb_value msgpack_obj_to_mrb_value(mrb_state*, msgpack_object*);
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4796ce5e..7a7be35b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -136,6 +136,12 @@ if(FLB_LUAJIT)
"libluajit")
endif()
+if(FLB_MRUBY)
+ set(extra_libs
+ ${extra_libs}
+ "libmruby")
+endif()
+
if(FLB_SQLDB)
set(src
${src}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment