Skip to content

Instantly share code, notes, and snippets.

@peterbourgon
Created May 31, 2012 11:28
Show Gist options
  • Save peterbourgon/8172530fc8b680332fcb to your computer and use it in GitHub Desktop.
Save peterbourgon/8172530fc8b680332fcb to your computer and use it in GitHub Desktop.
package main
import (
"net"
"amqp-rpc" // FIXME
"fmt"
"time"
)
func main() {
channel, err := connect()
if err != nil {
fmt.Printf("connect: %s\n", err)
return
}
fmt.Printf("connected, sleeping\n")
time.Sleep(2 * time.Second)
fmt.Printf("closing channel (blocking here)\n")
if err := channel.Close(); err != nil {
fmt.Printf("close: %s\n", err)
return
}
fmt.Printf("done\n")
}
func connect() (*amqp.Channel, error) {
rabbitEndpoint, user, pass := "localhost:5672", "guest", "guest"
exchangeName, queueName := "demo.exchange", "demo.queue"
routingKey := "demo.routingkey"
conn, err := net.Dial("tcp", rabbitEndpoint)
if err != nil {
return nil, fmt.Errorf("Dial: %s", err)
}
connection, err := amqp.NewConnection(
conn,
&amqp.PlainAuth{
Username: user,
Password: pass,
},
"/",
)
if err != nil {
return nil, fmt.Errorf("Connection: %s", err)
}
channel, err := connection.Channel()
if err != nil {
return nil, fmt.Errorf("Channel: %s", err)
}
noArgs := amqp.Table{}
exchange := channel.E(exchangeName)
if err := exchange.Declare(
amqp.UntilDeleted, // lifetime = durable
"direct", // type
false, // internal
false, // noWait
noArgs, // arguments
); err != nil {
channel.Close()
return nil, fmt.Errorf("Exchange Declare: %s", err)
}
queue := channel.Q(queueName)
queueState, err := queue.Declare(
amqp.UntilUnused, // lifetime = auto-delete
false, // exclusive
false, // noWait
noArgs, // arguments
)
if err != nil {
channel.Close()
return nil, fmt.Errorf("Queue Declare: %s", err)
}
if !queueState.Declared {
channel.Close()
return nil, fmt.Errorf("Queue Declare: somehow Undeclared")
}
if err := queue.Bind(
routingKey, // routingKey
exchangeName, // sourceExchange
false, // noWait
noArgs, // arguments
); err != nil {
channel.Close()
return nil, fmt.Errorf("Queue Bind: %s", err)
}
deliveries, err := queue.Consume(
false, // noAck
false, // exclusive
false, // noLocal
false, // noWait
"", // consumerTag,
noArgs, // arguments
nil, // deliveries (ie. create a deliveries channel for me)
)
if err != nil {
channel.Close()
return nil, fmt.Errorf("Queue Consume: %s", err)
}
go consumeLoop(deliveries)
return channel, nil
}
func consumeLoop(deliveries chan amqp.Delivery) {
for {
fmt.Printf("%v\n", <-deliveries)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment