Created
August 15, 2019 20:11
-
-
Save rkuzner/81d8d55e3bb9d0f758bdc8deb063b8d0 to your computer and use it in GitHub Desktop.
Workshop Resiliencia - Demo Distributed Tracing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"net/http" | |
"sync/atomic" | |
"time" | |
// obtener las dependencias con: "go get -u DEPENDENCIA" | |
uuid "github.com/satori/go.uuid" | |
) | |
const ( | |
requestLimitNewLine = 60 | |
useCircuitBreaker = true | |
defaultAPIName = "Cliente" | |
targetAddress = "localhost" | |
defaultTargetPort = 4003 | |
targetPath = "work" | |
) | |
var ( | |
ticker *time.Ticker | |
requestCount uint32 | |
apiName string | |
targetPort uint | |
targetURL string | |
loggerURL string | |
httpClient http.Client | |
) | |
func init() { | |
flag.StringVar(&apiName, "apiName", defaultAPIName, "nombre de este cliente") | |
flag.UintVar(&targetPort, "targetPort", defaultTargetPort, "puerto del servidor de destino") | |
flag.Parse() | |
httpClient = http.Client{Timeout: time.Duration(1 * time.Second)} | |
targetURL = fmt.Sprintf("http://%s:%d/%s", targetAddress, targetPort, targetPath) | |
requestCount = 0 | |
loggerURL = "http://localhost:4444/log" | |
} | |
func main() { | |
fmt.Println(fmt.Sprintf("Cliente iniciado y solicitando números al Servidor (%s)", targetURL)) | |
var req *http.Request | |
var requestID string | |
var err error | |
var resp *http.Response | |
// inicializo un ticker para poder invocar ritmicamente al endpoint de healthCheck del server | |
for ticker = time.NewTicker(1 * time.Second); true; <-ticker.C { | |
req, _ = http.NewRequest(http.MethodGet, targetURL, nil) | |
if req != nil { | |
UniqueID, _ := uuid.NewV4() | |
requestID = UniqueID.String() | |
req.Header.Set("X-Request-Id", requestID) | |
startTime := time.Now() | |
resp, err = httpClient.Do(req) | |
endTime := time.Now() | |
statusCode := -1 | |
if resp != nil { | |
statusCode = resp.StatusCode | |
} | |
go backgroundRemoteLoggerPusher(startTime, endTime, requestID, statusCode) | |
if err != nil { | |
fmt.Print("X") | |
} else if resp.StatusCode >= http.StatusInternalServerError { | |
fmt.Print("5") | |
} else if resp.StatusCode >= http.StatusBadRequest { | |
fmt.Print("4") | |
} else { | |
fmt.Print(".") | |
} | |
if resp != nil && resp.Body != nil { | |
resp.Body.Close() | |
} | |
} else { | |
fmt.Print("x") | |
} | |
incrementRequestCount() | |
} | |
} | |
func incrementRequestCount() { | |
atomic.AddUint32(&requestCount, 1) | |
if atomic.CompareAndSwapUint32(&requestCount, requestLimitNewLine, 0) { | |
fmt.Println() | |
} | |
} | |
func backgroundRemoteLoggerPusher(startTime, endTime time.Time, requestID string, statusCode int) { | |
timeDelta := endTime.Sub(startTime) | |
timeStamp := endTime.Format(time.RFC3339) | |
logMessage := fmt.Sprintf("%s | api:%s | reqID:%s | status:%d | timeDelta:%f", | |
timeStamp, apiName, requestID, statusCode, timeDelta.Seconds()) | |
requestBodyMap := make(map[string]string) | |
requestBodyMap["message"] = logMessage | |
if requestBodyBytes, marshalErr := json.Marshal(requestBodyMap); marshalErr == nil { | |
http.Post(loggerURL, "application/json", bytes.NewBuffer(requestBodyBytes)) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"net/http" | |
// obtener las dependencias con: "go get -u DEPENDENCIA" | |
"github.com/gin-gonic/gin" | |
) | |
const ( | |
serverPort = 4444 | |
) | |
var ( | |
loggerChannelBuffer uint | |
loggerChannel chan string | |
) | |
func init() { | |
loggerChannelBuffer = 100 | |
loggerChannel = make(chan string, loggerChannelBuffer) | |
} | |
func main() { | |
router := gin.New() | |
// declaramos los endpoints con sus handlers (Notaron la diferencia con http?) | |
router.GET("/ping", pingHandler) | |
router.POST("/log", logHandler) | |
fmt.Println("Levantamos el background printer") | |
go backgroundPrinter() | |
// Levantamos un server con el middleware | |
fmt.Println(fmt.Sprintf("Server iniciado y recibiendo logs en el puerto:%d", serverPort)) | |
router.Run(fmt.Sprintf(":%d", serverPort)) | |
} | |
func pingHandler(ginCtx *gin.Context) { | |
ginCtx.String(http.StatusOK, fmt.Sprintln("pong")) | |
} | |
func logHandler(ginCtx *gin.Context) { | |
// empiezo mi trabajo | |
var request map[string]string | |
if err := ginCtx.ShouldBindJSON(&request); err != nil { | |
ginCtx.JSON(http.StatusBadRequest, gin.H{"message": "invalid request"}) | |
} | |
loggerChannel <- request["message"] | |
// ...y respondo | |
ginCtx.JSON(http.StatusAccepted, gin.H{"message": "accepted"}) | |
} | |
func backgroundPrinter() { | |
for mensaje := range loggerChannel { | |
fmt.Println(mensaje) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io/ioutil" | |
"math/rand" | |
"net/http" | |
"os" | |
"strconv" | |
"strings" | |
"sync" | |
"sync/atomic" | |
"time" | |
// obtener las dependencias con: "go get -u DEPENDENCIA" | |
"github.com/gin-contrib/pprof" | |
"github.com/gin-gonic/gin" | |
uuid "github.com/satori/go.uuid" | |
"github.com/sirupsen/logrus" | |
) | |
const ( | |
requestLimitNewLine = 60 | |
defaultServerPort = 4003 | |
defaultAPIName = "api" | |
defaultTargetAddress = "localhost" | |
defaultTargetPort = 0 | |
defaultTargetPath = "work" | |
) | |
var ( | |
requestCount uint32 | |
countMutex sync.Mutex | |
randomGenerator *rand.Rand | |
randomMutex sync.Mutex | |
apiName string | |
serverPort uint | |
targetAddress string | |
targetPort uint | |
targetPath string | |
targetURL string | |
loggerURL string | |
httpClient http.Client | |
logger *logrus.Logger | |
) | |
func init() { | |
flag.StringVar(&apiName, "apiName", defaultAPIName, "nombre de la API de este servidor") | |
flag.UintVar(&serverPort, "serverPort", defaultServerPort, "puerto de escucha del servidor") | |
flag.StringVar(&targetAddress, "targetAddress", defaultTargetAddress, "dirección del servidor de destino") | |
flag.UintVar(&targetPort, "targetPort", defaultTargetPort, "puerto del servidor de destino") | |
flag.StringVar(&targetPath, "targetPath", defaultTargetPath, "endpoint path del servidor de destino") | |
flag.Parse() | |
if strings.EqualFold(targetAddress, defaultTargetAddress) && serverPort == targetPort { | |
fmt.Println(fmt.Sprintf("referencia circular con el servidor de destino (%s:%d)", targetAddress, targetPort)) | |
flag.PrintDefaults() | |
os.Exit(2) | |
} | |
requestCount = 0 | |
randomGenerator = rand.New(rand.NewSource(time.Now().UnixNano())) | |
httpClient = http.Client{Timeout: time.Duration(1 * time.Second)} | |
targetURL = fmt.Sprintf("http://%s:%d/%s", targetAddress, targetPort, targetPath) | |
loggerURL = "http://localhost:4444/log" | |
logger = logrus.New() | |
} | |
func main() { | |
router := gin.New() | |
router.Use(requestCounter) | |
// declaramos los endpoints con sus handlers (Notaron la diferencia con http?) | |
router.GET("/ping", pingHandler) | |
router.GET("/work", requestID, remoteLogger, workHandler) | |
// agregamos los endpoints de PProf | |
pprof.Register(router) | |
// Levantamos un server con el middleware | |
fmt.Println(fmt.Sprintf("Server iniciado y escuchando en el puerto:%d", serverPort)) | |
if targetPort != 0 { | |
fmt.Println(fmt.Sprintf("...y solicitando números al Servidor (%s)", targetURL)) | |
} | |
router.Run(fmt.Sprintf(":%d", serverPort)) | |
} | |
func pingHandler(ginCtx *gin.Context) { | |
ginCtx.String(http.StatusOK, fmt.Sprintln("pong")) | |
} | |
func workHandler(ginCtx *gin.Context) { | |
defer startInternalTimer("workHandler").endTimer() | |
// empiezo mi trabajo | |
trabajarEnAlgo(generarNumeroEntero(10, 50)) | |
// pido un dato extra | |
otroNumero := subTarea1(ginCtx) | |
// hago mas trabajo | |
trabajarEnAlgo(generarNumeroEntero(10, 50)) | |
miNumero := generarNumeroEntero(1000, 10000) + otroNumero | |
// ...y respondo | |
ginCtx.JSON(http.StatusOK, gin.H{"work": miNumero}) | |
} | |
func subTarea1(ginCtx *gin.Context) int { | |
defer startInternalTimer("subTarea1").endTimer() | |
// empiezo mi trabajo | |
trabajarEnAlgo(generarNumeroEntero(50, 100)) | |
// pido un dato extra | |
otroNumero := subTarea2(ginCtx) | |
// hago mas trabajo | |
trabajarEnAlgo(generarNumeroEntero(50, 100)) | |
miNumero := generarNumeroEntero(100, 1000) + otroNumero | |
// ...y respondo | |
return miNumero | |
} | |
func subTarea2(ginCtx *gin.Context) int { | |
defer startInternalTimer("subTarea2").endTimer() | |
// hago mi trabajo | |
trabajarEnAlgo(generarNumeroEntero(50, 100)) | |
var otroNumero int | |
var req *http.Request | |
var requestID string | |
var err error | |
var resp *http.Response | |
var bodyBytes []byte | |
var bodyData map[string]interface{} | |
var interfaceValue interface{} | |
var foundNumber bool | |
// necesito un dato extra | |
if targetPort > 0 { | |
llamadaExterna := startInternalTimer("tareaExterna", strconv.Itoa(int(targetPort))) | |
// lo pido al servidor de destino | |
req, err = http.NewRequest(http.MethodGet, targetURL, nil) | |
if req != nil { | |
requestID = ginCtx.GetString("reqID") | |
req.Header.Set("X-Request-Id", requestID) | |
resp, err = httpClient.Do(req) | |
if resp != nil && resp.Body != nil { | |
defer resp.Body.Close() | |
bodyBytes, err = ioutil.ReadAll(resp.Body) | |
if bodyBytes != nil && len(bodyBytes) > 0 { | |
err = json.Unmarshal(bodyBytes, &bodyData) | |
} | |
if interfaceValue, foundNumber = bodyData["work"]; foundNumber { | |
switch workValue := interfaceValue.(type) { | |
case int: | |
otroNumero = workValue | |
case float64: | |
otroNumero = int(workValue) | |
} | |
} | |
} | |
} | |
llamadaExterna.endTimer() | |
} | |
if targetPort == 0 || !foundNumber || err != nil { | |
// lo genero aca mismo | |
otroNumero = generarNumeroEntero(1, 10) | |
} | |
// hago mas trabajo | |
trabajarEnAlgo(generarNumeroEntero(50, 100)) | |
miNumero := generarNumeroEntero(10, 100) + otroNumero | |
// ...y respondo | |
return miNumero | |
} | |
func trabajarEnAlgo(cantidad int) string { | |
var c string | |
for i := 0; i < cantidad*1000; i++ { | |
c = fmt.Sprintf("%d", i) | |
//time.Sleep(time.Millisecond) | |
} | |
return c | |
} | |
func generarNumeroEntero(piso, techo int) int { | |
if techo < piso { | |
return piso | |
} | |
randomMutex.Lock() | |
defer randomMutex.Unlock() | |
return randomGenerator.Intn(techo-piso) + piso | |
} | |
func requestCounter(ginCtx *gin.Context) { | |
// vamos al siguiente handler en la cadena | |
ginCtx.Next() | |
// aca va codigo que debe suceder despues del request | |
// statusCode := ginCtx.Writer.Status() | |
// if statusCode >= http.StatusInternalServerError { | |
// fmt.Print("5") | |
// } else if statusCode >= http.StatusBadRequest { | |
// fmt.Print("4") | |
// } else { | |
// fmt.Print(".") | |
// } | |
atomic.AddUint32(&requestCount, 1) | |
if atomic.CompareAndSwapUint32(&requestCount, requestLimitNewLine, 0) { | |
fmt.Println() | |
} | |
} | |
func requestID(ginCtx *gin.Context) { | |
// aca va codigo que debe suceder antes del request | |
requestID := ginCtx.Request.Header.Get("X-Request-Id") | |
if len(strings.TrimSpace(requestID)) == 0 { | |
// Si no había, genero uno nuevo | |
UniqueID, _ := uuid.NewV4() | |
requestID = UniqueID.String() | |
} | |
// y lo guardo en el contexto de Gin | |
ginCtx.Set("reqID", requestID) | |
// vamos al siguiente handler en la cadena | |
ginCtx.Next() | |
} | |
func remoteLogger(ginCtx *gin.Context) { | |
// aca va codigo que debe suceder antes del request | |
startTime := time.Now() | |
// vamos al siguiente handler en la cadena | |
ginCtx.Next() | |
// aca va codigo que debe suceder despues del request | |
endTime := time.Now() | |
requestID := ginCtx.GetString("reqID") | |
statusCode := ginCtx.Writer.Status() | |
go backgroundRemoteLoggerPusher(startTime, endTime, requestID, statusCode) | |
} | |
func backgroundRemoteLoggerPusher(startTime, endTime time.Time, requestID string, statusCode int) { | |
timeDelta := endTime.Sub(startTime) | |
timeStamp := endTime.Format(time.RFC3339) | |
logMessage := fmt.Sprintf("%s | api:%s | reqID:%s | status:%d | timeDelta:%f", | |
timeStamp, apiName, requestID, statusCode, timeDelta.Seconds()) | |
//fmt.Println(logMessage) | |
requestBodyMap := make(map[string]string) | |
requestBodyMap["message"] = logMessage | |
if requestBodyBytes, marshalErr := json.Marshal(requestBodyMap); marshalErr == nil { | |
http.Post(loggerURL, "application/json", bytes.NewBuffer(requestBodyBytes)) | |
} | |
} | |
func startInternalTimer(name string, extras ...string) internalTimer { | |
newTimer := internalTimer{ | |
timerName: name, | |
startTime: time.Now(), | |
} | |
newTimer.timerExtras = append(newTimer.timerExtras, extras...) | |
return newTimer | |
} | |
type internalTimer struct { | |
timerName string | |
timerExtras []string | |
startTime time.Time | |
} | |
func (theTimer internalTimer) endTimer() { | |
endTime := time.Now() | |
timeDelta := endTime.Sub(theTimer.startTime) | |
timeStamp := endTime.Format(time.RFC3339) | |
logMessage := fmt.Sprintf("%s | where:%s | extras:%v | timeDelta:%f", timeStamp, theTimer.timerName, theTimer.timerExtras, timeDelta.Seconds()) | |
logger.Info(logMessage) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment