Created
October 14, 2022 21:14
-
-
Save TApplencourt/ce51bbb4a459be987d9c73c64285ca75 to your computer and use it in GitHub Desktop.
bt2_filter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "utlist.h" | |
#include <babeltrace2/babeltrace.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
struct el { | |
const bt_message *message; | |
struct el *next, *prev; | |
}; | |
// Losely based on | |
// https://babeltrace.org/docs/v2.0/libbabeltrace2/example-simple-sink-cmp-cls.html | |
struct xprof_common_data { | |
/* Component's input port (weak) */ | |
bt_self_component_port_input *in_port; | |
struct el *queue; | |
struct el *pool; | |
}; | |
/* Message iterator's private data */ | |
struct xprof_message_iterator { | |
/* (Weak) link to the component's private data */ | |
struct xprof_common_data *common_data; | |
/* Upstream message iterator (owned by this) */ | |
bt_message_iterator *message_iterator; | |
}; | |
/* | |
* Consume Message | |
*/ | |
/* TODO: This function should call the callbacks */ | |
static bt_message_iterator_class_next_method_status | |
xprof_message_iterator_next(bt_self_message_iterator *self_message_iterator, | |
bt_message_array_const messages, uint64_t capacity, | |
uint64_t *count) { | |
/* Retrieve our private data from the message iterator's user data */ | |
struct xprof_message_iterator *xprof_iter = | |
bt_self_message_iterator_get_data(self_message_iterator); | |
/* Consume a batch of messages from the upstream message iterator */ | |
bt_message_array_const upstream_messages; | |
uint64_t upstream_message_count; | |
bt_message_iterator_next_status next_status; | |
struct xprof_common_data *common_data = xprof_iter->common_data; | |
consume_upstream_messages: | |
if (common_data->queue) | |
goto pop_queued_messages; | |
next_status = | |
bt_message_iterator_next(xprof_iter->message_iterator, &upstream_messages, | |
&upstream_message_count); | |
/* Initialize the return status to a success */ | |
bt_message_iterator_class_next_method_status status = | |
BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; | |
switch (next_status) { | |
case BT_MESSAGE_ITERATOR_NEXT_STATUS_END: | |
/* End of iteration: put the message iterator's reference */ | |
bt_message_iterator_put_ref(xprof_iter->message_iterator); | |
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; | |
goto end; | |
case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN: | |
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; | |
goto end; | |
case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR: | |
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR; | |
goto end; | |
case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR: | |
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; | |
goto end; | |
default: | |
break; | |
} | |
/* For each consumed message */ | |
for (uint64_t upstream_i = 0; upstream_i < upstream_message_count; | |
upstream_i++) { | |
/* Current message */ | |
const bt_message *upstream_message = upstream_messages[upstream_i]; | |
struct el *elt; | |
if (common_data->pool) { | |
elt = common_data->pool; | |
DL_DELETE(common_data->pool, elt); | |
} else { | |
elt = (struct el *)malloc(sizeof *elt); | |
} | |
elt->message = upstream_message; | |
DL_APPEND(common_data->queue, elt); | |
} | |
if (!common_data->queue) { | |
/* | |
* We discarded all the upstream messages: get a new batch of | |
* messages, because this method _cannot_ return | |
* `BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK` and put no | |
* messages into its output message array. | |
*/ | |
goto consume_upstream_messages; | |
} | |
pop_queued_messages: | |
for (*count = 0; *count < upstream_message_count && common_data->queue; | |
(*count)++) { | |
struct el *elt = common_data->queue; | |
messages[*count] = elt->message; | |
DL_DELETE(common_data->queue, elt); | |
DL_APPEND(common_data->pool, elt); | |
} | |
end: | |
return status; | |
} | |
/* | |
* Initializes the sink component. | |
*/ | |
static bt_component_class_initialize_method_status | |
xprof_initialize(bt_self_component_filter *self_component_filter, | |
bt_self_component_filter_configuration *configuration, | |
const bt_value *params, void *initialize_method_data) { | |
/* Allocate a private data structure */ | |
struct xprof_common_data *common_data = | |
calloc(1, sizeof(struct xprof_common_data)); | |
/* Set the component's user data to our private data structure */ | |
bt_self_component_set_data( | |
bt_self_component_filter_as_self_component(self_component_filter), | |
common_data); | |
/* | |
* Add an input port named `in` to the filter component. | |
* | |
* This is needed so that this filter component can be connected to | |
* a filter or a source component. With a connected upstream | |
* component, this filter component's message iterator can create a | |
* message iterator to consume messages. | |
* | |
* Add an output port named `out` to the filter component. | |
* | |
* This is needed so that this filter component can be connected to | |
* a filter or a sink component. Once a downstream component is | |
* connected, it can create our message iterator. | |
*/ | |
bt_self_component_filter_add_input_port(self_component_filter, "in", NULL, | |
&common_data->in_port); | |
bt_self_component_filter_add_output_port(self_component_filter, "out", NULL, | |
NULL); | |
return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK; | |
} | |
/* | |
* Initializes the message iterator. | |
*/ | |
static bt_message_iterator_class_initialize_method_status | |
xprof_message_iterator_initialize( | |
bt_self_message_iterator *self_message_iterator, | |
bt_self_message_iterator_configuration *configuration, | |
bt_self_component_port_output *self_port) { | |
/* Allocate a private data structure */ | |
struct xprof_message_iterator *xprof_iter = malloc(sizeof(*xprof_iter)); | |
/* Retrieve the component's private data from its user data */ | |
struct xprof_common_data *xprof_common_data = bt_self_component_get_data( | |
bt_self_message_iterator_borrow_component(self_message_iterator)); | |
/* Keep a link to the component's private data */ | |
xprof_iter->common_data = xprof_common_data; | |
/* Create the uptream message iterator */ | |
bt_message_iterator_create_from_message_iterator( | |
self_message_iterator, xprof_common_data->in_port, | |
&xprof_iter->message_iterator); | |
/* Set the message iterator's user data to our private data structure */ | |
bt_self_message_iterator_set_data(self_message_iterator, xprof_iter); | |
return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; | |
} | |
static void xprof_finalize(bt_self_component_filter *self_component_filter) { | |
struct xprof_common_data *common_data = bt_self_component_get_data( | |
bt_self_component_filter_as_self_component(self_component_filter)); | |
struct el *elt, *tmp; | |
DL_FOREACH_SAFE(common_data->pool, elt, tmp) { | |
DL_DELETE(common_data->pool, elt); | |
free(elt); | |
} | |
free(common_data); | |
} | |
static void xprof_message_iterator_finalize( | |
bt_self_message_iterator *self_message_iterator) { | |
/* Retrieve our private data from the message iterator's user data */ | |
struct xprof_message_iterator *xprof_iter = | |
bt_self_message_iterator_get_data(self_message_iterator); | |
/* Free the allocated structure */ | |
free(xprof_iter); | |
} | |
/* Mandatory */ | |
BT_PLUGIN_MODULE(); | |
BT_PLUGIN(roger); | |
BT_PLUGIN_FILTER_COMPONENT_CLASS(xprof, xprof_message_iterator_next); | |
BT_PLUGIN_FILTER_COMPONENT_CLASS_INITIALIZE_METHOD(xprof, xprof_initialize); | |
BT_PLUGIN_FILTER_COMPONENT_CLASS_FINALIZE_METHOD(xprof, xprof_finalize); | |
BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD( | |
xprof, xprof_message_iterator_initialize); | |
BT_PLUGIN_FILTER_COMPONENT_CLASS_MESSAGE_ITERATOR_CLASS_FINALIZE_METHOD( | |
xprof, xprof_message_iterator_finalize); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment