Skip to content

Instantly share code, notes, and snippets.

@TApplencourt
Created October 14, 2022 21:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TApplencourt/ce51bbb4a459be987d9c73c64285ca75 to your computer and use it in GitHub Desktop.
Save TApplencourt/ce51bbb4a459be987d9c73c64285ca75 to your computer and use it in GitHub Desktop.
bt2_filter
#include "utlist.h"
#include <babeltrace2/babeltrace.h>
#include <stdio.h>
#include <stdlib.h>
struct el {
const bt_message *message;
struct el *next, *prev;
};
// Losely based on
// https://babeltrace.org/docs/v2.0/libbabeltrace2/example-simple-sink-cmp-cls.html
struct xprof_common_data {
/* Component's input port (weak) */
bt_self_component_port_input *in_port;
struct el *queue;
struct el *pool;
};
/* Message iterator's private data */
struct xprof_message_iterator {
/* (Weak) link to the component's private data */
struct xprof_common_data *common_data;
/* Upstream message iterator (owned by this) */
bt_message_iterator *message_iterator;
};
/*
* Consume Message
*/
/* TODO: This function should call the callbacks */
static bt_message_iterator_class_next_method_status
xprof_message_iterator_next(bt_self_message_iterator *self_message_iterator,
bt_message_array_const messages, uint64_t capacity,
uint64_t *count) {
/* Retrieve our private data from the message iterator's user data */
struct xprof_message_iterator *xprof_iter =
bt_self_message_iterator_get_data(self_message_iterator);
/* Consume a batch of messages from the upstream message iterator */
bt_message_array_const upstream_messages;
uint64_t upstream_message_count;
bt_message_iterator_next_status next_status;
struct xprof_common_data *common_data = xprof_iter->common_data;
consume_upstream_messages:
if (common_data->queue)
goto pop_queued_messages;
next_status =
bt_message_iterator_next(xprof_iter->message_iterator, &upstream_messages,
&upstream_message_count);
/* Initialize the return status to a success */
bt_message_iterator_class_next_method_status status =
BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
switch (next_status) {
case BT_MESSAGE_ITERATOR_NEXT_STATUS_END:
/* End of iteration: put the message iterator's reference */
bt_message_iterator_put_ref(xprof_iter->message_iterator);
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
goto end;
case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
goto end;
case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR:
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
goto end;
case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR:
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
goto end;
default:
break;
}
/* For each consumed message */
for (uint64_t upstream_i = 0; upstream_i < upstream_message_count;
upstream_i++) {
/* Current message */
const bt_message *upstream_message = upstream_messages[upstream_i];
struct el *elt;
if (common_data->pool) {
elt = common_data->pool;
DL_DELETE(common_data->pool, elt);
} else {
elt = (struct el *)malloc(sizeof *elt);
}
elt->message = upstream_message;
DL_APPEND(common_data->queue, elt);
}
if (!common_data->queue) {
/*
* We discarded all the upstream messages: get a new batch of
* messages, because this method _cannot_ return
* `BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK` and put no
* messages into its output message array.
*/
goto consume_upstream_messages;
}
pop_queued_messages:
for (*count = 0; *count < upstream_message_count && common_data->queue;
(*count)++) {
struct el *elt = common_data->queue;
messages[*count] = elt->message;
DL_DELETE(common_data->queue, elt);
DL_APPEND(common_data->pool, elt);
}
end:
return status;
}
/*
* Initializes the sink component.
*/
static bt_component_class_initialize_method_status
xprof_initialize(bt_self_component_filter *self_component_filter,
bt_self_component_filter_configuration *configuration,
const bt_value *params, void *initialize_method_data) {
/* Allocate a private data structure */
struct xprof_common_data *common_data =
calloc(1, sizeof(struct xprof_common_data));
/* Set the component's user data to our private data structure */
bt_self_component_set_data(
bt_self_component_filter_as_self_component(self_component_filter),
common_data);
/*
* Add an input port named `in` to the filter component.
*
* This is needed so that this filter component can be connected to
* a filter or a source component. With a connected upstream
* component, this filter component's message iterator can create a
* message iterator to consume messages.
*
* Add an output port named `out` to the filter component.
*
* This is needed so that this filter component can be connected to
* a filter or a sink component. Once a downstream component is
* connected, it can create our message iterator.
*/
bt_self_component_filter_add_input_port(self_component_filter, "in", NULL,
&common_data->in_port);
bt_self_component_filter_add_output_port(self_component_filter, "out", NULL,
NULL);
return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
}
/*
* Initializes the message iterator.
*/
static bt_message_iterator_class_initialize_method_status
xprof_message_iterator_initialize(
bt_self_message_iterator *self_message_iterator,
bt_self_message_iterator_configuration *configuration,
bt_self_component_port_output *self_port) {
/* Allocate a private data structure */
struct xprof_message_iterator *xprof_iter = malloc(sizeof(*xprof_iter));
/* Retrieve the component's private data from its user data */
struct xprof_common_data *xprof_common_data = bt_self_component_get_data(
bt_self_message_iterator_borrow_component(self_message_iterator));
/* Keep a link to the component's private data */
xprof_iter->common_data = xprof_common_data;
/* Create the uptream message iterator */
bt_message_iterator_create_from_message_iterator(
self_message_iterator, xprof_common_data->in_port,
&xprof_iter->message_iterator);
/* Set the message iterator's user data to our private data structure */
bt_self_message_iterator_set_data(self_message_iterator, xprof_iter);
return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
}
static void xprof_finalize(bt_self_component_filter *self_component_filter) {
struct xprof_common_data *common_data = bt_self_component_get_data(
bt_self_component_filter_as_self_component(self_component_filter));
struct el *elt, *tmp;
DL_FOREACH_SAFE(common_data->pool, elt, tmp) {
DL_DELETE(common_data->pool, elt);
free(elt);
}
free(common_data);
}
static void xprof_message_iterator_finalize(
bt_self_message_iterator *self_message_iterator) {
/* Retrieve our private data from the message iterator's user data */
struct xprof_message_iterator *xprof_iter =
bt_self_message_iterator_get_data(self_message_iterator);
/* Free the allocated structure */
free(xprof_iter);
}
/* Mandatory */
BT_PLUGIN_MODULE();
BT_PLUGIN(roger);
BT_PLUGIN_FILTER_COMPONENT_CLASS(xprof, xprof_message_iterator_next);
BT_PLUGIN_FILTER_COMPONENT_CLASS_INITIALIZE_METHOD(xprof, xprof_initialize);
BT_PLUGIN_FILTER_COMPONENT_CLASS_FINALIZE_METHOD(xprof, xprof_finalize);
BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD(
xprof, xprof_message_iterator_initialize);
BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CLASS_FINALIZE_METHOD(
xprof, xprof_message_iterator_finalize);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment