Skip to content

Instantly share code, notes, and snippets.

@Climax777
Last active June 18, 2024 14:39
Show Gist options
  • Save Climax777/ecea9f6021647b92ead1eadc438125fe to your computer and use it in GitHub Desktop.
Save Climax777/ecea9f6021647b92ead1eadc438125fe to your computer and use it in GitHub Desktop.
ASIO test files
#include <boost/asio.hpp>
#include <atomic>
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <chrono>
#include <memory>
using namespace std;
const int datasize = 256;
// Tweak this to test smaller sizes (make counterpart in python program the same)
const int multiplier = 1024;
uint32_t totalsize = 0;
std::mutex my_test_mutex;
std::atomic_int64_t counter = 0;
std::atomic_int64_t request_order = 0;
void handler(const boost::system::error_code& error, std::size_t bytes_transferred, std::shared_ptr<std::vector<char>> buffer, int64_t order) {
cout << "Handler called for order: " << order << endl;
cout.flush();
if (error) {
cerr << "Error: " << error.message() << endl;
} else {
cout << "Bytes transferred: " << bytes_transferred << endl;
}
counter++;
cout << "Total completed: " << counter << endl;
cout.flush();
}
void my_test_func(boost::asio::ip::tcp::socket* socket, boost::asio::io_context* io) {
while (true) {
cout << "Starting out on thread " << std::this_thread::get_id() << endl;
cout.flush();
int64_t order = ++request_order;
auto buffer = std::make_shared<std::vector<char>>(datasize * multiplier * 2, 0);
// Initialize the buffer
(*buffer)[0] = static_cast<char>(order % 256); // Use the first byte to store the order
for (int i = 0; i < datasize * multiplier; ++i) {
(*buffer)[i*2] = static_cast<char>(order % 256);
(*buffer)[i*2 + 1] = static_cast<char>(i % 256);
}
std::vector<boost::asio::const_buffer> scatter_buffer;
scatter_buffer.push_back(boost::asio::buffer(*buffer));
boost::asio::async_write(*socket, scatter_buffer, boost::asio::transfer_exactly(totalsize),
[buffer, order](const boost::system::error_code& error, std::size_t bytes_transferred) {
handler(error, bytes_transferred, buffer, order);
});
cout << "Done on thread " << std::this_thread::get_id() << endl;
cout.flush();
// Change this to a smaller value (milliseconds/microseconds) to see breakage even in a single thread
std::this_thread::sleep_for(std::chrono::seconds(100));
// break;
}
}
int main(int argc, char** argv) {
totalsize = datasize * multiplier * 2;
boost::asio::io_context my_test_ioservice;
boost::asio::ip::tcp::socket my_test_socket(my_test_ioservice);
boost::asio::ip::tcp::resolver my_test_tcp_resolver(my_test_ioservice);
// Change IP/Port here
boost::asio::ip::tcp::resolver::query my_test_tcp_query("localhost", "40000");
boost::asio::ip::tcp::resolver::iterator my_test_tcp_iterator = my_test_tcp_resolver.resolve(my_test_tcp_query);
boost::asio::connect(my_test_socket, my_test_tcp_iterator);
my_test_ioservice.post([]() {
while (true) std::this_thread::sleep_for(std::chrono::seconds(1));
});
cout << "Starting senders for " << totalsize << " bytes" << endl;
cout.flush();
vector<std::jthread> sender_threads;
// Change this to single thread/multiple threads and witness the destruction
for (size_t i = 0; i < 10; ++i) { // Adjust the loop count to tweak the number of sender threads.
sender_threads.emplace_back(my_test_func, &my_test_socket, &my_test_ioservice);
}
cout << "Starting threads" << endl;
cout.flush();
std::vector<std::jthread> worker_threads;
for (size_t i = 0; i < 19; ++i) {
worker_threads.emplace_back([&]() {
while (!my_test_ioservice.stopped()) {
try {
my_test_ioservice.run();
} catch (std::exception& e) {
cerr << "Workers: thread exception: " << e.what() << endl;
cerr.flush();
if (my_test_ioservice.stopped())
break;
}
}
});
}
while (true) {
try {
my_test_ioservice.run();
} catch (std::exception& e) {
cerr << "main: thread exception: " << e.what() << endl;
cerr.flush();
if (my_test_ioservice.stopped())
break;
}
}
cout << "Done" << endl;
cout.flush();
return 0;
}
import socket
# Server configuration
HOST = '0.0.0.0' # The IP address the server will bind to
PORT = 40000 # The port the server will bind to
BUFFER_SIZE = 256 * 1024 * 2 # Buffer size to match the C++ client's data size
def receive_full_data(client_socket, buffer_size):
data = bytearray("", "ascii")
while len(data) < buffer_size:
packet = client_socket.recv(buffer_size - len(data))
if not packet:
break
data.extend(packet)
return data
def start_server():
# Create a TCP/IP socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
# Bind the socket to the address and port
server_socket.bind((HOST, PORT))
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Listen for incoming connections
server_socket.listen()
print(f'Server listening on {HOST}:{PORT}')
previous_order = -1
while True:
# Wait for a connection
client_socket, client_address = server_socket.accept()
with client_socket:
print(f'Connected by {client_address}')
while True:
# Receive the full buffer size data
data = receive_full_data(client_socket, BUFFER_SIZE)
if not data or len(data) == 0 or len(data) != BUFFER_SIZE:
print(f"incorrect size {len(data)} vs {BUFFER_SIZE}")
break
order_byte = data[0] # The first byte indicates the order
print(f"starting {order_byte} {(previous_order + 1) % 256} size {len(data)}")
if previous_order != -1 and order_byte != (previous_order + 1) % 256:
print(f"Error: Order byte {order_byte} did not increment correctly from {previous_order}")
else:
print(f"Order byte {order_byte} incremented correctly from {previous_order}")
previous_order = order_byte
# Check that the rest of the data is incrementing correctly
correct_data = True
for i in range(0, int(len(data)/2), 1):
expected_value = i % 256
#print(f"{i} {i*2 + 1}")
#print(f"{data[i]} {expected_value}")
if data[(i*2)+1] != expected_value:
correct_data = False
print(f"Error: Data byte {i} with value {data[i*2+1]} is not incrementing correctly. should be {expected_value}")
#break
if data[(i*2)] != previous_order:
correct_data = False
print(f"Error: Order byte {i} with value {data[i*2]} is not staying correct. Should be {previous_order}")
#break
if correct_data:
print(f"Data received correctly with order byte {order_byte}")
if __name__ == '__main__':
start_server()
@Climax777
Copy link
Author

g++ -std=c++20 -pthread -lboost_system -o testasio testasio.cpp

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment