Skip to content

Instantly share code, notes, and snippets.

@BastienDurel
Last active August 29, 2015 14:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BastienDurel/790577ae3dca57dff4c7 to your computer and use it in GitHub Desktop.
Save BastienDurel/790577ae3dca57dff4c7 to your computer and use it in GitHub Desktop.
inproc test
// -*- mode: c++ -*-
#if !defined TEST_INPROC_COMMON_INCLUDED
#define TEST_INPROC_COMMON_INCLUDED 1
#include <cerrno>
#include <cstring>
#include <stdexcept>
#include <string>
#include <zmq.h>
#if ZMQ_VERSION_MAJOR < 4
#define zmq_ctx_term zmq_ctx_destroy
#endif
#if __cplusplus < 201103L
#define constexpr const
#endif
#define WM_VALUE 100000000
extern void * ctx;
/// Throws a std::runtime_error describing the most recent failed call.
///
/// The message has the form "<err>: <strerror(errno)>", so this must be called
/// right after the failing call, before anything else can overwrite errno.
///
/// @param err  Name of the call that failed; used as the message prefix.
/// @throws std::runtime_error always; this function never returns normally.
inline void ERR(std::string&& err) {
    // Snapshot errno before touching the string, so the reported error cannot
    // be disturbed by anything the string operations do.
    const int saved_errno = errno;
    err.append(": ").append(strerror(saved_errno));
    throw std::runtime_error(err);
}
/// Scope-bound owner of a zmq_msg_t.
///
/// The constructors initialise the message (empty, or with a payload buffer of
/// the requested size) and throw via ERR on failure. The destructor closes the
/// message. The implicit conversion to zmq_msg_t* lets an instance be passed
/// straight to the zmq_msg_* functions.
struct msg {
    zmq_msg_t _m;

    /// Initialises an empty message.
    msg() {
        if (zmq_msg_init(&_m) != 0)
            ERR("zmq_msg_init");
    }

    /// Initialises a message with a payload buffer of `size` bytes.
    explicit msg(int size) {
        if (zmq_msg_init_size(&_m, static_cast<size_t>(size)) != 0)
            ERR("zmq_msg_init_size");
    }

    /// Initialises a message with a payload buffer of `size` bytes.
    /// The size is forwarded unchanged; narrowing it to int would corrupt
    /// sizes that do not fit in an int.
    explicit msg(size_t size) {
        if (zmq_msg_init_size(&_m, size) != 0)
            ERR("zmq_msg_init_size");
    }

    // A zmq_msg_t must not be duplicated member-wise, so both copy operations
    // are disabled.
    msg(const msg&) = delete;
    msg& operator=(const msg&) = delete;

    /// Closes the message. A failure is deliberately ignored: destructors are
    /// implicitly noexcept, so throwing here would call std::terminate.
    ~msg() { static_cast<void>(zmq_msg_close(&_m)); }

    operator zmq_msg_t*() { return &_m; }
};
/// Applies the TCP keep-alive socket options to `sock`.
///
/// Only active when the build defines DOKEEPALIVE with a value greater than
/// zero; otherwise the function does nothing.
///
/// @param sock  ZeroMQ socket to configure.
/// @throws std::runtime_error (via ERR) on the first zmq_setsockopt failure.
inline void set_keepalive_options(void* sock) {
#if defined DOKEEPALIVE && DOKEEPALIVE > 0
    // Option/value pairs, applied in this order. Every value is 1.
    struct keepalive_setting {
        int option;
        int value;
    };
    const keepalive_setting settings[] = {
        { ZMQ_TCP_KEEPALIVE,       1 },  // turn on TCP SO_KEEPALIVE
        { ZMQ_TCP_KEEPALIVE_IDLE,  1 },  // idle period before keepalives start
        { ZMQ_TCP_KEEPALIVE_CNT,   1 },  // number of keepalives before death
        { ZMQ_TCP_KEEPALIVE_INTVL, 1 },  // interval between keepalives
    };
    for (const keepalive_setting& setting : settings) {
        const int status = zmq_setsockopt(sock, setting.option,
                                          &setting.value, sizeof(setting.value));
        if (status != 0)
            ERR("zmq_setsockopt");
    }
#else
    static_cast<void>(sock);  // keep-alive support compiled out
#endif
}
inline void* connect_socket(const char* endpoint) {
auto sock = zmq_socket(ctx, ZMQ_PUSH);
if (!sock)
ERR("zmq_socket");
const int wm = WM_VALUE;
int rc = zmq_setsockopt(sock, ZMQ_SNDHWM, &wm, sizeof(wm));
if (rc != 0)
ERR("zmq_setsockopt");
set_keepalive_options(sock);
rc = zmq_connect(sock, endpoint);
if (rc != 0)
ERR("zmq_connect");
return sock;
}
inline void* connect_socket_pull(const char* endpoint) {
auto sock = zmq_socket(ctx, ZMQ_PULL);
if (!sock)
ERR("zmq_socket");
const int wm = WM_VALUE;
int rc = zmq_setsockopt(sock, ZMQ_SNDHWM, &wm, sizeof(wm));
if (rc != 0)
ERR("zmq_setsockopt");
set_keepalive_options(sock);
rc = zmq_connect(sock, endpoint);
if (rc != 0)
ERR("zmq_connect");
return sock;
}
/// Explicitly named counterpart of connect_socket_pull(): forwards to
/// connect_socket(), which creates and connects a ZMQ_PUSH socket.
///
/// @param endpoint  Endpoint string forwarded unchanged.
/// @return Whatever connect_socket() returns.
inline void* connect_socket_push(const char* endpoint) {
    void* const push_sock = connect_socket(endpoint);
    return push_sock;
}
inline void* bind_socket(const char* endpoint) {
auto sock = zmq_socket(ctx, ZMQ_PULL);
if (!sock)
ERR("zmq_socket");
const int wm = WM_VALUE;
int rc = zmq_setsockopt(sock, ZMQ_RCVHWM, &wm, sizeof(wm));
if (rc != 0)
ERR("zmq_setsockopt");
set_keepalive_options(sock);
rc = zmq_bind(sock, endpoint);
if (rc != 0)
ERR("zmq_connect");
return sock;
}
/// Explicitly named counterpart of bind_socket_push(): forwards to
/// bind_socket(), which creates and binds a ZMQ_PULL socket.
///
/// @param endpoint  Endpoint string forwarded unchanged.
/// @return Whatever bind_socket() returns.
inline void* bind_socket_pull(const char* endpoint) {
    void* const pull_sock = bind_socket(endpoint);
    return pull_sock;
}
inline void* bind_socket_push(const char* endpoint) {
auto sock = zmq_socket(ctx, ZMQ_PUSH);
if (!sock)
ERR("zmq_socket");
const int wm = WM_VALUE;
int rc = zmq_setsockopt(sock, ZMQ_RCVHWM, &wm, sizeof(wm));
if (rc != 0)
ERR("zmq_setsockopt");
set_keepalive_options(sock);
rc = zmq_bind(sock, endpoint);
if (rc != 0)
ERR("zmq_connect");
return sock;
}
#endif// ~TEST_INPROC_COMMON_INCLUDED
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|x64">
<Configuration>Debug</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|x64">
<Configuration>Release</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGuid>{27E6F8CB-2F11-49C0-83E3-4C5AE66157D0}</ProjectGuid>
<Keyword>Win32Proj</Keyword>
<RootNamespace>inproc</RootNamespace>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v120</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v120</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="PropertySheets">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="PropertySheets">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<LinkIncremental>true</LinkIncremental>
<IncludePath>C:\Program Files\ZeroMQ 4.0.4\include;$(IncludePath)</IncludePath>
<LibraryPath>C:\Program Files\ZeroMQ 4.0.4\lib;$(LibraryPath)</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>C:\Program Files\ZeroMQ 4.0.4\include;$(IncludePath)</IncludePath>
<LibraryPath>C:\Program Files\ZeroMQ 4.0.4\lib;$(LibraryPath)</LibraryPath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
<PrecompiledHeader>
</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_CONSOLE;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<AdditionalDependencies>libzmq-v120-mt-gd-4_0_4.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<PrecompiledHeader>
</PrecompiledHeader>
<Optimization>MaxSpeed</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
<AdditionalDependencies>libzmq-v120-mt-4_0_4.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Link>
</ItemDefinitionGroup>
<ItemGroup>
<Text Include="ReadMe.txt" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="stdafx.h" />
<ClInclude Include="targetver.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="main.cpp" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
#include <iostream>
#include <chrono>
#include <thread>
#include <memory>
#include <atomic>
#include <cassert>
#include "common.h"
#if !defined NUM_THREADS
// Number of pull worker threads; a build may predefine NUM_THREADS instead.
constexpr int NUM_THREADS { 2 };
#endif
// Start signal: set once by main() and polled by every worker thread.
// It is atomic because volatile gives no inter-thread synchronisation, and
// unsynchronised access to a plain bool from several threads is a data race.
std::atomic<bool> thr_start { false };
// Number of workers that have finished setting up their socket.
std::atomic_int init_count { 0 };
// Number of "hello" messages the push worker sends.
#if defined WORK
#define MSG_COUNT 5000
#else
#define MSG_COUNT 10000000
#endif
// ZeroMQ context shared by every socket helper; created in main().
void* ctx { nullptr };
// Endpoint the push worker binds and the pull workers connect to.
constexpr const char * endpoint { "inproc://foo" };
void push_worker(int num);
void pull_worker(int& count);
/// Benchmark driver.
///
/// Starts NUM_THREADS pull workers and one push worker, waits until all of
/// them have reported in through init_count, raises thr_start, then joins the
/// workers and terminates the context. Prints the elapsed time and the number
/// of messages each pull worker counted.
///
/// @return 0 on success, 1 if the ZeroMQ context could not be created or
///         terminated.
int main()
{
    ctx = zmq_ctx_new();
    if (!ctx) {
        // Checked at run time instead of with assert(): an assert disappears
        // when NDEBUG is defined, leaving a null context unnoticed.
        std::cerr << "zmq_ctx_new: " << strerror(errno) << std::endl;
        return 1;
    }

    std::thread pull[NUM_THREADS];
    int count[NUM_THREADS] = {0};
    for (int i = 0; i < NUM_THREADS; ++i) {
        pull[i] = std::thread(pull_worker, std::ref(count[i]));
    }
    std::thread push(push_worker, MSG_COUNT);

    // Wait for every worker (pullers plus the pusher) to finish its setup.
    constexpr int to_init = NUM_THREADS + 1;
    while (init_count.load() < to_init)
        std::this_thread::yield();

    const auto start = std::chrono::high_resolution_clock::now();
    thr_start = true;
    std::this_thread::yield();

    push.join();
    for (auto& worker : pull) {
        worker.join();
    }

    if (zmq_ctx_term(ctx) != 0) {
        // Report and exit instead of throwing: nothing catches an exception
        // escaping main(), so it would end in std::terminate.
        std::cerr << "zmq_ctx_term: " << strerror(errno) << std::endl;
        return 1;
    }
    const auto end = std::chrono::high_resolution_clock::now();

    using namespace std::chrono;
    std::cout << "done in "
              << duration_cast<milliseconds>(end - start).count()
              << " ms for " << NUM_THREADS << " threads" << std::endl;
    std::cout << "Counts: " << std::endl;
    int total = 0;
    for (int i = 0; i < NUM_THREADS; ++i) {
        total += count[i];
        std::cout << "Thread# " << i << ": " << count[i] << std::endl;
    }
    std::cout << "Total: " << total << std::endl;
#if defined _MSC_VER
    // Wait for input before returning.
    int k; std::cin >> k;
#endif
    return 0;
}
void _push_worker(int num) {
auto sock = bind_socket_push(endpoint);
const std::string hello{"hello"};
int rc { 0 };
init_count++;
while (!thr_start)
;// spinlock without yield, should start first
for (int i = 0; i < num; ++i) {
msg message{hello.size()};
memmove(zmq_msg_data(message), hello.c_str(), hello.size());
const int size = zmq_msg_size(message);
rc = zmq_msg_send(message, sock, 0);
if (rc != size)
ERR("zmq_msg_send");
}
const std::string end{"end"};
for (int i = 0; i < NUM_THREADS; ++i) {
msg message{end.size()};
memmove(zmq_msg_data(message), end.c_str(), end.size());
const int size = zmq_msg_size(message);
rc = zmq_msg_send(message, sock, 0);
if (rc != size)
ERR("zmq_msg_send");
}
rc = zmq_close(sock);
if (rc != 0)
ERR("zmq_close");
}
/// Thread entry point for the push side.
///
/// Runs _push_worker() and writes any std::exception it throws to std::cerr,
/// so the exception does not escape the thread.
///
/// @param num  Number of "hello" messages to send; forwarded unchanged.
void push_worker(int num) {
    try {
        _push_worker(num);
    }
    catch (const std::exception& ex) {
        // Tag the message with this worker's own name.
        std::cerr << "[push_worker]" << ex.what() << std::endl;
    }
}
/// Body of a pull thread.
///
/// Connects a PULL socket to `endpoint`, sets ZMQ_LINGER to 200, reports
/// readiness through init_count, waits for thr_start, then receives messages
/// until an "end" message arrives or the context is terminated (ETERM).
/// Every other message increments `count`.
///
/// @param count  Incremented once per received non-"end" message.
/// @throws std::runtime_error (via ERR) if a ZeroMQ call fails. The pending
///         message and the socket are released before the exception
///         propagates.
void _pull_worker(int& count) {
    auto sock = connect_socket_pull(endpoint);
    try {
        const int lin = 200;
        if (zmq_setsockopt(sock, ZMQ_LINGER, &lin, sizeof(lin)) != 0)
            ERR("zmq_setsockopt");
        init_count++;
        while (!thr_start)
            std::this_thread::yield();

        //constexpr int flags = ZMQ_DONTWAIT;
        constexpr int flags = 0;
        bool run = true;
        while (run) {
            zmq_msg_t message;
            if (zmq_msg_init(&message) != 0)
                ERR("zmq_msg_init");

            const int received = zmq_msg_recv(&message, sock, flags);
            if (received < 0) {
                // Always release the message, but keep the receive error:
                // closing may overwrite errno.
                const int recv_errno = errno;
                const int close_rc = zmq_msg_close(&message);
                if (recv_errno == ETERM)
                    break;
                if (recv_errno != EAGAIN) {
                    errno = recv_errno;
                    ERR("zmq_msg_recv");
                }
                if (close_rc != 0)
                    ERR("zmq_msg_close");
                std::this_thread::yield();
                continue;
            }

            const char* payload = static_cast<const char*>(zmq_msg_data(&message));
            if (received == 3 && strncmp(payload, "end", 3) == 0)
                run = false;  // termination marker: stop after this message
            else
                ++count;
#if defined WORK
            std::this_thread::sleep_for(std::chrono::milliseconds{1});
#endif
            if (zmq_msg_close(&message) != 0)
                ERR("zmq_msg_close");
        }
    }
    catch (...) {
        // Do not leak the socket: zmq_ctx_term() waits for every socket of
        // the context to be closed.
        zmq_close(sock);
        throw;
    }
    if (zmq_close(sock) != 0)
        ERR("zmq_close");
}
/// Thread entry point for a pull worker.
///
/// Delegates to _pull_worker() and writes any std::exception it throws to
/// std::cerr, so the exception does not escape the thread.
///
/// @param count  Message counter forwarded to _pull_worker().
void pull_worker(int& count)
try {
    _pull_worker(count);
}
catch (const std::exception& failure) {
    std::cerr << "[pull_worker]" << failure.what() << std::endl;
}
# Build rules for the inproc PUSH/PULL test program.
#   make THREADS=<n>   adds -DNUM_THREADS=<n>
#   make WORK=1        adds -DWORK
SRC=main.cpp
OBJ=$(SRC:.cpp=.o)
OPT=-g -O2
ifneq ($(THREADS),)
OPT+=-DNUM_THREADS=$(THREADS)
endif
ifeq ($(WORK),1)
OPT+=-DWORK
endif
# -pthread is required at compile and link time because main.cpp uses std::thread.
CXXFLAGS=$(shell pkg-config --cflags libzmq) --std=c++11 -pthread $(OPT)
LDFLAGS=$(shell pkg-config --libs libzmq) -pthread $(OPT)
NAME=inproc-test
# These targets do not produce files of the same name.
.PHONY: all test clean fclean
all: $(NAME)
test: $(NAME)
	./$(NAME)
$(NAME): $(OBJ)
	$(CXX) -o $@ $(OBJ) $(LDFLAGS)
# Rebuild the objects whenever the shared header changes.
$(OBJ): common.h
clean:
	rm -f $(OBJ) *~
fclean: clean
	rm -f $(NAME)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment