Skip to content

Instantly share code, notes, and snippets.

@303248153
Last active December 27, 2017 03:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 303248153/b29d49c72236a652f7deda4e170f00f1 to your computer and use it in GitHub Desktop.
Save 303248153/b29d49c72236a652f7deda4e170f00f1 to your computer and use it in GitHub Desktop.
Even shitter than etw, worse than the worst api
// sudo apt-get install liblttng-ctl-dev
// g++ -Wall -Wextra --std=c++11 CaptureLttngEventFromCoreClr.cpp -llttng-ctl && ./a.out
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// ! !
// ! Don't copy this code to your project, !
// ! it contains GPL licensed code from lttng-tools (lttng-viewer-abi.h).!
// ! !
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#include <lttng/lttng.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <signal.h>
#include <unistd.h>
#include <cstring>
#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
#include <unordered_map>
#include <algorithm>
#include <functional>
namespace {
const char* SessionName = "example-session";
const char* SessionUrl = "net://127.0.0.1";
const std::string EventName = "DotNETRuntime:EventSource";
const unsigned int LiveSessionInterval = 1000000; // 1s
bool LoopFlag = true;
void sigint_handler(int) {
LoopFlag = false;
}
#define LTTNG_VIEWER_PATH_MAX 4096
#define LTTNG_VIEWER_NAME_MAX 255
#define LTTNG_VIEWER_HOST_NAME_MAX 64
/* Flags in reply to get_next_index and get_packet. */
enum {
/* New metadata is required to read this packet. */
LTTNG_VIEWER_FLAG_NEW_METADATA = (1 << 0),
/* New stream got added to the trace. */
LTTNG_VIEWER_FLAG_NEW_STREAM = (1 << 1),
};
enum lttng_viewer_command {
LTTNG_VIEWER_CONNECT = 1,
LTTNG_VIEWER_LIST_SESSIONS = 2,
LTTNG_VIEWER_ATTACH_SESSION = 3,
LTTNG_VIEWER_GET_NEXT_INDEX = 4,
LTTNG_VIEWER_GET_PACKET = 5,
LTTNG_VIEWER_GET_METADATA = 6,
LTTNG_VIEWER_GET_NEW_STREAMS = 7,
LTTNG_VIEWER_CREATE_SESSION = 8,
LTTNG_VIEWER_DETACH_SESSION = 9,
};
enum lttng_viewer_attach_return_code {
LTTNG_VIEWER_ATTACH_OK = 1, /* The attach command succeeded. */
LTTNG_VIEWER_ATTACH_ALREADY = 2, /* A viewer is already attached. */
LTTNG_VIEWER_ATTACH_UNK = 3, /* The session ID is unknown. */
LTTNG_VIEWER_ATTACH_NOT_LIVE = 4, /* The session is not live. */
LTTNG_VIEWER_ATTACH_SEEK_ERR = 5, /* Seek error. */
LTTNG_VIEWER_ATTACH_NO_SESSION = 6, /* No viewer session created. */
};
enum lttng_viewer_next_index_return_code {
LTTNG_VIEWER_INDEX_OK = 1, /* Index is available. */
LTTNG_VIEWER_INDEX_RETRY = 2, /* Index not yet available. */
LTTNG_VIEWER_INDEX_HUP = 3, /* Index closed (trace destroyed). */
LTTNG_VIEWER_INDEX_ERR = 4, /* Unknow error. */
LTTNG_VIEWER_INDEX_INACTIVE = 5, /* Inactive stream beacon. */
LTTNG_VIEWER_INDEX_EOF = 6, /* End of index file. */
};
enum lttng_viewer_get_packet_return_code {
LTTNG_VIEWER_GET_PACKET_OK = 1,
LTTNG_VIEWER_GET_PACKET_RETRY = 2,
LTTNG_VIEWER_GET_PACKET_ERR = 3,
LTTNG_VIEWER_GET_PACKET_EOF = 4,
};
enum lttng_viewer_get_metadata_return_code {
LTTNG_VIEWER_METADATA_OK = 1,
LTTNG_VIEWER_NO_NEW_METADATA = 2,
LTTNG_VIEWER_METADATA_ERR = 3,
};
enum lttng_viewer_connection_type {
LTTNG_VIEWER_CLIENT_COMMAND = 1,
LTTNG_VIEWER_CLIENT_NOTIFICATION = 2,
};
enum lttng_viewer_seek {
/* Receive the trace packets from the beginning. */
LTTNG_VIEWER_SEEK_BEGINNING = 1,
/* Receive the trace packets from now. */
LTTNG_VIEWER_SEEK_LAST = 2,
};
enum lttng_viewer_new_streams_return_code {
LTTNG_VIEWER_NEW_STREAMS_OK = 1, /* If new streams are being sent. */
LTTNG_VIEWER_NEW_STREAMS_NO_NEW = 2, /* If no new streams are available. */
LTTNG_VIEWER_NEW_STREAMS_ERR = 3, /* Error. */
LTTNG_VIEWER_NEW_STREAMS_HUP = 4, /* Session closed. */
};
enum lttng_viewer_create_session_return_code {
LTTNG_VIEWER_CREATE_SESSION_OK = 1,
LTTNG_VIEWER_CREATE_SESSION_ERR = 2,
};
enum lttng_viewer_detach_session_return_code {
LTTNG_VIEWER_DETACH_SESSION_OK = 1,
LTTNG_VIEWER_DETACH_SESSION_UNK = 2,
LTTNG_VIEWER_DETACH_SESSION_ERR = 3,
};
struct lttng_viewer_session {
uint64_t id;
uint32_t live_timer;
uint32_t clients;
uint32_t streams;
char hostname[LTTNG_VIEWER_HOST_NAME_MAX];
char session_name[LTTNG_VIEWER_NAME_MAX];
} __attribute__ ((__packed__));
struct lttng_viewer_stream {
uint64_t id;
uint64_t ctf_trace_id;
uint32_t metadata_flag;
char path_name[LTTNG_VIEWER_PATH_MAX];
char channel_name[LTTNG_VIEWER_NAME_MAX];
} __attribute__ ((__packed__));
struct lttng_viewer_cmd {
uint64_t data_size; /* data size following this header */
uint32_t cmd; /* enum lttcomm_relayd_command */
uint32_t cmd_version; /* command version */
} __attribute__ ((__packed__));
/*
* LTTNG_VIEWER_CONNECT payload.
*/
struct lttng_viewer_connect {
/* session ID assigned by the relay for command connections */
uint64_t viewer_session_id;
uint32_t major;
uint32_t minor;
uint32_t type; /* enum lttng_viewer_connection_type */
} __attribute__ ((__packed__));
/*
* LTTNG_VIEWER_LIST_SESSIONS payload.
*/
struct lttng_viewer_list_sessions {
uint32_t sessions_count;
char session_list[]; /* struct lttng_viewer_session */
} __attribute__ ((__packed__));
/*
* LTTNG_VIEWER_ATTACH_SESSION payload.
*/
struct lttng_viewer_attach_session_request {
uint64_t session_id;
uint64_t offset; /* unused for now */
uint32_t seek; /* enum lttng_viewer_seek */
} __attribute__ ((__packed__));
struct lttng_viewer_attach_session_response {
/* enum lttng_viewer_attach_return_code */
uint32_t status;
uint32_t streams_count;
/* struct lttng_viewer_stream */
char stream_list[];
} __attribute__ ((__packed__));
/*
* LTTNG_VIEWER_GET_NEXT_INDEX payload.
*/
struct lttng_viewer_get_next_index {
uint64_t stream_id;
} __attribute__ ((__packed__));
struct lttng_viewer_index {
uint64_t offset;
uint64_t packet_size;
uint64_t content_size;
uint64_t timestamp_begin;
uint64_t timestamp_end;
uint64_t events_discarded;
uint64_t stream_id;
uint32_t status; /* enum lttng_viewer_next_index_return_code */
uint32_t flags; /* LTTNG_VIEWER_FLAG_* */
} __attribute__ ((__packed__));
/*
* LTTNG_VIEWER_GET_PACKET payload.
*/
struct lttng_viewer_get_packet {
uint64_t stream_id;
uint64_t offset;
uint32_t len;
} __attribute__ ((__packed__));
struct lttng_viewer_trace_packet {
uint32_t status; /* enum lttng_viewer_get_packet_return_code */
uint32_t len;
uint32_t flags; /* LTTNG_VIEWER_FLAG_* */
char data[];
} __attribute__ ((__packed__));
/*
* LTTNG_VIEWER_GET_METADATA payload.
*/
struct lttng_viewer_get_metadata {
uint64_t stream_id;
} __attribute__ ((__packed__));
struct lttng_viewer_metadata_packet {
uint64_t len;
uint32_t status; /* enum lttng_viewer_get_metadata_return_code */
char data[];
} __attribute__ ((__packed__));
/*
* LTTNG_VIEWER_GET_NEW_STREAMS payload.
*/
struct lttng_viewer_new_streams_request {
uint64_t session_id;
} __attribute__ ((__packed__));
struct lttng_viewer_new_streams_response {
/* enum lttng_viewer_new_streams_return_code */
uint32_t status;
uint32_t streams_count;
/* struct lttng_viewer_stream */
char stream_list[];
} __attribute__ ((__packed__));
struct lttng_viewer_create_session_response {
/* enum lttng_viewer_create_session_return_code */
uint32_t status;
} __attribute__ ((__packed__));
/*
* LTTNG_VIEWER_DETACH_SESSION payload.
*/
struct lttng_viewer_detach_session_request {
uint64_t session_id;
} __attribute__ ((__packed__));
struct lttng_viewer_detach_session_response {
/* enum lttng_viewer_detach_session_return_code */
uint32_t status;
} __attribute__ ((__packed__));
int sendall(int fd, const void* buf, std::size_t size) {
std::size_t pos = 0;
while (pos < size) {
auto ret = send(fd,
reinterpret_cast<const char*>(buf) + pos, size - pos, 0);
if (ret <= 0) {
return -1;
}
pos += static_cast<std::size_t>(ret);
}
return 0;
}
int recvall(int fd, void* buf, std::size_t size) {
std::size_t pos = 0;
while (pos < size) {
auto ret = recv(fd,
reinterpret_cast<char*>(buf) + pos, size - pos, 0);
if (ret <= 0) {
return -1;
}
pos += static_cast<std::size_t>(ret);
}
return 0;
}
template <class T>
int sendcmd(int fd, std::uint32_t type, const T& body) {
lttng_viewer_cmd cmd = {};
cmd.data_size = htobe64(sizeof(T));
cmd.cmd = htobe32(type);
if (sendall(fd, &cmd, sizeof(cmd)) < 0) {
return -1;
}
if (sendall(fd, &body, sizeof(body)) < 0) {
return -1;
}
return 0;
}
int sendcmd(int fd, std::uint32_t type) {
lttng_viewer_cmd cmd = {};
cmd.data_size = htobe64(0);
cmd.cmd = htobe32(type);
if (sendall(fd, &cmd, sizeof(cmd)) < 0) {
return -1;
}
return 0;
}
int relayd_connect(int fd, std::uint64_t& viewer_session_id) {
lttng_viewer_connect body = {};
body.major = htobe32(2);
body.minor = htobe32(9);
body.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
if (sendcmd(fd, LTTNG_VIEWER_CONNECT, body) < 0) {
return -1;
}
if (recvall(fd, &body, sizeof(body)) < 0) {
return -1;
}
viewer_session_id = be64toh(body.viewer_session_id);
return 0;
}
int relayd_create_viewer_session(int fd) {
if (sendcmd(fd, LTTNG_VIEWER_CREATE_SESSION) < 0) {
return -1;
}
lttng_viewer_create_session_response response = {};
if (recvall(fd, &response, sizeof(response)) < 0) {
return -1;
}
if (be32toh(response.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
return -2;
}
return 0;
}
int relayd_list_sessions(int fd, std::vector<lttng_viewer_session>& sessions) {
if (sendcmd(fd, LTTNG_VIEWER_LIST_SESSIONS) < 0) {
return -1;
}
lttng_viewer_list_sessions response = {};
if (recvall(fd, &response, sizeof(response)) < 0) {
return -1;
}
auto session_count = be32toh(response.sessions_count);
sessions.resize(session_count);
if (recvall(fd, sessions.data(), sessions.size() * sizeof(sessions[0])) < 0) {
return -1;
}
for (auto& session : sessions) {
session.id = be64toh(session.id);
session.live_timer = be32toh(session.live_timer);
session.clients = be32toh(session.clients);
session.streams = be32toh(session.streams);
}
return 0;
}
int relayd_attach_session(
int fd,
std::uint64_t session_id,
std::unordered_map<std::uint64_t, lttng_viewer_stream>& streams) {
lttng_viewer_attach_session_request request = {};
request.session_id = htobe64(session_id);
request.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
if (sendcmd(fd, LTTNG_VIEWER_ATTACH_SESSION, request) < 0) {
return -1;
}
lttng_viewer_attach_session_response response = {};
if (recvall(fd, &response, sizeof(response)) < 0) {
return -1;
}
response.status = be32toh(response.status);
if (response.status != LTTNG_VIEWER_ATTACH_OK) {
std::cerr << "attach status: " << response.status << std::endl;
return -2;
}
lttng_viewer_stream stream = {};
auto stream_count = be32toh(response.streams_count);
for (std::size_t x = 0; x < stream_count; ++x) {
if (recvall(fd, &stream, sizeof(stream)) < 0) {
return -1;
}
stream.id = be64toh(stream.id);
stream.ctf_trace_id = be64toh(stream.ctf_trace_id);
stream.metadata_flag = be32toh(stream.metadata_flag);
streams[stream.id] = stream;
}
return 0;
}
int relayd_get_new_streams(
int fd,
std::uint64_t session_id,
std::unordered_map<std::uint64_t, lttng_viewer_stream>& streams,
const std::function<void(lttng_viewer_stream&)>& callback) {
lttng_viewer_new_streams_request request = {};
request.session_id = htobe64(session_id);
if (sendcmd(fd, LTTNG_VIEWER_GET_NEW_STREAMS, request) < 0) {
return -1;
}
lttng_viewer_new_streams_response response = {};
if (recvall(fd, &response, sizeof(response)) < 0) {
return -1;
}
auto status = be32toh(response.status);
if (status == LTTNG_VIEWER_NEW_STREAMS_NO_NEW) {
return 0;
}
if (status != LTTNG_VIEWER_NEW_STREAMS_OK) {
return -1;
}
lttng_viewer_stream stream = {};
auto stream_count = be32toh(response.streams_count);
for (std::size_t x = 0; x < stream_count; ++x) {
if (recvall(fd, &stream, sizeof(stream)) < 0) {
return -1;
}
stream.id = be64toh(stream.id);
stream.ctf_trace_id = be64toh(stream.ctf_trace_id);
stream.metadata_flag = be32toh(stream.metadata_flag);
streams[stream.id] = stream;
callback(stream);
}
return 0;
}
int relayd_get_next_index(int fd, std::uint64_t stream_id, lttng_viewer_index& index) {
lttng_viewer_get_next_index request = {};
request.stream_id = htobe64(stream_id);
if (sendcmd(fd, LTTNG_VIEWER_GET_NEXT_INDEX, request) < 0) {
return -1;
}
if (recvall(fd, &index, sizeof(index)) < 0) {
return -1;
}
index.offset = be64toh(index.offset);
index.packet_size = be64toh(index.packet_size);
index.content_size = be64toh(index.content_size);
index.timestamp_begin = be64toh(index.timestamp_begin);
index.timestamp_end = be64toh(index.timestamp_end);
index.events_discarded = be64toh(index.events_discarded);
index.stream_id = be64toh(index.stream_id);
index.status = be32toh(index.status);
index.flags = be32toh(index.flags);
return 0;
}
int relayd_get_metadata(
int fd,
std::uint64_t stream_id,
lttng_viewer_metadata_packet& packet,
std::string& packet_data) {
lttng_viewer_get_metadata request = {};
request.stream_id = htobe64(stream_id);
if (sendcmd(fd, LTTNG_VIEWER_GET_METADATA, request) < 0) {
return -1;
}
if (recvall(fd, &packet, sizeof(packet)) < 0) {
return -1;
}
packet.status = be32toh(packet.status);
packet.len = be64toh(packet.len);
if (packet.status == LTTNG_VIEWER_METADATA_OK) {
if (packet.len == 0) {
return -2;
}
packet_data.resize(packet.len);
if (recvall(fd, &packet_data.front(), packet_data.size()) < 0) {
return -1;
}
}
return 0;
}
int relayd_get_packet(
int fd,
std::uint64_t stream_id,
const lttng_viewer_index& index,
lttng_viewer_trace_packet& packet,
std::string& packet_data) {
lttng_viewer_get_packet request = {};
request.stream_id = htobe64(stream_id);
request.offset = htobe64(index.offset);
// no document mention it's bits but live_test.c
request.len = htobe32(index.content_size / CHAR_BIT);
if (sendcmd(fd, LTTNG_VIEWER_GET_PACKET, request) < 0) {
return -1;
}
if (recvall(fd, &packet, sizeof(packet)) < 0) {
return -1;
}
packet.status = be32toh(packet.status);
packet.len = be32toh(packet.len);
packet.flags = be32toh(packet.flags);
if (packet.status == LTTNG_VIEWER_GET_PACKET_OK) {
if (packet.len == 0) {
return -2;
}
packet_data.resize(packet.len);
if (recvall(fd, &packet_data.front(), packet_data.size()) < 0) {
return -1;
}
}
return 0;
}
void dump(const void* ptr, std::size_t size) {
auto* buf = reinterpret_cast<const unsigned char*>(ptr);
for (std::size_t i=0; i<size; i += 16) {
printf("%06x: ", static_cast<unsigned int>(i));
for (std::size_t j=0; j<16; j++) {
if (i+j < size) {
printf("%02x ", buf[i+j]);
} else {
printf(" ");
}
}
printf(" ");
for (std::size_t j=0; j<16; j++) {
if (i+j < size) {
printf("%c", isprint(buf[i+j]) ? buf[i+j] : '.');
}
}
printf("\n");
}
}
void dump_only_text(const void* ptr, std::size_t size) {
auto* buf = reinterpret_cast<const unsigned char*>(ptr);
for (std::size_t i=0; i<size; i += 72) {
printf("%06x: ", static_cast<unsigned int>(i));
for (std::size_t j=0; j<72; j++) {
if (i+j < size) {
printf("%c", isprint(buf[i+j]) ? buf[i+j] : '.');
}
}
printf("\n");
}
}
}
int main() {
// start processes, won't replace exists
system("lttng-sessiond --daemonize");
system("lttng-relayd --daemonize");
std::this_thread::sleep_for(std::chrono::seconds(1));
// create new session
for (std::size_t x = 0; x < 100; ++x) {
lttng_destroy_session(SessionName);
std::this_thread::sleep_for(std::chrono::milliseconds(15));
}
int ret = lttng_create_session_live(SessionName, SessionUrl, LiveSessionInterval);
if (ret != 0) {
std::cerr << "lttng_create_session: " << lttng_strerror(ret) << std::endl;
return -1;
}
// create handle from session
lttng_domain domain = {};
domain.type = LTTNG_DOMAIN_UST;
lttng_handle* handle = lttng_create_handle(SessionName, &domain);
if (handle == nullptr) {
std::cerr << "lttng_create_handle: " << lttng_strerror(ret) << std::endl;
return -1;
}
// enable event
lttng_event event = {};
event.type = LTTNG_EVENT_TRACEPOINT;
memcpy(event.name, EventName.c_str(), EventName.size());
event.loglevel_type = LTTNG_EVENT_LOGLEVEL_ALL;
event.loglevel = -1;
ret = lttng_enable_event_with_exclusions(handle, &event, nullptr, nullptr, 0, nullptr);
if (ret < 0) {
std::cerr << "lttng_enable_event_with_exclusions: " << lttng_strerror(ret) << std::endl;
return -1;
}
// add context
lttng_event_context contextPid = {};
contextPid.ctx = LTTNG_EVENT_CONTEXT_VPID;
ret = lttng_add_context(handle, &contextPid, nullptr, nullptr);
if (ret < 0) {
std::cerr << "lttng_add_context: " << lttng_strerror(ret) << std::endl;
return -1;
}
// start tracing
ret = lttng_start_tracing(SessionName);
if (ret < 0) {
std::cerr << "lttng_start_tracing: " << lttng_strerror(ret) << std::endl;
return -1;
}
// connect to relayd
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd < 0) {
perror("socket");
return -1;
}
sockaddr_in address = {};
address.sin_addr.s_addr = inet_addr("127.0.0.1");
address.sin_family = AF_INET;
address.sin_port = htons(5344);
ret = connect(fd, (sockaddr*)&address, sizeof(address));
if (ret < 0) {
perror("connect");
return -1;
}
// init connection
std::uint64_t session_id = 0;
if ((ret = relayd_connect(fd, session_id)) < 0) {
std::cerr << "relayd_connect failed: " << ret << std::endl;
return -1;
}
std::cout << "relayd_connect success, session_id: " << session_id << std::endl;
// create viewer session
if ((ret = relayd_create_viewer_session(fd)) < 0) {
std::cerr << "relayd_create_viewer_session failed: " << ret << std::endl;
return -1;
}
std::cout << "relayd_create_viewer_session success" << std::endl;
// list sessions and find out which to attach
std::vector<lttng_viewer_session> sessions;
if ((ret = relayd_list_sessions(fd, sessions)) < 0) {
std::cerr << "relayd_list_sessions failed: " << ret << std::endl;
return -1;
}
std::cout << "relayd_list_sessions success, session count: " << sessions.size() << std::endl;
std::uint64_t attach_to_session_id = 0;
for (const auto& session : sessions) {
std::cout << "session: id = " << session.id <<
", live_timer = " << session.live_timer <<
", hostname = " << session.hostname <<
", session_name = " << session.session_name << std::endl;
// only live session can attach
if (strcmp(SessionName, session.session_name) == 0 && session.live_timer > 0) {
attach_to_session_id = session.id;
}
}
if (attach_to_session_id == 0) {
std::cerr << "no session to attach" << std::endl;
return -1;
}
// attach to session
std::unordered_map<std::uint64_t, lttng_viewer_stream> streams;
if ((ret = relayd_attach_session(fd, attach_to_session_id, streams)) < 0) {
std::cerr << "relayd_attach_session failed: " << ret << std::endl;
return -1;
}
std::cout << "relayd_attach_session success" << std::endl;
// event receive loop
signal(SIGINT, sigint_handler);
lttng_viewer_index index = {};
lttng_viewer_metadata_packet metadata = {};
lttng_viewer_trace_packet packet = {};
std::string packet_data;
std::vector<std::uint64_t> remove_stream_ids;
std::function<void(lttng_viewer_stream&)> callback = [](lttng_viewer_stream& stream) {
std::cout << "new stream: id = " << stream.id <<
", ctf_trace_id = " << stream.ctf_trace_id <<
", metadata_flag = " << stream.metadata_flag <<
", path_name = " << stream.path_name <<
", channel_name = " << stream.channel_name << std::endl;
};
std::cout << "enter event receive loop" << std::endl;
bool require_new_metadata = false;
bool require_new_streams = false;
while (LoopFlag) {
// get new streams
if (require_new_streams || streams.empty()) {
if ((ret = relayd_get_new_streams(fd,
attach_to_session_id, streams, callback)) < 0) {
std::cerr << "relayd_get_new_streams failed: " << ret << std::endl;
return -1;
}
require_new_streams = false;
}
// get metadata
if (require_new_metadata) {
for (const auto& pair : streams) {
const auto& stream = pair.second;
if (stream.metadata_flag == 0) {
continue;
}
while (LoopFlag) {
if ((ret = relayd_get_metadata(fd, stream.id, metadata, packet_data)) < 0) {
std::cerr << "relayd_get_metadata failed: " << ret << std::endl;
return -1;
}
if (metadata.status == LTTNG_VIEWER_NO_NEW_METADATA) {
break;
}
if (metadata.status != LTTNG_VIEWER_METADATA_OK) {
remove_stream_ids.emplace_back(stream.id);
break;
}
// shrink packet data (only for dump)
auto index_not_zero = packet_data.find_last_not_of('\x00');
if (index_not_zero != std::string::npos) {
packet_data.resize(index_not_zero + 1);
}
// print metadata
std::cout << "relayd_get_metadata success: len = " << packet_data.size() << std::endl;
dump_only_text(packet_data.c_str(), packet_data.size());
}
}
require_new_metadata = false;
}
// get events
for (const auto& pair : streams) {
const auto& stream = pair.second;
if (stream.metadata_flag != 0) {
continue;
}
while (LoopFlag) {
// get packet index
if ((ret = relayd_get_next_index(fd, stream.id, index)) < 0) {
std::cerr << "relayd_get_next_index failed: " << ret << std::endl;
return -1;
}
if (index.status == LTTNG_VIEWER_INDEX_RETRY) {
break;
}
if (index.status != LTTNG_VIEWER_INDEX_OK) {
remove_stream_ids.emplace_back(stream.id);
break;
}
// check flags
require_new_metadata = require_new_metadata || (index.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
require_new_streams = require_new_streams || (index.flags & LTTNG_VIEWER_FLAG_NEW_STREAM);
if (require_new_metadata || require_new_streams) {
break;
}
// get packet data
if ((ret = relayd_get_packet(fd, stream.id, index, packet, packet_data)) < 0) {
std::cerr << "relayd_get_packet failed: " << ret << std::endl;
return -1;
}
if (packet.status == LTTNG_VIEWER_GET_PACKET_OK) {
std::cout << "relayd_get_packet success: len = " << packet_data.size() << std::endl;
} else {
std::cerr << "relayd_get_packet failed: status = " << packet.status << std::endl;
return -1;
}
// the format is ctf (common trace format)
// I was trying to use babeltrace as a library to parse it
// but babeltrace have none docuemnt or examples, sucks as lttng
dump(packet_data.c_str(), packet_data.size());
}
}
// remove destroyed or inactive stream
for (std::uint64_t stream_id : remove_stream_ids) {
streams.erase(stream_id);
}
remove_stream_ids.clear();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "shutdown" << std::endl;
shutdown(fd, SHUT_RDWR);
close(fd);
lttng_stop_tracing(SessionName);
lttng_destroy_handle(handle);
lttng_destroy_session(SessionName);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment