Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Interface for testing nsq producers.
package nsqutils
import (
	"sync"
	"time"

	// NOTE(review): the nsq import path is not visible in this view; this is
	// the upstream client path — confirm against the project's go.mod.
	"github.com/nsqio/go-nsq"
)
// Producer is the publishing contract that nsq.Producer fulfills. Depending on
// this interface instead of the concrete client lets tests swap in a fake.
type Producer interface {
	// Publish sends body to the named topic and reports any error.
	Publish(topic string, body []byte) error
}
// TestProducer implements Producer and just counts the number of messages
// optionally if it includes a nsq.Producer it will also send messages
type TestProducer struct {
Counters map[string]int32
Producer *nsq.Producer
LastMessage []byte
// Reset the counters
func (p *TestProducer) Reset() {
p.Counters = nil
p.LastMessage = make([]byte, 0)
// Count the total number of events
func (p *TestProducer) Count() int32 {
defer p.Unlock()
var i int32
for _, c := range p.Counters {
i += c
return i
// Publish tracks publishing to a topic
func (p *TestProducer) Publish(topic string, body []byte) error {
defer p.Unlock()
if p.Counters == nil {
p.Counters = make(map[string]int32)
p.LastMessage = body
if p.Producer != nil {
return p.Producer.Publish(topic, body)
return nil
// NoOpMessageDelegate is a message delegate whose callbacks do nothing. Use it
// when constructing an nsq.Message in a test so that finishing, requeueing, or
// touching the message is a no-op.
type NoOpMessageDelegate struct{}

// OnFinish ignores the finish notification for m.
func (*NoOpMessageDelegate) OnFinish(m *nsq.Message) {}

// OnRequeue ignores the requeue notification for m; delay and backoff are unused.
func (*NoOpMessageDelegate) OnRequeue(m *nsq.Message, delay time.Duration, backoff bool) {}

// OnTouch ignores the touch notification for m.
func (*NoOpMessageDelegate) OnTouch(m *nsq.Message) {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.