Created
April 26, 2023 12:06
-
-
Save anjankow/c2aad2d2afc5e1da34af365ff709a2de to your computer and use it in GitHub Desktop.
Event listener for Hyperledger Sawtooth blockchain, using zmq and sawtooth_sdk
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// [dependencies]
// protobuf = "2.23"
// sawtooth-sdk = "0.5.2"
// zmq = "0.9.0"
// needed for write_to_bytes() method
use protobuf::Message; | |
const DEFAULT_VALIDATOR_ENDPOINT: &str = "tcp://validator:4004"; | |
fn main() { | |
// context of the listening sockets | |
let ctx = zmq::Context::new(); | |
// create a socket and connect to validator | |
let socket = ctx.socket(zmq::DEALER).unwrap(); | |
socket.connect(DEFAULT_VALIDATOR_ENDPOINT).unwrap(); | |
// subscribe | |
subscribe(&socket); | |
// start listening | |
loop { | |
let mut message = zmq::Message::new(); | |
socket.recv(&mut message, 0 /* flags */).unwrap(); | |
let event_list: sawtooth_sdk::messages::events::EventList = parse_event_message(&message); | |
for event in event_list.events { | |
if event.event_type == "sawtooth/block-commit" { | |
handler_block_commit(event); | |
} else if event.event_type == "sawtooth/state-delta" { | |
handler_state_delta(event); | |
} else { | |
println!("No handler for this event type: {}", event.event_type); | |
} | |
} | |
} | |
} | |
fn subscribe(socket: &zmq::Socket) { | |
// prepare event subscriptions | |
let subscription_block_commit = sawtooth_sdk::messages::events::EventSubscription { | |
event_type: "sawtooth/block-commit".to_string(), | |
..Default::default() | |
}; | |
let subscription_state_delta = sawtooth_sdk::messages::events::EventSubscription { | |
event_type: "sawtooth/state-delta".to_string(), | |
..Default::default() | |
}; | |
// prepare event subscribe request | |
let event_subscribe_req = sawtooth_sdk::messages::client_event::ClientEventsSubscribeRequest { | |
subscriptions: protobuf::RepeatedField::from_vec(vec![ | |
subscription_block_commit, | |
subscription_state_delta, | |
]), | |
..Default::default() | |
}; | |
// define a correlation id to confirm that the response is valid | |
let correlation_id = "random string".to_string(); | |
// wrap it into a validator message, otherwise validator won't be able to understand it | |
let message = sawtooth_sdk::messages::validator::Message { | |
correlation_id: correlation_id.clone(), | |
message_type: | |
sawtooth_sdk::messages::validator::Message_MessageType::CLIENT_EVENTS_SUBSCRIBE_REQUEST, | |
content: event_subscribe_req.write_to_bytes().unwrap(), | |
..Default::default() | |
}; | |
// send the message over the socket | |
socket | |
.send( | |
message.write_to_bytes().unwrap(), | |
0, /* flags, none specified */ | |
) | |
.unwrap(); | |
// and receive a response | |
let mut resp = zmq::Message::new(); | |
socket | |
.recv(&mut resp, 0 /* flags, none specified */) | |
.unwrap(); | |
// parse the response into a validator message | |
let validator_resp = | |
sawtooth_sdk::messages::validator::Message::parse_from_bytes(&resp.to_vec()).unwrap(); | |
// and verify that the subscription succeeded | |
if validator_resp.correlation_id != correlation_id { | |
panic!("Wrong correlation_id") | |
} | |
if validator_resp.message_type | |
!= sawtooth_sdk::messages::validator::Message_MessageType::CLIENT_EVENTS_SUBSCRIBE_RESPONSE | |
{ | |
panic!("Unexpected message type"); | |
} | |
let subscription_resp = | |
sawtooth_sdk::messages::client_event::ClientEventsSubscribeResponse::parse_from_bytes( | |
&validator_resp.content, | |
) | |
.unwrap(); | |
if subscription_resp.status | |
!= sawtooth_sdk::messages::client_event::ClientEventsSubscribeResponse_Status::OK | |
{ | |
panic!("Failed to subscribe"); | |
} | |
} | |
fn parse_event_message(message_raw: &zmq::Message) -> sawtooth_sdk::messages::events::EventList { | |
// wrapped in a validator message | |
let validator_msg = | |
sawtooth_sdk::messages::validator::Message::parse_from_bytes(&message_raw).unwrap(); | |
// check if the message type is CLIENT_EVENTS | |
if validator_msg.message_type | |
!= sawtooth_sdk::messages::validator::Message_MessageType::CLIENT_EVENTS | |
{ | |
// hint: such situation should not cause program panic, | |
// handle errors more gracefully in a production code | |
panic!("Invalid message type"); | |
} | |
// validator message holds a list of events, we need to unpack them | |
let event_list = | |
sawtooth_sdk::messages::events::EventList::parse_from_bytes(validator_msg.get_content()) | |
.unwrap(); | |
event_list | |
} | |
fn handler_block_commit(event: sawtooth_sdk::messages::events::Event) { | |
let attributes = event.get_attributes(); | |
for attribute in attributes { | |
if attribute.key == "state_root_hash" { | |
println!( | |
"Handling sawtooth/block-commit event, state root hash: {}", | |
attribute.value | |
) | |
} | |
} | |
} | |
fn handler_state_delta(event: sawtooth_sdk::messages::events::Event) { | |
println!( | |
"Handling sawtooth/state-delta event, data: {}", | |
String::from_utf8(event.data).unwrap() | |
); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment