Skip to content

Instantly share code, notes, and snippets.

@asymingt
Created February 6, 2023 04:48
Show Gist options
  • Save asymingt/7dd639df643f6684e6c73e79f9925c67 to your computer and use it in GitHub Desktop.
Save asymingt/7dd639df643f6684e6c73e79f9925c67 to your computer and use it in GitHub Desktop.
Tracy muxer WIP: Boost version
#include <array>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/thread.hpp>
#include <cassert>
#include <common/TracyProtocol.hpp>
#include <common/TracyVersion.hpp>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
using boost::asio::ip::tcp;
using boost::asio::ip::udp;
namespace {
// This class provides a synthetic data stream as a concatenation of all other
// streams listened to by this app.
class TracyClientDataProviderSession
: public std::enable_shared_from_this<TracyClientDataProviderSession> {
public:
TracyClientDataProviderSession(tcp::socket socket)
: socket_(std::move(socket)) {
std::cout << "Starting TCP data provider session" << std::endl;
}
void start() { do_read(); }
private:
void do_read() {
auto self(shared_from_this());
socket_.async_read_some(
boost::asio::buffer(data_, max_length),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
do_write(length);
}
});
}
void do_write(std::size_t length) {
auto self(shared_from_this());
boost::asio::async_write(
socket_, boost::asio::buffer(data_, length),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
std::cout << "Write succeeded..." << std::endl;
}
});
}
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};
class TracyClientDataProvider {
public:
TracyClientDataProvider(boost::asio::io_context &io_context, short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) {
std::cout << "Starting TCP data provider on port " << port << std::endl;
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket) {
if (!ec) {
std::make_shared<TracyClientDataProviderSession>(std::move(socket))
->start();
}
do_accept();
});
}
private:
tcp::acceptor acceptor_;
};
// This class listens for client broadcasts on 8086, and when a new client
// is found, a TCP instance is started on the port encoded in the message.
class TracyClientBroadcastListener {
public:
TracyClientBroadcastListener(boost::asio::io_service &io_service)
: _io_service(io_service),
_provider(io_service, 9095),
_socket(io_service) {
_socket.open(udp::v4());
_socket.set_option(boost::asio::ip::udp::socket::reuse_address(true));
_socket.set_option(boost::asio::socket_base::broadcast(true));
_socket.bind(udp::endpoint(boost::asio::ip::address_v4::any(), 8086));
startReceive();
}
private:
void startReceive() {
_socket.async_receive_from(
boost::asio::buffer(_recvBuffer), _remote,
boost::bind(&TracyClientBroadcastListener::handleReceive, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handleReceive(const boost::system::error_code &error,
std::size_t bytes_transferred) {
if (!error || error == boost::asio::error::message_size) {
std::cout << "Packet received of size " << bytes_transferred << std::endl;
if (bytes_transferred > sizeof(tracy::BroadcastMessage)) {
std::cout << "Packet too large" << std::endl;
}
uint16_t broadcastVersion;
memcpy(&broadcastVersion, _recvBuffer.data(), sizeof(uint16_t));
std::cout << "Packet version: " << broadcastVersion << std::endl;
if (broadcastVersion <= tracy::BroadcastVersion) {
switch (broadcastVersion) {
case 3: {
if (bytes_transferred <= sizeof(tracy::BroadcastMessage)) {
const tracy::BroadcastMessage *bm =
reinterpret_cast<const tracy::BroadcastMessage *>(
_recvBuffer.data());
processClient(bm->listenPort, bm->protocolVersion,
bm->programName, bm->pid, bm->activeTime);
}
break;
}
case 2: {
if (bytes_transferred <= sizeof(tracy::BroadcastMessage_v2)) {
const tracy::BroadcastMessage_v2 *bm =
reinterpret_cast<const tracy::BroadcastMessage_v2 *>(
_recvBuffer.data());
processClient(bm->listenPort, bm->protocolVersion,
bm->programName, 0, bm->activeTime);
}
break;
}
case 1: {
if (bytes_transferred <= sizeof(tracy::BroadcastMessage_v1)) {
const tracy::BroadcastMessage_v1 *bm =
reinterpret_cast<const tracy::BroadcastMessage_v1 *>(
_recvBuffer.data());
processClient(bm->listenPort, bm->protocolVersion,
bm->programName, 0, bm->activeTime);
}
break;
}
case 0: {
if (bytes_transferred <= sizeof(tracy::BroadcastMessage_v0)) {
const tracy::BroadcastMessage_v0 *bm =
reinterpret_cast<const tracy::BroadcastMessage_v0 *>(
_recvBuffer.data());
processClient(8086, bm->protocolVersion, bm->programName, 0,
bm->activeTime);
}
break;
}
default:
std::cout << "Unknown packet" << std::endl;
break;
}
}
startReceive();
}
}
void processClient(uint16_t listenPort, uint32_t protoVersion,
const char *programName, uint64_t pid,
int32_t activeTime) {
//if (!_clients.count(listenPort)) {
std::cout << "Spinning up a new thread to handle client"
<< ": port=" << listenPort << ", protoVersion=" << protoVersion
<< ", programName=" << programName << ", pid=" << pid
<< ", activeTime=" << activeTime << std::endl;
// _clients.emplace(listenPort,
// std::move(TracyClientDataConsumer(_io_service, listenPort)));
//}
}
private:
boost::asio::io_service &_io_service;
TracyClientDataProvider _provider;
udp::socket _socket;
udp::endpoint _remote;
std::array<char, 1024> _recvBuffer;
//std::unordered_map<uint16_t, TracyClientDataConsumer> _clients;
};
} // namespace
// Entry point: constructs the broadcast listener (which also starts the TCP
// data provider) and runs the io_service until it has no more work.
// Returns 0 on a clean exit and 1 when an exception ends the run, so that
// callers and scripts can detect the failure.
int main(int /*argc*/, char * /*argv*/[]) {
  std::cout << "Starting listener..." << std::endl;
  try {
    boost::asio::io_service io_service;
    TracyClientBroadcastListener server{io_service};
    io_service.run();
  } catch (const std::exception &ex) {
    std::cerr << ex.what() << std::endl;
    return 1;
  }
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment