Last active
October 10, 2016 16:36
-
-
Save maplebed/eb0f6f0e3dc50e8bad5170c8e2e75582 to your computer and use it in GitHub Desktop.
An experiment in tailing the MySQL slow query log from a database in Amazon RDS.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"strconv" | |
"strings" | |
"time" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/rds" | |
flag "github.com/jessevdk/go-flags" | |
) | |
// Opts are flags to the CLI. The struct tags are read by the go-flags parser
// to define each option's short/long name, help text and default value.
type Opts struct {
	// Marker is the log position from which to start (-m). When left empty,
	// main starts without a marker.
	Marker string `short:"m" description:"marker from which to start"`
	// SlowLog is the name of the log file to download (-l).
	SlowLog string `short:"l" description:"slow log name" default:"slowquery/mysql-slowquery.log"`
	// Region is the AWS region the RDS client is configured for (-r).
	Region string `short:"r" description:"AWS region" default:"us-east-1"`
	// InstanceIdentifier names the RDS instance whose log is read
	// (-i / --identifier).
	InstanceIdentifier string `short:"i" long:"identifier" description:"RDS instance identifier"`
	// SleepTime is the number of seconds to wait before polling again when
	// the service reports no additional data pending (-s).
	SleepTime int `short:"s" description:"amount of time to sleep (in seconds) before asking for new data when there's none immediately available." default:"5"`
}
type app struct { | |
RDS *rds.RDS | |
Options *Opts | |
noDataCount int | |
} | |
type streamPos struct { | |
logFile LogFile | |
marker *string | |
} | |
// LogFile represents one file returned by ListFiles.
// NOTE(review): no ListFiles call is visible in this file, and only
// LogFileName is read here — confirm the remaining fields against the code
// that populates them.
type LogFile struct {
	// Size of the file (in bytes? — unconfirmed).
	Size int64
	// LogFileName is the name passed to the log download request.
	LogFileName string
	// LastWritten arrives as msec since epoch.
	LastWritten int64
	// LastWrittenTime is presumably LastWritten converted to a time.Time —
	// TODO confirm where it is set.
	LastWrittenTime time.Time
	// Path is not used in this file.
	Path string
}
func main() { | |
var opts Opts | |
fp := flag.NewParser(&opts, flag.Default) | |
fp.Parse() | |
fmt.Printf("options: %+v\n", opts) | |
a := &app{ | |
Options: &opts, | |
RDS: rds.New(session.New(), &aws.Config{ | |
Region: aws.String(opts.Region), | |
}), | |
} | |
sPos := streamPos{ | |
logFile: LogFile{LogFileName: opts.SlowLog}, | |
} | |
if opts.Marker != "" { | |
sPos.marker = &opts.Marker | |
} | |
for { | |
resp, err := a.getRecentEntries(sPos) | |
if err != nil { | |
log.Fatal(err) | |
} | |
// don't print out all the logs we just got - it's a lot of text! | |
// f := bufio.NewWriter(os.Stdout) | |
// f.WriteString(*resp.LogFileData) | |
// f.Flush() | |
fmt.Printf("%s ", time.Now().UTC().Format(time.Stamp)) | |
if resp.LogFileData != nil { | |
// pull out the first and last timestamp, report markers. | |
s := *resp.LogFileData | |
times := strings.Split(s, "SET timestamp=") | |
if len(times) > 2 { | |
firstTime, _ := strconv.Atoi(strings.SplitN(times[1], ";", 2)[0]) | |
lastTime, _ := strconv.Atoi(strings.SplitN(times[len(times)-1], ";", 2)[0]) | |
fmt.Printf("starting from %s, got %d content, next section from %s. first time is %s, last time is %s ", | |
*sPos.marker, len(*resp.LogFileData), *resp.Marker, | |
time.Unix(int64(firstTime), 0).UTC().Format("15:04:05"), | |
time.Unix(int64(lastTime), 0).UTC().Format("15:04:05"), | |
) | |
} else { | |
if sPos.marker == nil { | |
s := "nil" | |
sPos.marker = &s | |
} | |
fmt.Printf("starting from %s, got %d content, next section from %s, content: %s ", | |
*sPos.marker, len(*resp.LogFileData), *resp.Marker, *resp.LogFileData) | |
} | |
} else { | |
fmt.Printf("starting from %s, got nil content, next section from %s", *sPos.marker, *resp.Marker) | |
} | |
if !*resp.AdditionalDataPending { | |
fmt.Printf("sleeping %d sec...\n", opts.SleepTime) | |
time.Sleep(time.Duration(opts.SleepTime) * time.Second) | |
} else { | |
fmt.Println("") | |
} | |
sPos.marker = a.getNextMarker(sPos, resp) | |
} | |
} | |
func (a *app) getRecentEntries(sPos streamPos) (*rds.DownloadDBLogFilePortionOutput, error) { | |
params := &rds.DownloadDBLogFilePortionInput{ | |
DBInstanceIdentifier: aws.String(a.Options.InstanceIdentifier), | |
LogFileName: aws.String(sPos.logFile.LogFileName), | |
} | |
// if we have a marker, download from there. otherwise get the most recent line | |
if sPos.marker != nil { | |
params.Marker = sPos.marker | |
} else { | |
// params.Marker = aws.String("0") | |
params.NumberOfLines = aws.Int64(1) | |
} | |
return a.RDS.DownloadDBLogFilePortion(params) | |
} | |
// getNextMarker takes in to account the current and next reported markers and | |
// decides whether to believe the resp.Marker or calculate its own next marker. | |
func (a *app) getNextMarker(sPos streamPos, resp *rds.DownloadDBLogFilePortionOutput) *string { | |
// when we get to the end of a log file, the marker in resp is "0". | |
// if it's not "0", we should trust it's correct and use it. | |
if *resp.Marker != "0" { | |
fmt.Printf("resp.Marker %s != 0. using %s ", *resp.Marker, *resp.Marker) | |
return resp.Marker | |
} | |
// ok, we've hit the end of a segment, but did we get any data? If we got | |
// data, then it's not really the end of the segment and we should calculate a | |
// new marker and use that. | |
if len(*resp.LogFileData) != 0 { | |
newMarkerStr, err := sPos.Add(len(*resp.LogFileData)) | |
if err != nil { | |
fmt.Printf("failed to get next marker. Reverting to no marker. %s\n", err) | |
return nil | |
} | |
fmt.Printf("resp.Marker is 0 but had data. Using %s ", newMarkerStr) | |
return &newMarkerStr | |
} | |
// we hit the end of a file but we didn't get any data. we should try again | |
// for one minute (12*5s), but eventually roll over to the marker for the next hour. | |
if a.noDataCount > 12 { | |
// ok, move on to the next hour | |
fmt.Printf("resp.Marker is 0 and had no data more than 5 times. using %s ", *resp.Marker) | |
a.noDataCount = 0 | |
return resp.Marker | |
} | |
// let's try again from the same place. | |
a.noDataCount++ | |
fmt.Printf("resp.Marker is 0 and had no data. trying again with %s ", *sPos.marker) | |
return sPos.marker | |
} | |
// Add returns a new marker string that is the current marker + dataLen offset | |
func (s *streamPos) Add(dataLen int) (string, error) { | |
splitMarker := strings.Split(*s.marker, ":") | |
if len(splitMarker) != 2 { | |
// something's wrong. marker should have been #:# | |
return "", fmt.Errorf("marker didn't split into two pieces across a colon") | |
} | |
mHour, _ := strconv.Atoi(splitMarker[0]) | |
mOffset, _ := strconv.Atoi(splitMarker[1]) | |
mOffset += dataLen | |
return fmt.Sprintf("%d:%d", mHour, mOffset), nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment