Created
February 23, 2020 08:34
-
-
Save alexellis/f232fe1fd2fe5278d411732af2645d79 to your computer and use it in GitHub Desktop.
cri-logging-example.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"os" | |
"os/exec" | |
"path" | |
"sync" | |
"time" | |
"github.com/alexellis/logs-test/runtime" | |
"github.com/sirupsen/logrus" | |
) | |
const ( | |
// delimiter used in CRI logging format. | |
delimiter = ' ' | |
// eof is end-of-line. | |
eol = '\n' | |
// timestampFormat is the timestamp format used in CRI logging format. | |
timestampFormat = time.RFC3339Nano | |
// defaultBufSize is the default size of the read buffer in bytes. | |
defaultBufSize = 4096 | |
) | |
// StreamType is the type of the stream, stdout/stderr. | |
type StreamType string | |
const ( | |
// Stdin stream type. | |
Stdin StreamType = "stdin" | |
// Stdout stream type. | |
Stdout StreamType = StreamType(runtime.Stdout) | |
// Stderr stream type. | |
Stderr StreamType = StreamType(runtime.Stderr) | |
) | |
func main() { | |
res := exec.Command("/sbin/ping", "192.168.0.1") | |
p, pErr := res.StdoutPipe() | |
if pErr != nil { | |
panic(pErr) | |
} | |
// go func() { | |
// io.Copy(os.Stdout, p) | |
// }() | |
err := res.Start() | |
if err != nil { | |
panic(err) | |
} | |
time.Sleep(time.Second * 5) | |
filePath := path.Join(os.TempDir(), "my-log.txt") | |
fileIn, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) | |
if err != nil { | |
panic(err) | |
} | |
defer fileIn.Close() | |
fmt.Println("file path ", filePath) | |
closer, chann := NewCRILogger(filePath, | |
fileIn, | |
Stdout, | |
1000) | |
defer closer.Close() | |
fmt.Println(closer, chann) | |
wg := sync.WaitGroup{} | |
wg.Add(1) | |
go func() { | |
io.Copy(closer, p) | |
}() | |
go func() { | |
time.Sleep(time.Second * 10) | |
wg.Done() | |
}() | |
wg.Wait() | |
content, err := ioutil.ReadFile(filePath) | |
fmt.Println(string(content), err) | |
} | |
// NewCRILogger returns a write closer which redirect container log into | |
// log file, and decorate the log line into CRI defined format. It also | |
// returns a channel which indicates whether the logger is stopped. | |
// maxLen is the max length limit of a line. A line longer than the | |
// limit will be cut into multiple lines. | |
func NewCRILogger(path string, w io.Writer, stream StreamType, maxLen int) (io.WriteCloser, <-chan struct{}) { | |
logrus.Debugf("Start writing stream %q to log file %q", stream, path) | |
prc, pwc := io.Pipe() | |
stop := make(chan struct{}) | |
go func() { | |
redirectLogs(path, prc, w, stream, maxLen) | |
close(stop) | |
}() | |
return pwc, stop | |
} | |
// bufio.ReadLine in golang eats both read errors and tailing newlines | |
// (See https://golang.org/pkg/bufio/#Reader.ReadLine). When reading | |
// to io.EOF, it is impossible for the caller to figure out whether | |
// there is a newline at the end, for example: | |
// 1) When reading "CONTENT\n", it returns "CONTENT" without error; | |
// 2) When reading "CONTENT", it also returns "CONTENT" without error. | |
// | |
// To differentiate these 2 cases, we need to write a readLine function | |
// ourselves to not ignore the error. | |
// | |
// The code is similar with https://golang.org/src/bufio/bufio.go?s=9537:9604#L359. | |
// The only difference is that it returns all errors from `ReadSlice`. | |
// | |
// readLine returns err != nil if and only if line does not end with a new line. | |
func readLine(b *bufio.Reader) (line []byte, isPrefix bool, err error) { | |
line, err = b.ReadSlice('\n') | |
if err == bufio.ErrBufferFull { | |
// Handle the case where "\r\n" straddles the buffer. | |
if len(line) > 0 && line[len(line)-1] == '\r' { | |
// Unread the last '\r' | |
if err := b.UnreadByte(); err != nil { | |
panic(fmt.Sprintf("invalid unread %v", err)) | |
} | |
line = line[:len(line)-1] | |
} | |
return line, true, nil | |
} | |
if len(line) == 0 { | |
if err != nil { | |
line = nil | |
} | |
return | |
} | |
if line[len(line)-1] == '\n' { | |
// "ReadSlice returns err != nil if and only if line does not end in delim" | |
// (See https://golang.org/pkg/bufio/#Reader.ReadSlice). | |
if err != nil { | |
panic(fmt.Sprintf("full read with unexpected error %v", err)) | |
} | |
drop := 1 | |
if len(line) > 1 && line[len(line)-2] == '\r' { | |
drop = 2 | |
} | |
line = line[:len(line)-drop] | |
} | |
return | |
} | |
func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) { | |
defer rc.Close() | |
var ( | |
stream = []byte(s) | |
delimiter = []byte{delimiter} | |
partial = []byte(runtime.LogTagPartial) | |
full = []byte(runtime.LogTagFull) | |
buf [][]byte | |
length int | |
bufSize = defaultBufSize | |
) | |
// Make sure bufSize <= maxLen | |
if maxLen > 0 && maxLen < bufSize { | |
bufSize = maxLen | |
} | |
r := bufio.NewReaderSize(rc, bufSize) | |
writeLine := func(tag, line []byte) { | |
timestamp := time.Now().AppendFormat(nil, timestampFormat) | |
data := bytes.Join([][]byte{timestamp, stream, tag, line}, delimiter) | |
data = append(data, eol) | |
if _, err := w.Write(data); err != nil { | |
logrus.WithError(err).Errorf("Fail to write %q log to log file %q", s, path) | |
// Continue on write error to drain the container output. | |
} | |
} | |
for { | |
var stop bool | |
newLine, isPrefix, err := readLine(r) | |
// NOTE(random-liu): readLine can return actual content even if there is an error. | |
if len(newLine) > 0 { | |
// Buffer returned by ReadLine will change after | |
// next read, copy it. | |
l := make([]byte, len(newLine)) | |
copy(l, newLine) | |
buf = append(buf, l) | |
length += len(l) | |
} | |
if err != nil { | |
if err == io.EOF { | |
logrus.Debugf("Getting EOF from stream %q while redirecting to log file %q", s, path) | |
} else { | |
logrus.WithError(err).Errorf("An error occurred when redirecting stream %q to log file %q", s, path) | |
} | |
if length == 0 { | |
// No content left to write, break. | |
break | |
} | |
// Stop after writing the content left in buffer. | |
stop = true | |
} | |
if maxLen > 0 && length > maxLen { | |
exceedLen := length - maxLen | |
last := buf[len(buf)-1] | |
if exceedLen > len(last) { | |
// exceedLen must <= len(last), or else the buffer | |
// should have be written in the previous iteration. | |
panic("exceed length should <= last buffer size") | |
} | |
buf[len(buf)-1] = last[:len(last)-exceedLen] | |
writeLine(partial, bytes.Join(buf, nil)) | |
buf = [][]byte{last[len(last)-exceedLen:]} | |
length = exceedLen | |
} | |
if isPrefix { | |
continue | |
} | |
if stop { | |
// readLine only returns error when the message doesn't | |
// end with a newline, in that case it should be treated | |
// as a partial line. | |
writeLine(partial, bytes.Join(buf, nil)) | |
} else { | |
writeLine(full, bytes.Join(buf, nil)) | |
} | |
buf = nil | |
length = 0 | |
if stop { | |
break | |
} | |
} | |
logrus.Debugf("Finish redirecting stream %q to log file %q", s, path) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment