Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save achille-roussel/6d512e69fc1471ef9584e520eae8617c to your computer and use it in GitHub Desktop.

Select an option

Save achille-roussel/6d512e69fc1471ef9584e520eae8617c to your computer and use it in GitHub Desktop.
kafka-go record batch ideas
type Record struct {
Key []byte
Value []byte
}
func (b *RecordBatch) Close() error { ... }
func (b *RecordBatch) ReadRecords(records []Record) (int, error) { ... }
func NewRecordBatch(records ...Record) *RecordBatch {
}
// in ReadRecords
for i := range records {
r, err := b.readNextRecord()
...
records[i].Key = append(records[i].Key[:0], r.key...)
records[i].Value = append(records[i].Value[:0], r.value...)
}
func (r *Reader) ReadBatch() (*RecordBatch, error) { ... }
for {
batch, err := r.ReadBatch()
if err != nil {
...
}
handleRecordBatch(batch)
r.CommitBatch(batch) ?
r.CommitOffset(batch.Offset() + batch.NumRecords()) ?
}
func handleRecordBatch(batch *kafka.RecordBatch) {
defer batch.Close()
...
records := make([]kafka.Record, ...)
for {
n, err := batch.ReadRecords(records)
...
for i := range records[:n] {
...
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment