@maplebed
Last active October 10, 2016 16:36
Experimenting with tailing the MySQL slow query log from a database in Amazon RDS.
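A sketch of how it might be run, assuming the file is saved as main.go and AWS credentials are available in the environment via the standard credential chain; the instance identifier below is a placeholder:

    go run main.go -i my-rds-instance -r us-east-1 -l slowquery/mysql-slowquery.log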

package main

import (
	"fmt"
	"log"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/rds"
	flag "github.com/jessevdk/go-flags"
)

// Opts are flags to the CLI
type Opts struct {
	Marker             string `short:"m" description:"marker from which to start"`
	SlowLog            string `short:"l" description:"slow log name" default:"slowquery/mysql-slowquery.log"`
	Region             string `short:"r" description:"AWS region" default:"us-east-1"`
	InstanceIdentifier string `short:"i" long:"identifier" description:"RDS instance identifier"`
	SleepTime          int    `short:"s" description:"amount of time to sleep (in seconds) before asking for new data when there's none immediately available." default:"5"`
}

type app struct {
	RDS         *rds.RDS
	Options     *Opts
	noDataCount int
}

// streamPos tracks where we are in the log stream: which log file we're
// reading and the RDS marker from which to resume.
type streamPos struct {
	logFile LogFile
	marker  *string
}

// LogFile represents one file returned by ListFiles
type LogFile struct {
	Size            int64 // in bytes?
	LogFileName     string
	LastWritten     int64 // arrives as msec since epoch
	LastWrittenTime time.Time
	Path            string
}
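
// For reference, entries in the MySQL slow query log look roughly like the
// (made-up) excerpt below; the loop in main leans on the "SET timestamp="
// lines to pull out the first and last query times in each downloaded chunk:
//
//	# Time: 161010 16:36:00
//	# User@Host: app[app] @  [10.0.0.1]
//	# Query_time: 2.000000  Lock_time: 0.000000 Rows_sent: 1  Rows_examined: 100000
//	SET timestamp=1476117360;
//	SELECT * FROM widgets WHERE owner_id = 42;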

func main() {
	var opts Opts
	fp := flag.NewParser(&opts, flag.Default)
	if _, err := fp.Parse(); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("options: %+v\n", opts)
	a := &app{
		Options: &opts,
		RDS: rds.New(session.New(), &aws.Config{
			Region: aws.String(opts.Region),
		}),
	}
	sPos := streamPos{
		logFile: LogFile{LogFileName: opts.SlowLog},
	}
	if opts.Marker != "" {
		sPos.marker = &opts.Marker
	}
	for {
		resp, err := a.getRecentEntries(sPos)
		if err != nil {
			log.Fatal(err)
		}
		// don't print out all the logs we just got - it's a lot of text!
		// f := bufio.NewWriter(os.Stdout)
		// f.WriteString(*resp.LogFileData)
		// f.Flush()
		fmt.Printf("%s ", time.Now().UTC().Format(time.Stamp))
		// the marker may still be nil (e.g. on the first pass with no -m
		// flag); use a printable placeholder rather than dereferencing it.
		start := "nil"
		if sPos.marker != nil {
			start = *sPos.marker
		}
		if resp.LogFileData != nil {
			// pull out the first and last timestamp, report markers.
			s := *resp.LogFileData
			times := strings.Split(s, "SET timestamp=")
			if len(times) > 2 {
				firstTime, _ := strconv.Atoi(strings.SplitN(times[1], ";", 2)[0])
				lastTime, _ := strconv.Atoi(strings.SplitN(times[len(times)-1], ";", 2)[0])
				fmt.Printf("starting from %s, got %d content, next section from %s. first time is %s, last time is %s ",
					start, len(*resp.LogFileData), *resp.Marker,
					time.Unix(int64(firstTime), 0).UTC().Format("15:04:05"),
					time.Unix(int64(lastTime), 0).UTC().Format("15:04:05"),
				)
			} else {
				fmt.Printf("starting from %s, got %d content, next section from %s, content: %s ",
					start, len(*resp.LogFileData), *resp.Marker, *resp.LogFileData)
			}
		} else {
			fmt.Printf("starting from %s, got nil content, next section from %s", start, *resp.Marker)
		}
		if !*resp.AdditionalDataPending {
			fmt.Printf("sleeping %d sec...\n", opts.SleepTime)
			time.Sleep(time.Duration(opts.SleepTime) * time.Second)
		} else {
			fmt.Println("")
		}
		sPos.marker = a.getNextMarker(sPos, resp)
	}
}

func (a *app) getRecentEntries(sPos streamPos) (*rds.DownloadDBLogFilePortionOutput, error) {
	params := &rds.DownloadDBLogFilePortionInput{
		DBInstanceIdentifier: aws.String(a.Options.InstanceIdentifier),
		LogFileName:          aws.String(sPos.logFile.LogFileName),
	}
	// if we have a marker, download from there. otherwise get the most recent line
	if sPos.marker != nil {
		params.Marker = sPos.marker
	} else {
		// params.Marker = aws.String("0")
		params.NumberOfLines = aws.Int64(1)
	}
	return a.RDS.DownloadDBLogFilePortion(params)
}
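
// Per the AWS docs, DownloadDBLogFilePortion returns at most about 1 MB of the
// log per call; resp.AdditionalDataPending indicates that another call with
// resp.Marker will return more of the file (see the loop in main).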

// getNextMarker takes into account the current and next reported markers and
// decides whether to believe the resp.Marker or calculate its own next marker.
func (a *app) getNextMarker(sPos streamPos, resp *rds.DownloadDBLogFilePortionOutput) *string {
	// when we get to the end of a log file, the marker in resp is "0".
	// if it's not "0", we should trust it's correct and use it.
	if *resp.Marker != "0" {
		fmt.Printf("resp.Marker %s != 0. using %s ", *resp.Marker, *resp.Marker)
		return resp.Marker
	}
	// ok, we've hit the end of a segment, but did we get any data? If we got
	// data, then it's not really the end of the segment and we should calculate a
	// new marker and use that.
	if resp.LogFileData != nil && len(*resp.LogFileData) != 0 {
		newMarkerStr, err := sPos.Add(len(*resp.LogFileData))
		if err != nil {
			fmt.Printf("failed to get next marker. Reverting to no marker. %s\n", err)
			return nil
		}
		fmt.Printf("resp.Marker is 0 but had data. Using %s ", newMarkerStr)
		return &newMarkerStr
	}
	// we hit the end of a file but we didn't get any data. we should try again
	// for one minute (12*5s), but eventually roll over to the marker for the next hour.
	if a.noDataCount > 12 {
		// ok, move on to the next hour
		fmt.Printf("resp.Marker is 0 and had no data more than 12 times. using %s ", *resp.Marker)
		a.noDataCount = 0
		return resp.Marker
	}
	// let's try again from the same place.
	a.noDataCount++
	if sPos.marker == nil {
		fmt.Printf("resp.Marker is 0 and had no data. trying again with no marker ")
		return nil
	}
	fmt.Printf("resp.Marker is 0 and had no data. trying again with %s ", *sPos.marker)
	return sPos.marker
}
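
// A worked example of the rollover logic above, under the same assumptions as
// its comments: with the default 5-second sleep, 12 consecutive empty
// responses at a "0" marker is roughly a minute with no new data, after which
// the "0" marker is accepted and reading continues from the start of the next
// portion of the log.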

// Add returns a new marker string that is the current marker + dataLen offset
func (s *streamPos) Add(dataLen int) (string, error) {
	if s.marker == nil {
		return "", fmt.Errorf("no current marker to add to")
	}
	splitMarker := strings.Split(*s.marker, ":")
	if len(splitMarker) != 2 {
		// something's wrong. marker should have been #:#
		return "", fmt.Errorf("marker didn't split into two pieces across a colon")
	}
	mHour, err := strconv.Atoi(splitMarker[0])
	if err != nil {
		return "", fmt.Errorf("couldn't parse the first half of marker %q: %s", *s.marker, err)
	}
	mOffset, err := strconv.Atoi(splitMarker[1])
	if err != nil {
		return "", fmt.Errorf("couldn't parse the second half of marker %q: %s", *s.marker, err)
	}
	mOffset += dataLen
	return fmt.Sprintf("%d:%d", mHour, mOffset), nil
}
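
// Example of the marker arithmetic in Add (the "hour:byte-offset" reading of
// the marker is an assumption based on observed values): starting from marker
// "4:12345" and having received 678 bytes of log data, the next marker is
// "4:13023".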