This Gist contains source code for my blog article "BlazingMQ: Hands On". See the article for more explanation.
Last active
October 10, 2024 11:00
-
-
Save Cartroo/7bab78114d69e9985022d8084c716723 to your computer and use it in GitHub Desktop.
A very simple producer to try out BlazingMQ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import blazingmq | |
import signal | |
import threading | |
event = threading.Event() | |
def handler(signum, _): | |
print("---- Terminating") | |
event.set() | |
signal.signal(signal.SIGINT, handler) | |
def on_message_callback(message, message_handle): | |
print("Received message:", message.data) | |
message_handle.confirm() | |
queue_uri = "bmq://bmq.test.persistent.priority/somequeue" | |
with blazingmq.Session( | |
blazingmq.session_events.log_session_event, | |
on_message=on_message_callback, | |
) as session: | |
print("---- Session open") | |
session.open_queue(queue_uri, read=True) | |
print("---- Queue open") | |
event.wait() | |
print("---- Closing queue") | |
session.configure_queue( | |
queue_uri, blazingmq.QueueOptions(0, 0, 0) | |
) | |
session.close_queue(queue_uri) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
BMQROOT=../blazingmq | |
INCLUDE_ROOT=$(BMQROOT)/src/groups/bmq | |
CPPFLAGS=-g -std=c++17 -I../blazingmq/include \ | |
-I$(INCLUDE_ROOT)/bmqa -I$(INCLUDE_ROOT)/bmqt \ | |
-I$(INCLUDE_ROOT)/bdlb | |
LDFLAGS=-g | |
LDLIBS=-lz -lpcre2 \ | |
-L$(BMQROOT)/build/blazingmq/src/groups/bmq \ | |
-lbmqa -lbmqp -lbmqt -lbmqimp -lbmqpi -lbmqscm -lbmqeval \ | |
-L$(BMQROOT)/lib \ | |
-lbsl -lbal -lbdl -lntc -lnts -linteldfp \ | |
-L$(BMQROOT)/build/blazingmq/src/groups/mwc \ | |
-lmwc | |
producer: producer.o | |
clang++ $(LDFLAGS) -o producer producer.o $(LDLIBS) | |
producer.o: producer.cpp | |
clang++ $(CPPFLAGS) -c producer.cpp | |
clean: | |
rm producer | |
rm producer.o |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <bmqa_session.h> | |
#include <bmqt_queueflags.h> | |
#include <iostream> | |
using namespace BloombergLP; | |
const char QUEUE_URL[] = | |
"bmq://bmq.test.persistent.priority/somequeue"; | |
void runProducer(bmqa::Session *); | |
void sendMessage(const std::string &, const bmqa::QueueId &, | |
bmqa::Session *); | |
int main(int argc, const char *argv[]) | |
{ | |
std::cerr << "Starting session" << std::endl; | |
bmqa::Session session; | |
int startResult = session.start(); | |
if (startResult != 0) | |
{ | |
std::cerr << "Failed to start session result=" << startResult | |
<< std::endl; | |
return 1; | |
} | |
std::cerr << "Trying to send messages" << std::endl; | |
runProducer(&session); | |
std::cerr << "Stopping session" << std::endl; | |
session.stop(); | |
return 0; | |
} | |
void runProducer(bmqa::Session *session) | |
{ | |
std::cerr << "Opening queue" << std::endl; | |
bmqa::QueueId queueId(1); | |
bmqa::OpenQueueStatus openStatus = session->openQueueSync( | |
&queueId, QUEUE_URL, bmqt::QueueFlags::e_WRITE); | |
if (!openStatus) | |
{ | |
std::cerr << "Failed to open queue status=" << openStatus | |
<< std::endl; | |
return; | |
} | |
// Send some messages | |
sendMessage("Message one", queueId, session); | |
sendMessage("Message two", queueId, session); | |
sendMessage("Message three", queueId, session); | |
std::cerr << "Closing queue" << std::endl; | |
bmqa::CloseQueueStatus closeStatus = | |
session->closeQueueSync(&queueId); | |
if (!closeStatus) | |
{ | |
std::cerr << "Failed to close queue status=" << closeStatus | |
<< std::endl; | |
} | |
} | |
void sendMessage(const std::string &msg, const bmqa::QueueId &queueId, | |
bmqa::Session *session) | |
{ | |
bmqa::MessageEventBuilder builder; | |
session->loadMessageEventBuilder(&builder); | |
bmqa::Message &message = builder.startMessage(); | |
message.setDataRef(msg.c_str(), msg.length()); | |
int buildStatus = builder.packMessage(queueId); | |
if (buildStatus != 0) | |
{ | |
std::cerr << "Failed to pack message msg=" << msg | |
<< " status=" | |
<< bmqt::EventBuilderResult::Enum(buildStatus) | |
<< std::endl; | |
return; | |
} | |
const bmqa::MessageEvent &messageEvent = builder.messageEvent(); | |
int postStatus = session->post(messageEvent); | |
if (!postStatus) | |
{ | |
std::cerr << "Failed to post message msg=" << msg | |
<< " status=" << postStatus << std::endl; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment