Skip to content

Instantly share code, notes, and snippets.

@zachelrath
Created May 19, 2022 13:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zachelrath/dca3ffcb169e1e962bcebff62f50cc0c to your computer and use it in GitHub Desktop.
Save zachelrath/dca3ffcb169e1e962bcebff62f50cc0c to your computer and use it in GitHub Desktop.
Simple Pulsar message listener
@Service
@Slf4j
@ConditionalOnProperty(name = "consumer.enabled", havingValue = "true")
public class SimpleMessageListener implements MessageListener<byte[]> {
private final int maxConsumeDelaySeconds;
private static final String CONSUMER_PREFIX = "(CONSUMER) [{}]: ";
public SimpleMessageListener(
@Value("${consumer.max.consume.delay.seconds}") int maxConsumeDelaySeconds) {
this.maxConsumeDelaySeconds = maxConsumeDelaySeconds;
}
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
log.info(CONSUMER_PREFIX + "Received, beginning to process ...", msg.getMessageId());
try {
// Sleep for a random delay to simulate delay in message consumption/processing
TimeUnit.SECONDS.sleep((int) Math.ceil(Math.random() * maxConsumeDelaySeconds));
consumer.acknowledge(msg);
log.info(CONSUMER_PREFIX + "Successfully processed!", msg.getMessageId());
} catch (InterruptedException e) {
e.printStackTrace();
log.error("(CONSUMER) THREAD INTERRUPTED --- DID NOT FINISH PROCESSING!");
} catch (PulsarClientException e) {
log.error("(CONSUMER) Pulsar client exception", e);
consumer.negativeAcknowledge(msg);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment