Skip to content

Instantly share code, notes, and snippets.

@mavam
Last active May 6, 2016 22:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mavam/463312c5b7ec57f1838c77b50e25c973 to your computer and use it in GitHub Desktop.
Save mavam/463312c5b7ec57f1838c77b50e25c973 to your computer and use it in GitHub Desktop.
Broker API Synopsis
// --- component setup --------------------------------------------------------
// Creates a broker execution context that encapsulates runtime state, such as
// a thread pool and message type information.
system sys{cfg};
// Create a broker in a given system. A broker is a thin abstraction on top of
// a CAF actor for sending and receiving messages. There exist synchronous and
// aysnchronous brokers.
async_broker a{sys};
sync_broker b{sys};
// Publish broker at a TCP endpoint, allowing it to accept remote connections.
b.listen("127.0.0.1", 42000);
// --- peering between brokers ------------------------------------------------
// Peer with another broker. Peering units exchange subscriptions and route
// messages among them.
sync_broker c{sys};
c.peer(b);
b.peer(c); // idempotent
// Peer with a remote broker and remote broker to peers upon success.
b.peer("1.2.3.4", 42000); // block until connected or failed
b.peer<immediate>("1.2.3.4", 42000); // equivalent, but with explicit flag
// Try to connect automatically until successful.
// This version uncondionally adds the remote broker to the list of peers.
b.peer<lazy>("1.2.3.4", 42000);
b.peer<lazy>("1.2.3.4", 42000,
hour(1), // when to abort ultimately
seconds(10) // delay between attempts
);
// Inspect peers.
for (auto p : b.peers())
cout << p.address() << ':' << p.port() << endl; // If failed: <unbound>:0
// Remove a peering.
b.unpeer(c);
b.unpeer("1.2.3.4", 42000);
// --- publishing messages ----------------------------------------------------
// Send data <foo, 42, 3.14> under topic "/foo" to all peers.
// The function drops the message if no peers are alive/connected.
b.publish("/foo", "foo", 42, 3.14);
// --- subscribing to messages ------------------------------------------------
// Subscribes to a topic.
b.subscribe("/foo");
b.subscribe("/bar");
// For a synchronous broker, block and wait until a single message arrives.
// For an asynchronous broker, register the handler.
b.process(
[=](const topic& t, const message& msg) {
msg.apply(
[&](int, int) { .. },
[&](double) { .. },
[&](const string&) { .. }
);
},
[=](const status& s) {
if (s == remote_peer_disconnected) {
auto r = s.context().get<broker>(0); // retrieve status context
cout << "lost connection to " << r.address() << ':' << r.port() << endl;
}
}
);
// For synchronous brokers and as an alternative to the functional message
// processing API, one can just receive the next <topic, data> pair.
// This function blocks.
auto msg = b.receive();
msg.apply(
[&](const topic& t, const message& msg) {
// received regular data
},
[&](const status& s) {
// received a status message
}
);
// For synchronous brokers, retrieve a descriptor suitable for integration into
// poll/select loops. The descriptor is "ready" when there exists at least one
// message that can then be extracted synchronously without waiting.
auto fd = b.descriptor();
// --- data store API ---------------------------------------------------------
// TODO
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment