Skip to content

Instantly share code, notes, and snippets.

@Cartroo
Last active October 10, 2024 11:00
Show Gist options
  • Save Cartroo/7bab78114d69e9985022d8084c716723 to your computer and use it in GitHub Desktop.
Save Cartroo/7bab78114d69e9985022d8084c716723 to your computer and use it in GitHub Desktop.
A very simple producer to try out BlazingMQ

This Gist contains source code for my blog article "BlazingMQ: Hands On". See the article for more explanation.

import blazingmq
import signal
import threading
event = threading.Event()
def handler(signum, _):
print("---- Terminating")
event.set()
signal.signal(signal.SIGINT, handler)
def on_message_callback(message, message_handle):
print("Received message:", message.data)
message_handle.confirm()
queue_uri = "bmq://bmq.test.persistent.priority/somequeue"
with blazingmq.Session(
blazingmq.session_events.log_session_event,
on_message=on_message_callback,
) as session:
print("---- Session open")
session.open_queue(queue_uri, read=True)
print("---- Queue open")
event.wait()
print("---- Closing queue")
session.configure_queue(
queue_uri, blazingmq.QueueOptions(0, 0, 0)
)
session.close_queue(queue_uri)
BMQROOT=../blazingmq
INCLUDE_ROOT=$(BMQROOT)/src/groups/bmq
CPPFLAGS=-g -std=c++17 -I../blazingmq/include \
-I$(INCLUDE_ROOT)/bmqa -I$(INCLUDE_ROOT)/bmqt \
-I$(INCLUDE_ROOT)/bdlb
LDFLAGS=-g
LDLIBS=-lz -lpcre2 \
-L$(BMQROOT)/build/blazingmq/src/groups/bmq \
-lbmqa -lbmqp -lbmqt -lbmqimp -lbmqpi -lbmqscm -lbmqeval \
-L$(BMQROOT)/lib \
-lbsl -lbal -lbdl -lntc -lnts -linteldfp \
-L$(BMQROOT)/build/blazingmq/src/groups/mwc \
-lmwc
producer: producer.o
clang++ $(LDFLAGS) -o producer producer.o $(LDLIBS)
producer.o: producer.cpp
clang++ $(CPPFLAGS) -c producer.cpp
clean:
rm producer
rm producer.o
#include <bmqa_session.h>
#include <bmqt_queueflags.h>
#include <iostream>
using namespace BloombergLP;
const char QUEUE_URL[] =
"bmq://bmq.test.persistent.priority/somequeue";
void runProducer(bmqa::Session *);
void sendMessage(const std::string &, const bmqa::QueueId &,
bmqa::Session *);
int main(int argc, const char *argv[])
{
std::cerr << "Starting session" << std::endl;
bmqa::Session session;
int startResult = session.start();
if (startResult != 0)
{
std::cerr << "Failed to start session result=" << startResult
<< std::endl;
return 1;
}
std::cerr << "Trying to send messages" << std::endl;
runProducer(&session);
std::cerr << "Stopping session" << std::endl;
session.stop();
return 0;
}
void runProducer(bmqa::Session *session)
{
std::cerr << "Opening queue" << std::endl;
bmqa::QueueId queueId(1);
bmqa::OpenQueueStatus openStatus = session->openQueueSync(
&queueId, QUEUE_URL, bmqt::QueueFlags::e_WRITE);
if (!openStatus)
{
std::cerr << "Failed to open queue status=" << openStatus
<< std::endl;
return;
}
// Send some messages
sendMessage("Message one", queueId, session);
sendMessage("Message two", queueId, session);
sendMessage("Message three", queueId, session);
std::cerr << "Closing queue" << std::endl;
bmqa::CloseQueueStatus closeStatus =
session->closeQueueSync(&queueId);
if (!closeStatus)
{
std::cerr << "Failed to close queue status=" << closeStatus
<< std::endl;
}
}
void sendMessage(const std::string &msg, const bmqa::QueueId &queueId,
bmqa::Session *session)
{
bmqa::MessageEventBuilder builder;
session->loadMessageEventBuilder(&builder);
bmqa::Message &message = builder.startMessage();
message.setDataRef(msg.c_str(), msg.length());
int buildStatus = builder.packMessage(queueId);
if (buildStatus != 0)
{
std::cerr << "Failed to pack message msg=" << msg
<< " status="
<< bmqt::EventBuilderResult::Enum(buildStatus)
<< std::endl;
return;
}
const bmqa::MessageEvent &messageEvent = builder.messageEvent();
int postStatus = session->post(messageEvent);
if (!postStatus)
{
std::cerr << "Failed to post message msg=" << msg
<< " status=" << postStatus << std::endl;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment