Skip to content

Instantly share code, notes, and snippets.

@rkuzner
Created August 15, 2019 20:11
Show Gist options
  • Save rkuzner/81d8d55e3bb9d0f758bdc8deb063b8d0 to your computer and use it in GitHub Desktop.
Save rkuzner/81d8d55e3bb9d0f758bdc8deb063b8d0 to your computer and use it in GitHub Desktop.
Workshop Resiliencia - Demo Distributed Tracing
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"net/http"
"sync/atomic"
"time"
// obtener las dependencias con: "go get -u DEPENDENCIA"
uuid "github.com/satori/go.uuid"
)
const (
requestLimitNewLine = 60
useCircuitBreaker = true
defaultAPIName = "Cliente"
targetAddress = "localhost"
defaultTargetPort = 4003
targetPath = "work"
)
var (
ticker *time.Ticker
requestCount uint32
apiName string
targetPort uint
targetURL string
loggerURL string
httpClient http.Client
)
func init() {
flag.StringVar(&apiName, "apiName", defaultAPIName, "nombre de este cliente")
flag.UintVar(&targetPort, "targetPort", defaultTargetPort, "puerto del servidor de destino")
flag.Parse()
httpClient = http.Client{Timeout: time.Duration(1 * time.Second)}
targetURL = fmt.Sprintf("http://%s:%d/%s", targetAddress, targetPort, targetPath)
requestCount = 0
loggerURL = "http://localhost:4444/log"
}
func main() {
fmt.Println(fmt.Sprintf("Cliente iniciado y solicitando números al Servidor (%s)", targetURL))
var req *http.Request
var requestID string
var err error
var resp *http.Response
// inicializo un ticker para poder invocar ritmicamente al endpoint de healthCheck del server
for ticker = time.NewTicker(1 * time.Second); true; <-ticker.C {
req, _ = http.NewRequest(http.MethodGet, targetURL, nil)
if req != nil {
UniqueID, _ := uuid.NewV4()
requestID = UniqueID.String()
req.Header.Set("X-Request-Id", requestID)
startTime := time.Now()
resp, err = httpClient.Do(req)
endTime := time.Now()
statusCode := -1
if resp != nil {
statusCode = resp.StatusCode
}
go backgroundRemoteLoggerPusher(startTime, endTime, requestID, statusCode)
if err != nil {
fmt.Print("X")
} else if resp.StatusCode >= http.StatusInternalServerError {
fmt.Print("5")
} else if resp.StatusCode >= http.StatusBadRequest {
fmt.Print("4")
} else {
fmt.Print(".")
}
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
} else {
fmt.Print("x")
}
incrementRequestCount()
}
}
func incrementRequestCount() {
atomic.AddUint32(&requestCount, 1)
if atomic.CompareAndSwapUint32(&requestCount, requestLimitNewLine, 0) {
fmt.Println()
}
}
func backgroundRemoteLoggerPusher(startTime, endTime time.Time, requestID string, statusCode int) {
timeDelta := endTime.Sub(startTime)
timeStamp := endTime.Format(time.RFC3339)
logMessage := fmt.Sprintf("%s | api:%s | reqID:%s | status:%d | timeDelta:%f",
timeStamp, apiName, requestID, statusCode, timeDelta.Seconds())
requestBodyMap := make(map[string]string)
requestBodyMap["message"] = logMessage
if requestBodyBytes, marshalErr := json.Marshal(requestBodyMap); marshalErr == nil {
http.Post(loggerURL, "application/json", bytes.NewBuffer(requestBodyBytes))
}
}
package main
import (
"fmt"
"net/http"
// obtener las dependencias con: "go get -u DEPENDENCIA"
"github.com/gin-gonic/gin"
)
const (
serverPort = 4444
)
var (
loggerChannelBuffer uint
loggerChannel chan string
)
func init() {
loggerChannelBuffer = 100
loggerChannel = make(chan string, loggerChannelBuffer)
}
func main() {
router := gin.New()
// declaramos los endpoints con sus handlers (Notaron la diferencia con http?)
router.GET("/ping", pingHandler)
router.POST("/log", logHandler)
fmt.Println("Levantamos el background printer")
go backgroundPrinter()
// Levantamos un server con el middleware
fmt.Println(fmt.Sprintf("Server iniciado y recibiendo logs en el puerto:%d", serverPort))
router.Run(fmt.Sprintf(":%d", serverPort))
}
func pingHandler(ginCtx *gin.Context) {
ginCtx.String(http.StatusOK, fmt.Sprintln("pong"))
}
func logHandler(ginCtx *gin.Context) {
// empiezo mi trabajo
var request map[string]string
if err := ginCtx.ShouldBindJSON(&request); err != nil {
ginCtx.JSON(http.StatusBadRequest, gin.H{"message": "invalid request"})
}
loggerChannel <- request["message"]
// ...y respondo
ginCtx.JSON(http.StatusAccepted, gin.H{"message": "accepted"})
}
func backgroundPrinter() {
for mensaje := range loggerChannel {
fmt.Println(mensaje)
}
}
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
// obtener las dependencias con: "go get -u DEPENDENCIA"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
uuid "github.com/satori/go.uuid"
"github.com/sirupsen/logrus"
)
const (
requestLimitNewLine = 60
defaultServerPort = 4003
defaultAPIName = "api"
defaultTargetAddress = "localhost"
defaultTargetPort = 0
defaultTargetPath = "work"
)
var (
requestCount uint32
countMutex sync.Mutex
randomGenerator *rand.Rand
randomMutex sync.Mutex
apiName string
serverPort uint
targetAddress string
targetPort uint
targetPath string
targetURL string
loggerURL string
httpClient http.Client
logger *logrus.Logger
)
func init() {
flag.StringVar(&apiName, "apiName", defaultAPIName, "nombre de la API de este servidor")
flag.UintVar(&serverPort, "serverPort", defaultServerPort, "puerto de escucha del servidor")
flag.StringVar(&targetAddress, "targetAddress", defaultTargetAddress, "dirección del servidor de destino")
flag.UintVar(&targetPort, "targetPort", defaultTargetPort, "puerto del servidor de destino")
flag.StringVar(&targetPath, "targetPath", defaultTargetPath, "endpoint path del servidor de destino")
flag.Parse()
if strings.EqualFold(targetAddress, defaultTargetAddress) && serverPort == targetPort {
fmt.Println(fmt.Sprintf("referencia circular con el servidor de destino (%s:%d)", targetAddress, targetPort))
flag.PrintDefaults()
os.Exit(2)
}
requestCount = 0
randomGenerator = rand.New(rand.NewSource(time.Now().UnixNano()))
httpClient = http.Client{Timeout: time.Duration(1 * time.Second)}
targetURL = fmt.Sprintf("http://%s:%d/%s", targetAddress, targetPort, targetPath)
loggerURL = "http://localhost:4444/log"
logger = logrus.New()
}
func main() {
router := gin.New()
router.Use(requestCounter)
// declaramos los endpoints con sus handlers (Notaron la diferencia con http?)
router.GET("/ping", pingHandler)
router.GET("/work", requestID, remoteLogger, workHandler)
// agregamos los endpoints de PProf
pprof.Register(router)
// Levantamos un server con el middleware
fmt.Println(fmt.Sprintf("Server iniciado y escuchando en el puerto:%d", serverPort))
if targetPort != 0 {
fmt.Println(fmt.Sprintf("...y solicitando números al Servidor (%s)", targetURL))
}
router.Run(fmt.Sprintf(":%d", serverPort))
}
func pingHandler(ginCtx *gin.Context) {
ginCtx.String(http.StatusOK, fmt.Sprintln("pong"))
}
func workHandler(ginCtx *gin.Context) {
defer startInternalTimer("workHandler").endTimer()
// empiezo mi trabajo
trabajarEnAlgo(generarNumeroEntero(10, 50))
// pido un dato extra
otroNumero := subTarea1(ginCtx)
// hago mas trabajo
trabajarEnAlgo(generarNumeroEntero(10, 50))
miNumero := generarNumeroEntero(1000, 10000) + otroNumero
// ...y respondo
ginCtx.JSON(http.StatusOK, gin.H{"work": miNumero})
}
func subTarea1(ginCtx *gin.Context) int {
defer startInternalTimer("subTarea1").endTimer()
// empiezo mi trabajo
trabajarEnAlgo(generarNumeroEntero(50, 100))
// pido un dato extra
otroNumero := subTarea2(ginCtx)
// hago mas trabajo
trabajarEnAlgo(generarNumeroEntero(50, 100))
miNumero := generarNumeroEntero(100, 1000) + otroNumero
// ...y respondo
return miNumero
}
func subTarea2(ginCtx *gin.Context) int {
defer startInternalTimer("subTarea2").endTimer()
// hago mi trabajo
trabajarEnAlgo(generarNumeroEntero(50, 100))
var otroNumero int
var req *http.Request
var requestID string
var err error
var resp *http.Response
var bodyBytes []byte
var bodyData map[string]interface{}
var interfaceValue interface{}
var foundNumber bool
// necesito un dato extra
if targetPort > 0 {
llamadaExterna := startInternalTimer("tareaExterna", strconv.Itoa(int(targetPort)))
// lo pido al servidor de destino
req, err = http.NewRequest(http.MethodGet, targetURL, nil)
if req != nil {
requestID = ginCtx.GetString("reqID")
req.Header.Set("X-Request-Id", requestID)
resp, err = httpClient.Do(req)
if resp != nil && resp.Body != nil {
defer resp.Body.Close()
bodyBytes, err = ioutil.ReadAll(resp.Body)
if bodyBytes != nil && len(bodyBytes) > 0 {
err = json.Unmarshal(bodyBytes, &bodyData)
}
if interfaceValue, foundNumber = bodyData["work"]; foundNumber {
switch workValue := interfaceValue.(type) {
case int:
otroNumero = workValue
case float64:
otroNumero = int(workValue)
}
}
}
}
llamadaExterna.endTimer()
}
if targetPort == 0 || !foundNumber || err != nil {
// lo genero aca mismo
otroNumero = generarNumeroEntero(1, 10)
}
// hago mas trabajo
trabajarEnAlgo(generarNumeroEntero(50, 100))
miNumero := generarNumeroEntero(10, 100) + otroNumero
// ...y respondo
return miNumero
}
func trabajarEnAlgo(cantidad int) string {
var c string
for i := 0; i < cantidad*1000; i++ {
c = fmt.Sprintf("%d", i)
//time.Sleep(time.Millisecond)
}
return c
}
func generarNumeroEntero(piso, techo int) int {
if techo < piso {
return piso
}
randomMutex.Lock()
defer randomMutex.Unlock()
return randomGenerator.Intn(techo-piso) + piso
}
func requestCounter(ginCtx *gin.Context) {
// vamos al siguiente handler en la cadena
ginCtx.Next()
// aca va codigo que debe suceder despues del request
// statusCode := ginCtx.Writer.Status()
// if statusCode >= http.StatusInternalServerError {
// fmt.Print("5")
// } else if statusCode >= http.StatusBadRequest {
// fmt.Print("4")
// } else {
// fmt.Print(".")
// }
atomic.AddUint32(&requestCount, 1)
if atomic.CompareAndSwapUint32(&requestCount, requestLimitNewLine, 0) {
fmt.Println()
}
}
func requestID(ginCtx *gin.Context) {
// aca va codigo que debe suceder antes del request
requestID := ginCtx.Request.Header.Get("X-Request-Id")
if len(strings.TrimSpace(requestID)) == 0 {
// Si no había, genero uno nuevo
UniqueID, _ := uuid.NewV4()
requestID = UniqueID.String()
}
// y lo guardo en el contexto de Gin
ginCtx.Set("reqID", requestID)
// vamos al siguiente handler en la cadena
ginCtx.Next()
}
func remoteLogger(ginCtx *gin.Context) {
// aca va codigo que debe suceder antes del request
startTime := time.Now()
// vamos al siguiente handler en la cadena
ginCtx.Next()
// aca va codigo que debe suceder despues del request
endTime := time.Now()
requestID := ginCtx.GetString("reqID")
statusCode := ginCtx.Writer.Status()
go backgroundRemoteLoggerPusher(startTime, endTime, requestID, statusCode)
}
func backgroundRemoteLoggerPusher(startTime, endTime time.Time, requestID string, statusCode int) {
timeDelta := endTime.Sub(startTime)
timeStamp := endTime.Format(time.RFC3339)
logMessage := fmt.Sprintf("%s | api:%s | reqID:%s | status:%d | timeDelta:%f",
timeStamp, apiName, requestID, statusCode, timeDelta.Seconds())
//fmt.Println(logMessage)
requestBodyMap := make(map[string]string)
requestBodyMap["message"] = logMessage
if requestBodyBytes, marshalErr := json.Marshal(requestBodyMap); marshalErr == nil {
http.Post(loggerURL, "application/json", bytes.NewBuffer(requestBodyBytes))
}
}
func startInternalTimer(name string, extras ...string) internalTimer {
newTimer := internalTimer{
timerName: name,
startTime: time.Now(),
}
newTimer.timerExtras = append(newTimer.timerExtras, extras...)
return newTimer
}
type internalTimer struct {
timerName string
timerExtras []string
startTime time.Time
}
func (theTimer internalTimer) endTimer() {
endTime := time.Now()
timeDelta := endTime.Sub(theTimer.startTime)
timeStamp := endTime.Format(time.RFC3339)
logMessage := fmt.Sprintf("%s | where:%s | extras:%v | timeDelta:%f", timeStamp, theTimer.timerName, theTimer.timerExtras, timeDelta.Seconds())
logger.Info(logMessage)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment