Skip to content

Instantly share code, notes, and snippets.

@tjtjtj
Created February 20, 2020 11:31
Show Gist options
  • Save tjtjtj/cb54c64ec35ae065c733187cfea955da to your computer and use it in GitHub Desktop.
Save tjtjtj/cb54c64ec35ae065c733187cfea955da to your computer and use it in GitHub Desktop.
go-kit amqp publish#1
package main
import (
"encoding/json"
"errors"
"log"
"net/http"
"github.com/go-kit/kit/endpoint"
"golang.org/x/net/context"
httptransport "github.com/go-kit/kit/transport/http"
"github.com/streadway/amqp"
)
type PublishService interface {
Publish(message string) (string, error)
}
type publishService struct{}
func (publishService) Publish(s string) (string, error) {
if s == "" {
return "", errors.New("empty")
}
err := pub("testqueue", s)
if err != nil {
return "", err
}
return "Done", nil
}
func pub(qname string, message string) error {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Print("failed Dial")
return err
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Print("failed Channel")
return err
}
defer ch.Close()
q, err := ch.QueueDeclare(
qname, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Print("failed QueueDeclare")
return err
}
err = ch.Publish(
"", // exchange
q.Name, //q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
if err != nil {
log.Print("failed Publish")
return err
}
log.Printf("pulished %s : %s", qname, message)
return nil
}
type publishRequest struct {
S string `json:"s"`
}
type publishResponse struct {
V string `json:"v"`
Err string `json:"err,omitempty"`
}
func decodePublishRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request publishRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}
func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}
func makePublishEndpoint(svc PublishService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(publishRequest)
v, err := svc.Publish(req.S)
if err != nil {
return publishResponse{v, err.Error()}, nil
}
return publishResponse{v, ""}, nil
}
}
func main() {
svc := publishService{}
publishHandler := httptransport.NewServer(
makePublishEndpoint(svc),
decodePublishRequest,
encodeResponse,
)
http.Handle("/publish", publishHandler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment