Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
FLUFF - connecting pipes
package main
import (
"context"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"
"github.com/go-redis/redis"
"github.com/nats-io/go-nats"
"github.com/quipo/statsd"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
// main wires the service together and runs it until a termination signal
// arrives. In order it: loads configuration, builds the logger, creates the
// StatsD socket, connects to NATS and Redis, starts the HTTP healthcheck
// endpoint and the gRPC server, then blocks on the signal channel and shuts
// every component down again.
//
// Start-up failures are fatal: they are reported through panic / log.Panic.
func main() {
	// Get config from the environment (the original note says this uses
	// github.com/caarlos0/env; the loader itself lives in the config package).
	cfg, err := config.NewConfig()
	if err != nil {
		fmt.Printf("Config Error: %v", err)
		panic(err)
	}

	// Closing signal. SIGKILL is deliberately not registered: it cannot be
	// caught by a process, so listing it has no effect.
	stopCh := make(chan os.Signal, 1)
	signal.Notify(stopCh,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT,
	)
	// Unregister instead of closing the channel: the signal package may still
	// try to deliver to a registered channel, and a send on a closed channel
	// panics.
	defer signal.Stop(stopCh)

	// Logger
	log, err := makeLogger(cfg.LogLevel)
	if err != nil {
		panic(err)
	}
	defer log.Sync() // flushes buffer

	// NATS connection callbacks; they only log what happened.
	asyncErrorHandler := func(conn *nats.Conn, c *nats.Subscription, err error) {
		log.Error("error processing NATS message", zap.Any("conn", conn), zap.String("subject", c.Subject), zap.Error(err))
	}
	reconnectHandler := func(conn *nats.Conn) {
		log.Info("reconnection happened", zap.Any("conn", conn))
	}
	disconnectHandler := func(conn *nats.Conn) {
		log.Info("disconnection happened", zap.Any("conn", conn))
	}

	// Hide redis password: log a copy of the config with the secret blanked.
	secureCfg := &config.Config{}
	*secureCfg = cfg
	secureCfg.RedisPassword = ""

	ms := &runtime.MemStats{}
	runtime.ReadMemStats(ms)
	log.Info("Starting service...",
		zap.Int("CPU cores", runtime.NumCPU()),
		// MemStats.Sys is in bytes; divide twice by 1024 so the value really
		// is in MB as the unit suffix says.
		zap.String("Available Memory", fmt.Sprintf("%d MB", ms.Sys/(1024*1024))),
		zap.Any("config", secureCfg))
	defer log.Info("Service stopped")

	// StatsD Client
	stats := statsd.NewStatsdClient(strings.Join([]string{cfg.StatsdHost, cfg.StatsdPort}, ":"), "requestserver.")
	err = stats.CreateSocket()
	if err != nil {
		log.Panic("failed to create StatsD socket", zap.Error(err))
	}
	// Deferred calls run last-in-first-out, so the shutdown counter below is
	// sent before the socket is closed.
	defer stats.Close()
	stats.Incr("service-init", 1)
	defer stats.Incr("service-shutdown", 1)

	// NATS
	opts := nats.DefaultOptions
	opts.Servers = cfg.NatsServers
	opts.AllowReconnect = true
	opts.MaxReconnect = -1
	opts.AsyncErrorCB = asyncErrorHandler
	opts.ReconnectedCB = reconnectHandler
	opts.DisconnectedCB = disconnectHandler
	nc, err := opts.Connect()
	if err != nil {
		log.Panic("failed to connect to NATS", zap.Error(err))
	}

	// REDIS
	rc := redis.NewUniversalClient(&redis.UniversalOptions{
		MasterName: cfg.RedisMasterName,
		Addrs:      cfg.RedisServers,
		Password:   cfg.RedisPassword,
		DB:         0,
	})
	if _, err = rc.Ping().Result(); err != nil {
		log.Panic("failed to connect to Redis", zap.Error(err))
	}

	// HEALTHCHECK
	hc := healthcheck.NewHealthCheck(nc, rc, stats, log)
	sm := http.NewServeMux()
	sm.HandleFunc("/check", hc.ServeHTTP)
	srv := &http.Server{Addr: ":" + cfg.HTTPPort, Handler: sm}
	go func() {
		// Use a goroutine-local error: writing to main's err from here would
		// race with the assignments main keeps making to it.
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Error("failed to start healthcheck", zap.Error(err))
		}
	}()

	// GRPC Init
	server := requestserver.NewServer(stats, log, rc, nc, filter.ConcurrentBuilder)
	go server.Start()
	lis, err := net.Listen("tcp", ":"+cfg.GRPCPort)
	if err != nil {
		log.Panic("failed to initialize grpc tcp listener", zap.Error(err))
	}
	grpcServer := grpc.NewServer()
	types.RegisterProtoServer(grpcServer, server)
	go func() {
		// Surface serve failures instead of dropping them silently.
		if err := grpcServer.Serve(lis); err != nil {
			log.Error("grpc server stopped serving", zap.Error(err))
		}
	}()
	<-server.Started
	log.Info("Service started")

	// Shutdown: block until one of the registered signals arrives.
	sig := <-stopCh
	log.Info("Stopping service...", zap.Any("signal", sig))

	// stop grpc
	grpcServer.GracefulStop()

	// stop healthcheck, giving it at most five seconds
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Error("failed to stop healthcheck", zap.Error(err))
	}

	// stop server services
	if err := server.Stop(); err != nil {
		log.Error("failed to stop server services", zap.Error(err))
	}

	// stop nats connections
	if err := nc.Flush(); err != nil {
		log.Error("failed to flush NATS buffer", zap.Error(err))
	}
	nc.Close()

	// stop redis
	if err := rc.Close(); err != nil {
		log.Error("failed to stop Redis", zap.Error(err))
	}
}
// makeLogger builds a JSON zap logger that writes to stderr at the requested
// level.
//
// level is matched case-insensitively, ignoring surrounding whitespace,
// against "ERROR", "WARN" (or "WARNING"), "INFO" and "DEBUG". Any other
// value, including the empty string, falls back to the info level.
//
// It returns the logger, or the error reported by zap's Config.Build.
func makeLogger(level string) (*zap.Logger, error) {
	var lvl zapcore.Level
	switch strings.ToUpper(strings.TrimSpace(level)) {
	case "ERROR":
		lvl = zap.ErrorLevel
	case "WARN", "WARNING":
		lvl = zap.WarnLevel
	case "DEBUG":
		lvl = zap.DebugLevel
	default:
		// "INFO" and every unrecognized value.
		lvl = zap.InfoLevel
	}

	// same as NewProduction() except for the log level!
	logCfg := zap.Config{
		Level:       zap.NewAtomicLevelAt(lvl),
		Development: false,
		Sampling: &zap.SamplingConfig{
			Initial:    100,
			Thereafter: 100,
		},
		Encoding:         "json",
		EncoderConfig:    zap.NewProductionEncoderConfig(),
		OutputPaths:      []string{"stderr"},
		ErrorOutputPaths: []string{"stderr"},
	}
	return logCfg.Build()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.