Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Kinesisメモ

GoでKinesis用ライブラリを作るメモ

Writer

Kinesisにデータを送るクラス。

コネクションは呼び出し側で作成してもらう。 それを引数にしてWriterを生成する。

Addメソッドで行を追加していく。 例えば改行区切りのデータがあった場合、それを分割するのは呼び出し元の責務。 ライブラリは渡された[]byteを1行として扱うだけ。

Writer内部で行はバッファリングされ、時間経過もしくは一定量が溜まったらKinesisへと送信する。

Kinesis側への送信は非同期で行う。 ライブラリ内でGoroutineを起動するのは、作法としてはよくないような気もするが……。 バッファが空になったらGoroutineは終了するようにすれば、リソースを無駄に使わないからいいか。

NewWriter(conn *kinesis.Kinesis, name string) *Writer

新しいWriterを作成する。

Add(data []byte) error

Writerに行を追加する。

追加された行はWriter内部でバッファリングされ、 時間経過もしくは一定量が溜まったらKinesisへと送信される。

dataのサイズが大きすぎる場合はerrorが返る。

Wait()

Writer内の行がすべてKinesisへ送信されるまで待つ。

SetLogger()

内部の動作を追う場合に使う。 いらない?

GetStats() Stats

送信した行数やバイト数などの情報を返す。

  • 行数
  • バイト数
  • エラー行数
  • リトライ行数
  • 現在のバッファ内の行数
  • 直近のKinesisのレイテンシ
  • 直近の秒間の受信行数
  • 直近の秒間の送信行数

くらいがあればいい?

SetErrorHandler()

Kinesis側からのエラー応答を受け取る関数。

送信が非同期でerrorを返せないので、こういった方式で渡す。 chanだと呼び出し側でGoroutineを作って待ち受けないといけない。 どっちがいいか?

chanだと:

writer.SetErrorChan(errChan)

go func() {
    for errInfo := range errChan {
        // ここでエラー処理してもらう
        // といってもリトライはライブラリ側で行う
        // ログなどに書いてもらうだけ?
    }
}()

funcだと:

writer.SetErrorFunc(errFunc)

func errFunc(data []byte, code, message string) {
    // ログなどに書いてもらうなど
}

funcの場合、ハンドラで長い処理を書いてもらいたくはないが……。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.