Created
July 5, 2015 14:52
-
-
Save ik5/a19361f896d2cda86e9a to your computer and use it in GitHub Desktop.
Attempted load test against RabbitMQ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/streadway/amqp"
)
// Command-line flags. Each variable is a pointer that holds the flag's
// default until flag.Parse has been called, and the parsed value afterwards.
var (
	uri          = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
	exchangeName = flag.String("exchange", "test-exchange", "Durable AMQP exchange name")
	exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
	routingKey   = flag.String("key", "test-key", "AMQP routing key")
	body         = flag.String("body", "foobar", "Body of message")
	reliable     = flag.Bool("reliable", true, "Wait for the publisher confirmation before exiting")
)
func init() { | |
flag.Parse() | |
} | |
func main() { | |
if err := publish(*uri, *exchangeName, *exchangeType, *routingKey, *body, *reliable); err != nil { | |
log.Fatalf("%s", err) | |
} | |
log.Printf("published %dB OK", len(*body)) | |
for { | |
} | |
} | |
func publish(amqpURI, exchange, exchangeType, routingKey, body string, reliable bool) error { | |
// This function dials, connects, declares, publishes, and tears down, | |
// all in one go. In a real service, you probably want to maintain a | |
// long-lived connection as state, and publish against that. | |
log.Printf("dialing %q", amqpURI) | |
connection, err := amqp.Dial(amqpURI) | |
if err != nil { | |
return fmt.Errorf("Dial: %s", err) | |
} | |
defer connection.Close() | |
log.Printf("got Connection, getting Channel") | |
channel, err := connection.Channel() | |
if err != nil { | |
return fmt.Errorf("Channel: %s", err) | |
} | |
log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange) | |
if err := channel.ExchangeDeclare( | |
exchange, // name | |
exchangeType, // type | |
true, // durable | |
false, // auto-deleted | |
false, // internal | |
false, // noWait | |
nil, // arguments | |
); err != nil { | |
return fmt.Errorf("Exchange Declare: %s", err) | |
} | |
// Reliable publisher confirms require confirm.select support from the | |
// connection. | |
if reliable { | |
log.Printf("enabling publishing confirms.") | |
if err := channel.Confirm(false); err != nil { | |
return fmt.Errorf("Channel could not be put into confirm mode: %s", err) | |
} | |
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1)) | |
defer confirmOne(confirms) | |
} | |
exec := func(i int) (error) { | |
log.Printf("%d declared Exchange, publishing %dB body (%q)", i, len(body), body) | |
if err = channel.Publish( | |
exchange, // publish to an exchange | |
routingKey, // routing to 0 or more queues | |
false, // mandatory | |
false, // immediate | |
amqp.Publishing{ | |
Headers: amqp.Table{}, | |
ContentType: "text/plain", | |
ContentEncoding: "", | |
Body: []byte(body), | |
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent | |
Priority: 0, // 0-9 | |
// a bunch of application/implementation-specific fields | |
}, | |
); err != nil { | |
return fmt.Errorf("Exchange Publish: %s", err) | |
} | |
return nil | |
} | |
for { | |
for i := 0; i <= 9900000; i++ { | |
go exec(i + 1) | |
time.Sleep(1 * time.Millisecond) | |
} | |
time.Sleep(5000 * time.Millisecond) | |
} | |
return nil | |
} | |
// One would typically keep a channel of publishings, a sequence number, and a | |
// set of unacknowledged sequence numbers and loop until the publishing channel | |
// is closed. | |
func confirmOne(confirms <-chan amqp.Confirmation) { | |
log.Printf("waiting for confirmation of one publishing") | |
if confirmed := <-confirms; confirmed.Ack { | |
log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag) | |
} else { | |
log.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment