Skip to content

Instantly share code, notes, and snippets.

@navono
Last active February 5, 2021 11:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save navono/5d469b774910f7ea4bb6794bbbdf1945 to your computer and use it in GitHub Desktop.
Save navono/5d469b774910f7ea4bb6794bbbdf1945 to your computer and use it in GitHub Desktop.
watermill sqlite3
package main
import "fmt"
type DefaultSQLite3OffsetsAdapter struct {
// GenerateMessagesOffsetsTableName may be used to override how the messages/offsets table name is generated.
GenerateMessagesOffsetsTableName func(topic string) string
}
func (a DefaultSQLite3OffsetsAdapter) SchemaInitializingQueries(topic string) []string {
return []string{`
CREATE TABLE IF NOT EXISTS ` + a.MessagesOffsetsTable(topic) + ` (
consumer_group VARCHAR(255) NOT NULL,
offset_acked INTEGER,
offset_consumed INTEGER NOT NULL,
PRIMARY KEY(consumer_group)
)`}
}
func (a DefaultSQLite3OffsetsAdapter) AckMessageQuery(topic string, offset int, consumerGroup string) (string, []interface{}) {
ackQuery := `UPDATE ` + a.MessagesOffsetsTable(topic) + ` SET offset_acked = ? WHERE consumer_group = ?`
return ackQuery, []interface{}{offset, consumerGroup}
}
func (a DefaultSQLite3OffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) (string, []interface{}) {
return `SELECT COALESCE(
(SELECT offset_acked
FROM ` + a.MessagesOffsetsTable(topic) + `
WHERE consumer_group=?
), 0)`,
[]interface{}{consumerGroup}
}
func (a DefaultSQLite3OffsetsAdapter) MessagesOffsetsTable(topic string) string {
if a.GenerateMessagesOffsetsTableName != nil {
return a.GenerateMessagesOffsetsTableName(topic)
}
return fmt.Sprintf("`watermill_offsets_%s`", topic)
}
func (a DefaultSQLite3OffsetsAdapter) ConsumedMessageQuery(
topic string,
offset int,
consumerGroup string,
consumerULID []byte,
) (string, []interface{}) {
// offset_consumed is not queried anywhere, it's used only to detect race conditions with NextOffsetQuery.
ackQuery := `INSERT OR REPLACE INTO ` + a.MessagesOffsetsTable(topic) + ` (offset_consumed, consumer_group)
VALUES (?, ?);`
return ackQuery, []interface{}{offset, consumerGroup}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment