Experiment: tailing the MySQL slow query log of a database in Amazon RDS.
package main | |
import ( | |
"fmt" | |
"log" | |
"strconv" | |
"strings" | |
"time" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/rds" | |
flag "github.com/jessevdk/go-flags" | |
) | |
// Opts are the command-line flags accepted by the CLI.
type Opts struct {
	// Marker is the DownloadDBLogFilePortion marker to resume from. When it is
	// empty, main starts without a marker and the first request asks for a
	// single line instead.
	Marker string `short:"m" long:"marker" description:"marker from which to start"`
	// SlowLog is the name of the RDS log file to tail.
	SlowLog string `short:"l" long:"log" description:"slow log name" default:"slowquery/mysql-slowquery.log"`
	// Region is the AWS region the RDS client is created for.
	Region string `short:"r" long:"region" description:"AWS region" default:"us-east-1"`
	// InstanceIdentifier names the RDS instance. Every download request needs
	// it, so the parser rejects a command line that omits it.
	InstanceIdentifier string `short:"i" long:"identifier" required:"true" description:"RDS instance identifier"`
	// SleepTime is how many seconds to wait between polls when RDS reports no
	// additional data pending.
	SleepTime int `short:"s" long:"sleep" description:"amount of time to sleep (in seconds) before asking for new data when there's none immediately available." default:"5"`
}
type app struct { | |
RDS *rds.RDS | |
Options *Opts | |
noDataCount int | |
} | |
type streamPos struct { | |
logFile LogFile | |
marker *string | |
} | |
// LogFile describes one log file as returned by ListFiles. In the code shown
// here only LogFileName is read.
type LogFile struct {
	// Naming.
	LogFileName string
	Path        string

	// Size of the file; presumably in bytes (TODO confirm).
	Size int64

	// LastWritten arrives as milliseconds since the Unix epoch.
	LastWritten int64
	// LastWrittenTime is presumably LastWritten as a time.Time (verify where it is filled).
	LastWrittenTime time.Time
}
func main() { | |
var opts Opts | |
fp := flag.NewParser(&opts, flag.Default) | |
fp.Parse() | |
fmt.Printf("options: %+v\n", opts) | |
a := &app{ | |
Options: &opts, | |
RDS: rds.New(session.New(), &aws.Config{ | |
Region: aws.String(opts.Region), | |
}), | |
} | |
sPos := streamPos{ | |
logFile: LogFile{LogFileName: opts.SlowLog}, | |
} | |
if opts.Marker != "" { | |
sPos.marker = &opts.Marker | |
} | |
for { | |
resp, err := a.getRecentEntries(sPos) | |
if err != nil { | |
log.Fatal(err) | |
} | |
// don't print out all the logs we just got - it's a lot of text! | |
// f := bufio.NewWriter(os.Stdout) | |
// f.WriteString(*resp.LogFileData) | |
// f.Flush() | |
fmt.Printf("%s ", time.Now().UTC().Format(time.Stamp)) | |
if resp.LogFileData != nil { | |
// pull out the first and last timestamp, report markers. | |
s := *resp.LogFileData | |
times := strings.Split(s, "SET timestamp=") | |
if len(times) > 2 { | |
firstTime, _ := strconv.Atoi(strings.SplitN(times[1], ";", 2)[0]) | |
lastTime, _ := strconv.Atoi(strings.SplitN(times[len(times)-1], ";", 2)[0]) | |
fmt.Printf("starting from %s, got %d content, next section from %s. first time is %s, last time is %s ", | |
*sPos.marker, len(*resp.LogFileData), *resp.Marker, | |
time.Unix(int64(firstTime), 0).UTC().Format("15:04:05"), | |
time.Unix(int64(lastTime), 0).UTC().Format("15:04:05"), | |
) | |
} else { | |
if sPos.marker == nil { | |
s := "nil" | |
sPos.marker = &s | |
} | |
fmt.Printf("starting from %s, got %d content, next section from %s, content: %s ", | |
*sPos.marker, len(*resp.LogFileData), *resp.Marker, *resp.LogFileData) | |
} | |
} else { | |
fmt.Printf("starting from %s, got nil content, next section from %s", *sPos.marker, *resp.Marker) | |
} | |
if !*resp.AdditionalDataPending { | |
fmt.Printf("sleeping %d sec...\n", opts.SleepTime) | |
time.Sleep(time.Duration(opts.SleepTime) * time.Second) | |
} else { | |
fmt.Println("") | |
} | |
sPos.marker = a.getNextMarker(sPos, resp) | |
} | |
} | |
func (a *app) getRecentEntries(sPos streamPos) (*rds.DownloadDBLogFilePortionOutput, error) { | |
params := &rds.DownloadDBLogFilePortionInput{ | |
DBInstanceIdentifier: aws.String(a.Options.InstanceIdentifier), | |
LogFileName: aws.String(sPos.logFile.LogFileName), | |
} | |
// if we have a marker, download from there. otherwise get the most recent line | |
if sPos.marker != nil { | |
params.Marker = sPos.marker | |
} else { | |
// params.Marker = aws.String("0") | |
params.NumberOfLines = aws.Int64(1) | |
} | |
return a.RDS.DownloadDBLogFilePortion(params) | |
} | |
// getNextMarker takes in to account the current and next reported markers and | |
// decides whether to believe the resp.Marker or calculate its own next marker. | |
func (a *app) getNextMarker(sPos streamPos, resp *rds.DownloadDBLogFilePortionOutput) *string { | |
// when we get to the end of a log file, the marker in resp is "0". | |
// if it's not "0", we should trust it's correct and use it. | |
if *resp.Marker != "0" { | |
fmt.Printf("resp.Marker %s != 0. using %s ", *resp.Marker, *resp.Marker) | |
return resp.Marker | |
} | |
// ok, we've hit the end of a segment, but did we get any data? If we got | |
// data, then it's not really the end of the segment and we should calculate a | |
// new marker and use that. | |
if len(*resp.LogFileData) != 0 { | |
newMarkerStr, err := sPos.Add(len(*resp.LogFileData)) | |
if err != nil { | |
fmt.Printf("failed to get next marker. Reverting to no marker. %s\n", err) | |
return nil | |
} | |
fmt.Printf("resp.Marker is 0 but had data. Using %s ", newMarkerStr) | |
return &newMarkerStr | |
} | |
// we hit the end of a file but we didn't get any data. we should try again | |
// for one minute (12*5s), but eventually roll over to the marker for the next hour. | |
if a.noDataCount > 12 { | |
// ok, move on to the next hour | |
fmt.Printf("resp.Marker is 0 and had no data more than 5 times. using %s ", *resp.Marker) | |
a.noDataCount = 0 | |
return resp.Marker | |
} | |
// let's try again from the same place. | |
a.noDataCount++ | |
fmt.Printf("resp.Marker is 0 and had no data. trying again with %s ", *sPos.marker) | |
return sPos.marker | |
} | |
// Add returns a new marker string that is the current marker + dataLen offset | |
func (s *streamPos) Add(dataLen int) (string, error) { | |
splitMarker := strings.Split(*s.marker, ":") | |
if len(splitMarker) != 2 { | |
// something's wrong. marker should have been #:# | |
return "", fmt.Errorf("marker didn't split into two pieces across a colon") | |
} | |
mHour, _ := strconv.Atoi(splitMarker[0]) | |
mOffset, _ := strconv.Atoi(splitMarker[1]) | |
mOffset += dataLen | |
return fmt.Sprintf("%d:%d", mHour, mOffset), nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment