Skip to content

Instantly share code, notes, and snippets.

@anjankow
Created April 26, 2023 12:06
Show Gist options
  • Save anjankow/c2aad2d2afc5e1da34af365ff709a2de to your computer and use it in GitHub Desktop.
Save anjankow/c2aad2d2afc5e1da34af365ff709a2de to your computer and use it in GitHub Desktop.
Event listener for Hyperledger Sawtooth blockchain, using zmq and sawtooth_sdk
// [dependencies]
// protobuf = "2.23"
// sawtooth-sdk = "0.5.2"
// zmq = "0.9.0"
// needed for write_to_bytes() method
use protobuf::Message;
/// Endpoint URL that `main` passes to `socket.connect` to reach the validator.
const DEFAULT_VALIDATOR_ENDPOINT: &str = "tcp://validator:4004";
/// Connects to the validator, subscribes to events and dispatches every
/// received event to its handler in an endless loop.
///
/// # Panics
///
/// Panics if the socket cannot be created or connected, if receiving fails,
/// or if `subscribe` / `parse_event_message` panic.
fn main() {
    // one zmq context owns the socket used for both subscribing and listening
    let context = zmq::Context::new();
    let socket = context.socket(zmq::DEALER).unwrap();
    socket.connect(DEFAULT_VALIDATOR_ENDPOINT).unwrap();

    // register the subscriptions before entering the receive loop
    subscribe(&socket);

    loop {
        // receive the next raw message; no flags are set
        let mut incoming = zmq::Message::new();
        socket.recv(&mut incoming, 0).unwrap();

        let batch: sawtooth_sdk::messages::events::EventList = parse_event_message(&incoming);
        for event in batch.events {
            // dispatch on the event type; unknown types are only reported
            match event.event_type.as_str() {
                "sawtooth/block-commit" => handler_block_commit(event),
                "sawtooth/state-delta" => handler_state_delta(event),
                _ => println!("No handler for this event type: {}", event.event_type),
            }
        }
    }
}
/// Subscribes `socket` to the `sawtooth/block-commit` and
/// `sawtooth/state-delta` events and checks the validator's reply.
///
/// The request is a `ClientEventsSubscribeRequest` wrapped in a validator
/// `Message`; the next message received on `socket` is treated as the reply.
///
/// # Panics
///
/// Panics if serialization, sending, receiving or decoding fails, if the
/// reply carries a different correlation id or message type, or if the
/// reported subscription status is not `OK`.
fn subscribe(socket: &zmq::Socket) {
    use sawtooth_sdk::messages::client_event::{
        ClientEventsSubscribeRequest, ClientEventsSubscribeResponse,
        ClientEventsSubscribeResponse_Status,
    };
    use sawtooth_sdk::messages::events::EventSubscription;
    use sawtooth_sdk::messages::validator::{Message as ValidatorMessage, Message_MessageType};

    // correlation id echoed back by the validator; used to match the reply to this request
    let correlation_id = "random string";

    // one subscription per event type of interest
    let subscriptions: Vec<EventSubscription> = ["sawtooth/block-commit", "sawtooth/state-delta"]
        .iter()
        .map(|&event_type| EventSubscription {
            event_type: event_type.to_string(),
            ..Default::default()
        })
        .collect();

    // prepare the event subscribe request
    let event_subscribe_req = ClientEventsSubscribeRequest {
        subscriptions: protobuf::RepeatedField::from_vec(subscriptions),
        ..Default::default()
    };

    // wrap it into a validator message, otherwise the validator won't be able to understand it
    let message = ValidatorMessage {
        correlation_id: correlation_id.to_string(),
        message_type: Message_MessageType::CLIENT_EVENTS_SUBSCRIBE_REQUEST,
        content: event_subscribe_req
            .write_to_bytes()
            .expect("failed to serialize the event subscribe request"),
        ..Default::default()
    };

    // send the message over the socket, no flags specified
    socket
        .send(
            message
                .write_to_bytes()
                .expect("failed to serialize the validator message"),
            0,
        )
        .expect("failed to send the subscribe request");

    // and receive a response, no flags specified
    // NOTE(review): assumes the first message received is the subscribe response — confirm
    let mut resp = zmq::Message::new();
    socket
        .recv(&mut resp, 0)
        .expect("failed to receive the subscribe response");

    // parse the response into a validator message; `zmq::Message` derefs to a
    // byte slice, so no intermediate copy of the buffer is needed
    let validator_resp = ValidatorMessage::parse_from_bytes(&resp)
        .expect("failed to parse the response as a validator message");

    // and verify that the subscription succeeded
    if validator_resp.correlation_id != correlation_id {
        panic!("Wrong correlation_id")
    }
    if validator_resp.message_type != Message_MessageType::CLIENT_EVENTS_SUBSCRIBE_RESPONSE {
        panic!("Unexpected message type");
    }

    let subscription_resp = ClientEventsSubscribeResponse::parse_from_bytes(&validator_resp.content)
        .expect("failed to parse the subscribe response content");
    if subscription_resp.status != ClientEventsSubscribeResponse_Status::OK {
        panic!("Failed to subscribe");
    }
}
/// Decodes a raw zmq message into the list of events it carries.
///
/// The bytes are first parsed as a validator `Message`; its content is then
/// parsed as an `EventList`.
///
/// # Panics
///
/// Panics if either parse step fails or if the validator message type is not
/// `CLIENT_EVENTS`.
fn parse_event_message(message_raw: &zmq::Message) -> sawtooth_sdk::messages::events::EventList {
    use sawtooth_sdk::messages::events::EventList;
    use sawtooth_sdk::messages::validator::{Message as ValidatorMessage, Message_MessageType};

    // the events arrive wrapped in a validator message
    let envelope = ValidatorMessage::parse_from_bytes(message_raw).unwrap();

    // only CLIENT_EVENTS messages carry an event list
    if envelope.message_type != Message_MessageType::CLIENT_EVENTS {
        // hint: such situation should not cause program panic,
        // handle errors more gracefully in a production code
        panic!("Invalid message type");
    }

    // unpack the list of events held in the envelope's content
    EventList::parse_from_bytes(&envelope.content).unwrap()
}
/// Handles a `sawtooth/block-commit` event by printing the value of every
/// attribute whose key is `state_root_hash`.
fn handler_block_commit(event: sawtooth_sdk::messages::events::Event) {
    event
        .get_attributes()
        .iter()
        // other attributes of the event are ignored
        .filter(|attr| attr.key == "state_root_hash")
        .for_each(|attr| {
            println!(
                "Handling sawtooth/block-commit event, state root hash: {}",
                attr.value
            )
        });
}
/// Handles a `sawtooth/state-delta` event by printing its data payload.
///
/// The payload is decoded with `String::from_utf8_lossy`, so invalid UTF-8
/// sequences are printed as U+FFFD instead of aborting the listener. Valid
/// UTF-8 payloads are printed unchanged.
fn handler_state_delta(event: sawtooth_sdk::messages::events::Event) {
    // NOTE(review): the payload is presumably a serialized protobuf
    // (`StateChangeList`) rather than text — confirm against the Sawtooth
    // docs and consider decoding it properly instead of printing raw bytes.
    println!(
        "Handling sawtooth/state-delta event, data: {}",
        String::from_utf8_lossy(&event.data)
    );
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment