Skip to content

Instantly share code, notes, and snippets.

@baburgess
Last active February 28, 2021 03:24
Show Gist options
  • Save baburgess/a0885019833d3d3bc9f6198542b65ac9 to your computer and use it in GitHub Desktop.
Save baburgess/a0885019833d3d3bc9f6198542b65ac9 to your computer and use it in GitHub Desktop.
OpenEthereum Healthcheck
package main
import (
"bytes"
"fmt"
"encoding/json"
"net/http"
"net/http/httptest"
"net/http/httputil"
"strings"
"time"
"github.com/heptiolabs/healthcheck"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/google/go-cmp/cmp"
"strconv"
"io/ioutil"
"sync"
"io"
"log"
"net"
"os"
"errors"
)
// The following environment variables configure the healthcheck. Each one is
// optional: when a variable is unset or cannot be parsed, the default shown
// below is used instead. Time values are given in whole seconds.
//
// OPENETHEREUM_API := "127.0.0.1:8545"
// HEALTHCHECK_RPC := "127.0.0.1:8080"
// NAMESPACE := "ethereum-nodes"
// PRINT_REQUESTS := false
// REQUEST_TIMEOUT := 10
// POLLING_RATE := 10
// BLOCK_UPDATE_TIMEOUT := 45
// BLOCK_SYNC_DIFFERENCE := 10
// PEER_MINIMUM := 1
// PRINT_CONFIG_MSG := false
// configStruct carries every tunable of the healthcheck service. The values
// are filled in from environment variables, with built-in fallbacks.
type configStruct struct {
	// openethereumAPI is the address of the OpenEthereum API to poll.
	openethereumAPI string

	// healthcheckRPC is the address the healthcheck HTTP server binds to.
	healthcheckRPC string

	// nameSpace is the namespace intended for the prometheus endpoint.
	nameSpace string

	// requestTimeout is the longest time to wait for an HTTP request.
	requestTimeout time.Duration

	// pollingRate is the interval between polls of the OpenEthereum API.
	pollingRate time.Duration

	// blockSyncDifference is how many blocks OpenEthereum's current block may
	// trail the estimated current block before a healthcheck error is raised.
	blockSyncDifference int64

	// peerMinimum is the lowest number of connected peers that does not
	// raise a healthcheck error.
	peerMinimum int

	// blockUpdateTimeout is the longest time OpenEthereum may take to change
	// its current block before a healthcheck error is raised.
	blockUpdateTimeout time.Duration

	// printRequests enables debug printing of the OpenEthereum API responses.
	printRequests bool

	// printConfigMsg enables debug printing while the configuration loads.
	printConfigMsg bool
}
// EnvVarEmpty is returned when an environment variable is unset or empty.
var EnvVarEmpty = errors.New("getenv: environment variable empty")

// getenvStr looks up the environment variable named key. It returns the value
// and a nil error when the value is non-empty, or an empty string together
// with EnvVarEmpty otherwise.
func getenvStr(key string) (string, error) {
	if value := os.Getenv(key); value != "" {
		return value, nil
	}
	return "", EnvVarEmpty
}
// getenvInt reads the environment variable named key and converts it to an
// int. It returns 0 and the underlying error when the variable is empty or is
// not a valid base-10 integer.
func getenvInt(key string) (int, error) {
	raw, err := getenvStr(key)
	if err == nil {
		var parsed int
		if parsed, err = strconv.Atoi(raw); err == nil {
			return parsed, nil
		}
	}
	return 0, err
}
// getenvBool reads the environment variable named key and converts it to a
// bool with strconv.ParseBool. It returns false and the underlying error when
// the variable is empty or cannot be parsed.
func getenvBool(key string) (bool, error) {
	raw, err := getenvStr(key)
	if err == nil {
		var parsed bool
		if parsed, err = strconv.ParseBool(raw); err == nil {
			return parsed, nil
		}
	}
	return false, err
}
// ListenAndServeWithClose binds a TCP listener on addr (":http" when addr is
// empty), serves handler on it from a background goroutine and returns the
// listener as an io.Closer so the caller can stop accepting connections.
//
// An error is returned only when the listener cannot be created; errors from
// the serving goroutine are logged.
func ListenAndServeWithClose(addr string, handler http.Handler) (sc io.Closer, err error) {
	// The server keeps the address exactly as passed in; only the listener
	// falls back to the default port.
	server := &http.Server{Addr: addr, Handler: handler}

	bindAddr := addr
	if bindAddr == "" {
		bindAddr = ":http"
	}

	ln, listenErr := net.Listen("tcp", bindAddr)
	if listenErr != nil {
		return nil, listenErr
	}

	go func() {
		// Wrap the listener so that every accepted connection gets a deadline.
		if serveErr := server.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)}); serveErr != nil {
			log.Println("HTTP Server Error - ", serveErr)
		}
	}()

	return ln, nil
}
// tcpKeepAliveListener wraps a *net.TCPListener so that every connection it
// accepts is given a short absolute deadline.
type tcpKeepAliveListener struct {
	*net.TCPListener
}

// Accept waits for the next TCP connection and sets its read/write deadline to
// two seconds from now, so connected clients are dropped soon after the
// listener is closed. On failure it returns a nil connection and the error.
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
	conn, acceptErr := ln.AcceptTCP()
	if acceptErr != nil {
		// Return an untyped nil so the net.Conn interface itself is nil.
		return nil, acceptErr
	}
	// The result of SetDeadline is deliberately ignored, as before.
	_ = conn.SetDeadline(time.Now().Add(2 * time.Second))
	return conn, nil
}
// loadConfig fills every field of config from the process environment.
//
// Any variable that is unset, empty or unparsable is replaced by its built-in
// default. When PRINT_CONFIG_MSG is true, a message is printed for each
// variable that fell back to its default. Time-valued variables are read as
// whole seconds.
func (config *configStruct) loadConfig() {
	// The verbosity flag is loaded first because it controls whether the
	// fallbacks below are reported. A failed load leaves reporting disabled.
	var err error
	config.printConfigMsg, err = getenvBool("PRINT_CONFIG_MSG")
	if err != nil {
		config.printConfigMsg = false
	}

	// reportDefault announces that key fell back to its default value.
	reportDefault := func(key string) {
		if config.printConfigMsg {
			fmt.Println("could not load " + key + " using default")
		}
	}
	// stringOr returns the value of key, or fallback when it is empty.
	stringOr := func(key, fallback string) string {
		if value := os.Getenv(key); value != "" {
			return value
		}
		reportDefault(key)
		return fallback
	}
	// intOr returns the integer value of key, or fallback when it is empty
	// or not a valid integer.
	intOr := func(key string, fallback int) int {
		value, err := getenvInt(key)
		if err != nil {
			reportDefault(key)
			return fallback
		}
		return value
	}
	// secondsOr is intOr with the result interpreted as a number of seconds.
	secondsOr := func(key string, fallback int) time.Duration {
		return time.Duration(intOr(key, fallback)) * time.Second
	}

	config.openethereumAPI = stringOr("OPENETHEREUM_API", "127.0.0.1:8545")
	config.healthcheckRPC = stringOr("HEALTHCHECK_RPC", "127.0.0.1:8080")
	config.nameSpace = stringOr("NAMESPACE", "ethereum-nodes")

	config.printRequests, err = getenvBool("PRINT_REQUESTS")
	if err != nil {
		reportDefault("PRINT_REQUESTS")
		config.printRequests = false
	}

	config.requestTimeout = secondsOr("REQUEST_TIMEOUT", 10)
	config.pollingRate = secondsOr("POLLING_RATE", 10)
	config.blockUpdateTimeout = secondsOr("BLOCK_UPDATE_TIMEOUT", 45)
	config.blockSyncDifference = int64(intOr("BLOCK_SYNC_DIFFERENCE", 10))
	config.peerMinimum = intOr("PEER_MINIMUM", 1)
}
// Message is a simple pair of a name and a text.
type Message struct {
	Name string
	Text string
}
// aSyncDataStruct is the state shared between the asynchronous upstream
// checks and the HTTP handlers.
type aSyncDataStruct struct {
	// Latest responses received from OpenEthereum, raw and parsed.
	ethSyncResp, parityNetPeersResp OpenEthereumResponse

	// Healthchecks, stored here so their status can be shared between handlers.
	ethSyncCheck        healthcheck.Check
	parityNetPeersCheck healthcheck.Check
	tcpDialCheck        healthcheck.Check
	blockRefreshCheck   healthcheck.Check
	minimumPeersCheck   healthcheck.Check
	blockSyncCheck      healthcheck.Check
}
// OpenEthereumResponse is the JSON-RPC envelope returned by the OpenEthereum
// API, together with the unparsed body it was decoded from.
type OpenEthereumResponse struct {
	// respBody is the raw JSON text of the response. It is unexported, so the
	// JSON decoder never touches it.
	respBody string

	// Id is the request ID echoed by the server.
	Id int `json:"id"`

	// Jsonrpc is the JSON-RPC version string.
	Jsonrpc string `json:"jsonrpc"`

	// Result is the decoded "result" object.
	Result OpenEthereumResult `json:"result"`
}
// OpenEthereumResult is the "result" object of a response. It merges the
// fields of both RPC calls this program issues, so only the subset that
// belongs to the call in question is populated.
type OpenEthereumResult struct {
	// Fields of the parity_netPeers result.
	Active    int                `json:"active"`
	Connected int                `json:"connected"`
	Max       int                `json:"max"`
	Peers     []OpenEthereumPeer `json:"peers"`

	// Fields of the eth_syncing result.
	StartingBlock       string `json:"startingBlock"`
	CurrentBlock        string `json:"currentBlock"`
	HighestBlock        string `json:"highestBlock"`
	WarpChunksAmount    string `json:"warpChunksAmount"`
	WarpChunksProcessed string `json:"warpChunksProcessed"`
}
// OpenEthereumPeer is one entry of the "peers" array in a parity_netPeers
// result. Only the peer ID is decoded; the remaining fields are kept below,
// commented out, for reference.
type OpenEthereumPeer struct {
	// Id is the client ID of the peer.
	Id string `json:"id"`

	// Network       OpenEthereumPeerNetwork `json:"network"`
	// Caps          []string                `json:"caps"`
	// Name          []string                `json:"name"`
	// LocalAddress  string                  `json:"network.localAddress"`
	// RemoteAddress string                  `json:"network.remoteAddress"`
	// Protocols     []string                `json:"protocols"`
}
// OpenEthereumPeerNetwork describes the network endpoints of a peer. Both
// fields are unexported and carry no JSON tags.
type OpenEthereumPeerNetwork struct {
	// localAddress is the local IP address of the peer connection;
	// remoteAddress is the remote IP address.
	localAddress, remoteAddress string
}
// dumpRequest runs a body-less request with the given method and path through
// handler in-process and returns the complete response (status line, headers
// and body) as text with CRLF line endings converted to LF. It is a debugging
// aid and panics if the request cannot be built or the response dumped.
func dumpRequest(handler http.Handler, method string, path string) string {
	request, err := http.NewRequest(method, path, nil)
	if err != nil {
		panic(err)
	}

	recorder := httptest.NewRecorder()
	handler.ServeHTTP(recorder, request)

	raw, err := httputil.DumpResponse(recorder.Result(), true)
	if err != nil {
		panic(err)
	}
	return strings.ReplaceAll(string(raw), "\r\n", "\n")
}
// blockDifference reports how many blocks OpenEthereum's current block is
// behind the highest known block, based on the last eth_syncing response held
// in aSyncData.
//
// It returns (0, nil) when the raw response says the node is not syncing,
// (0, error) when the last request was marked as failed, and (-1, error) when
// either block number is not a valid hexadecimal value.
func blockDifference(aSyncData *aSyncDataStruct) (int64, error) {
	resp := &aSyncData.ethSyncResp

	switch {
	case strings.Contains(resp.respBody, `"result":false`):
		// Not syncing: nothing to be behind on.
		return 0, nil
	case strings.Contains(resp.respBody, `failed`):
		return 0, fmt.Errorf("GET request failed")
	}

	// hexToInt converts a hex string such as "0x1a" to an integer.
	hexToInt := func(hex string) (int64, error) {
		return strconv.ParseInt(strings.ReplaceAll(hex, "0x", ""), 16, 64)
	}

	current, err := hexToInt(resp.Result.CurrentBlock)
	if err != nil {
		return -1, err
	}
	highest, err := hexToInt(resp.Result.HighestBlock)
	if err != nil {
		return -1, err
	}

	return highest - current, nil
}
// aSyncUpstream creates every upstream healthcheck as a background (async)
// check and stores it in aSyncData so the handlers can share the results.
//
// Two checks poll the OpenEthereum API (eth_syncing and parity_netPeers) and
// record the raw and parsed responses in aSyncData; the remaining checks only
// evaluate those recorded responses. A recorded body equal to "failed" marks
// a request that could not be completed.
//
// NOTE(review): the checks run on separate goroutines and read/write aSyncData
// without synchronisation — confirm whether a mutex is wanted here.
func aSyncUpstream(aSyncData *aSyncDataStruct, config configStruct) {
	// A single client is shared by all polls. The timeout keeps a stalled node
	// from blocking a check goroutine indefinitely.
	client := &http.Client{Timeout: config.requestTimeout}

	// rpcCall POSTs one JSON-RPC payload to OpenEthereum and returns the raw
	// response body. Any failure (building the request, sending it, reading
	// the body) is returned as an error instead of panicking.
	rpcCall := func(payload string) ([]byte, error) {
		req, err := http.NewRequest("POST", "http://"+config.openethereumAPI,
			bytes.NewBufferString(payload))
		if err != nil {
			return nil, err
		}
		req.Header.Set("X-Custom-Header", "openethereum-healthcheck-client")
		req.Header.Set("Content-Type", "application/json")

		resp, err := client.Do(req)
		if err != nil {
			return nil, err
		}
		defer resp.Body.Close()

		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return nil, err
		}

		// Debug output: printed for every response that was read, including
		// ones that later fail to parse.
		if config.printRequests {
			fmt.Println("response Status:", resp.Status)
			fmt.Println("response Headers:", resp.Header)
			fmt.Println("response Body:", string(body))
		}
		return body, nil
	}

	// Request the eth_syncing status from OpenEthereum and parse it.
	aSyncData.ethSyncCheck = healthcheck.Async(func() error {
		body, err := rpcCall(`{"method":"eth_syncing","params":[],"id":1,"jsonrpc":"2.0"}`)
		if err != nil {
			aSyncData.ethSyncResp.respBody = "failed"
			return err
		}
		aSyncData.ethSyncResp.respBody = string(body)

		// A node that is not syncing answers with a bare false, which cannot
		// be decoded into the result struct; treat it as healthy.
		if strings.Contains(aSyncData.ethSyncResp.respBody, `"result":false`) {
			return nil
		}
		if strings.Contains(aSyncData.ethSyncResp.respBody, `failed`) {
			return fmt.Errorf("GET request failed")
		}
		// The node is syncing: decode the progress fields.
		return json.Unmarshal(body, &aSyncData.ethSyncResp)
	}, config.pollingRate)

	// Request the parity_netPeers status from OpenEthereum and parse it.
	aSyncData.parityNetPeersCheck = healthcheck.Async(func() error {
		body, err := rpcCall(`{"method":"parity_netPeers","params":[],"id":1,"jsonrpc":"2.0"}`)
		if err != nil {
			aSyncData.parityNetPeersResp.respBody = "failed"
			return err
		}
		aSyncData.parityNetPeersResp.respBody = string(body)
		return json.Unmarshal(body, &aSyncData.parityNetPeersResp)
	}, config.pollingRate)

	// Check that OpenEthereum has at least peerMinimum peers connected.
	aSyncData.minimumPeersCheck = healthcheck.Async(func() error {
		// The peer count comes from the parity_netPeers response, so that is
		// the request whose failure must be detected here.
		if strings.Contains(aSyncData.parityNetPeersResp.respBody, `failed`) {
			return fmt.Errorf("GET request failed")
		}
		if aSyncData.parityNetPeersResp.Result.Connected < config.peerMinimum {
			return fmt.Errorf("minimumPeers :%d",
				aSyncData.parityNetPeersResp.Result.Connected)
		}
		return nil
	}, config.pollingRate)

	// Check that OpenEthereum advances its current block (or its warp chunk
	// counter) within blockUpdateTimeout.
	aSyncData.blockRefreshCheck = healthcheck.Async(func() error {
		// Pause so the polling check has a chance to capture a response.
		time.Sleep(2 * time.Second)

		if strings.Contains(aSyncData.ethSyncResp.respBody, `"result":false`) {
			return nil
		}
		if strings.Contains(aSyncData.ethSyncResp.respBody, `failed`) {
			return fmt.Errorf("GET request failed")
		}

		// Snapshot the progress markers, wait, then compare with the values
		// the polling check has stored in the meantime.
		currentBlock := aSyncData.ethSyncResp.Result.CurrentBlock
		warpBlock := aSyncData.ethSyncResp.Result.WarpChunksProcessed
		time.Sleep(config.blockUpdateTimeout)

		if currentBlock == aSyncData.ethSyncResp.Result.CurrentBlock &&
			warpBlock == aSyncData.ethSyncResp.Result.WarpChunksProcessed {
			return fmt.Errorf("currentBlock/warpBlock have not refreshed within timeout")
		}
		return nil
	}, 2*config.blockUpdateTimeout)

	// Check that OpenEthereum is within blockSyncDifference blocks of the
	// highest known block.
	aSyncData.blockSyncCheck = healthcheck.Async(func() error {
		if strings.Contains(aSyncData.ethSyncResp.respBody, `"result":false`) {
			return nil
		}
		if strings.Contains(aSyncData.ethSyncResp.respBody, `failed`) {
			return fmt.Errorf("GET request failed")
		}

		currentBlockDifference, err := blockDifference(aSyncData)
		if err != nil {
			return err
		}
		if currentBlockDifference > config.blockSyncDifference {
			return fmt.Errorf("blockDifference :%d", currentBlockDifference)
		}
		return nil
	}, config.pollingRate)

	// Check that the OpenEthereum API accepts TCP connections.
	aSyncData.tcpDialCheck = healthcheck.Async(
		healthcheck.TCPDialCheck(config.openethereumAPI, config.requestTimeout),
		config.pollingRate)
}
// openEthereumMetrics registers three endpoints on httpMux:
//
//	/eth-syncing-response      raw body of the last eth_syncing response
//	/parity-netPeers-response  raw body of the last parity_netPeers response
//	/metrics                   prometheus handler for prometheusRegistry
//
// The health and config parameters are currently unused.
func openEthereumMetrics(health healthcheck.Handler, httpMux *http.ServeMux,
	prometheusRegistry *prometheus.Registry,
	aSyncData *aSyncDataStruct, config configStruct) {
	// The bodies come from the upstream node, so they are written verbatim
	// with Fprint; using them as a format string would mangle any '%' in them.
	httpMux.HandleFunc("/eth-syncing-response", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, aSyncData.ethSyncResp.respBody)
	})
	httpMux.HandleFunc("/parity-netPeers-response", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, aSyncData.parityNetPeersResp.respBody)
	})

	// Serve whatever is registered in the prometheus registry.
	httpMux.Handle("/metrics", promhttp.HandlerFor(
		prometheusRegistry, promhttp.HandlerOpts{}))
}
// openEthereumReadinessChecks registers the healthchecks that make up the
// readiness probe on health and serves the probe at /ready on httpMux.
// The config parameter is currently unused.
func openEthereumReadinessChecks(health healthcheck.Handler, httpMux *http.ServeMux,
	aSyncData *aSyncDataStruct, config configStruct) {
	// Name/check pairs that form the readiness probe.
	readiness := []struct {
		name  string
		check healthcheck.Check
	}{
		{"openethereum-api-status", aSyncData.tcpDialCheck},
		{"openethereum-sync-response-status", aSyncData.ethSyncCheck},
		{"openethereum-netpeers-response-status", aSyncData.parityNetPeersCheck},
		{"openethereum-sync-status", aSyncData.blockSyncCheck},
		{"openethereum-peer-status", aSyncData.minimumPeersCheck},
		{"openethereum-block-refresh-status", aSyncData.blockRefreshCheck},
	}
	for _, entry := range readiness {
		health.AddReadinessCheck(entry.name, entry.check)
	}

	// Expose the readiness probe.
	httpMux.HandleFunc("/ready", health.ReadyEndpoint)
}
// openEthereumLivenessChecks registers the healthchecks that make up the
// liveness probe on health and serves the probe at /live on httpMux.
// The config parameter is currently unused.
func openEthereumLivenessChecks(health healthcheck.Handler, httpMux *http.ServeMux,
	aSyncData *aSyncDataStruct, config configStruct) {
	// Name/check pairs that form the liveness probe.
	liveness := []struct {
		name  string
		check healthcheck.Check
	}{
		{"openethereum-api-status", aSyncData.tcpDialCheck},
		{"openethereum-sync-response-status", aSyncData.ethSyncCheck},
		{"openethereum-netpeers-response-status", aSyncData.parityNetPeersCheck},
		{"openethereum-block-refresh-status", aSyncData.blockRefreshCheck},
	}
	for _, entry := range liveness {
		health.AddLivenessCheck(entry.name, entry.check)
	}

	// NOTE(review): the peer-status check is registered as a readiness check
	// even though this function sets up liveness — confirm whether
	// AddLivenessCheck was intended.
	health.AddReadinessCheck("openethereum-peer-status", aSyncData.minimumPeersCheck)

	// Expose the liveness probe.
	httpMux.HandleFunc("/live", health.LiveEndpoint)
}
// serverWatchdog supervises the healthcheck HTTP server until it needs to be
// restarted. Every requestTimeout it:
//
//  1. probes /live and /ready on the healthcheck address and expects either
//     200 (checks passing) or 503 (a check failing);
//  2. reloads the configuration from the environment and compares it with the
//     configuration the server was started with.
//
// When a probe cannot be completed, returns any other status code, or the
// configuration has changed, the server is closed through srvCloser and the
// function returns so the caller can start a fresh server. A failure to close
// the server terminates the process.
func serverWatchdog(config configStruct, srvCloser io.Closer) {
	// Bound every probe so a wedged server cannot hang the watchdog.
	client := &http.Client{Timeout: config.requestTimeout}

	// shutdown closes the server; the caller returns right afterwards.
	shutdown := func() {
		if err := srvCloser.Close(); err != nil {
			log.Fatalln("Server Close Error - ", err)
		}
		log.Println("Server Closed")
	}

	// endpointOK reports whether the endpoint at path answered with one of the
	// two status codes the healthcheck handler is expected to produce.
	endpointOK := func(path string) bool {
		res, err := client.Get("http://" + config.healthcheckRPC + path)
		if err != nil {
			// res is nil here, so it must not be inspected.
			return false
		}
		defer res.Body.Close()
		// Drain the body so the connection can be reused; a read error here
		// does not affect the verdict.
		_, _ = io.Copy(ioutil.Discard, res.Body)
		return res.StatusCode == http.StatusOK ||
			res.StatusCode == http.StatusServiceUnavailable
	}

	for {
		// Wait long enough for the HTTP server to come up.
		time.Sleep(config.requestTimeout)

		if !endpointOK("/live") || !endpointOK("/ready") {
			shutdown()
			return
		}

		// Restart when the environment no longer matches the running config.
		newConfig := configStruct{}
		newConfig.loadConfig()
		if !cmp.Equal(newConfig, config, cmp.AllowUnexported(configStruct{})) {
			shutdown()
			return
		}
	}
}
// healthcheckServer starts the HTTP server for httpMux on the configured
// healthcheck address and then blocks in the watchdog until the server has
// been closed. The process exits if the server cannot be started.
func healthcheckServer(httpMux *http.ServeMux, config configStruct) {
	closer, listenErr := ListenAndServeWithClose(config.healthcheckRPC, httpMux)
	if listenErr != nil {
		log.Fatalln("ListenAndServeWithClose Error - ", listenErr)
	}

	// Returns once the watchdog has shut the server down.
	serverWatchdog(config, closer)
}
// OpenEthereumHealthCheck wires up one complete healthcheck instance: the
// async upstream checks, the liveness, readiness and metrics endpoints, and
// the HTTP server with its watchdog. It blocks until the watchdog closes the
// server and signals wg on return.
func OpenEthereumHealthCheck(config configStruct, wg *sync.WaitGroup) {
	defer wg.Done()

	var (
		// Storage shared by the async checks and the handlers.
		aSyncData aSyncDataStruct
		// Main healthcheck handler.
		health = healthcheck.NewHandler()
		// Mux serving all endpoints.
		httpMux = http.NewServeMux()
		// Registry backing the /metrics endpoint.
		prometheusRegistry = prometheus.NewRegistry()
	)

	// Schedule the async upstream checks.
	aSyncUpstream(&aSyncData, config)

	// Register the probes and the metrics endpoints.
	openEthereumLivenessChecks(health, httpMux, &aSyncData, config)
	openEthereumReadinessChecks(health, httpMux, &aSyncData, config)
	openEthereumMetrics(health, httpMux, prometheusRegistry, &aSyncData, config)

	// Serve until the watchdog decides a restart is needed.
	healthcheckServer(httpMux, config)
}
// main loads the configuration and runs the healthcheck in an endless loop.
// Each time the watchdog shuts the server down, the configuration is reloaded
// and, after a two-second pause, a new healthcheck instance is started.
func main() {
	var config configStruct
	config.loadConfig()

	var wg sync.WaitGroup
	// The post statement runs after every pass: pause before restarting.
	for ; ; time.Sleep(2 * time.Second) {
		fmt.Println(config)

		// Run one healthcheck instance and wait until it has finished.
		wg.Add(1)
		OpenEthereumHealthCheck(config, &wg)
		wg.Wait()

		// Pick up any configuration change for the next instance.
		config.loadConfig()
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment