Last active
February 5, 2021 11:20
-
-
Save navono/5d469b774910f7ea4bb6794bbbdf1945 to your computer and use it in GitHub Desktop.
watermill sqlite3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import "fmt" | |
type DefaultSQLite3OffsetsAdapter struct { | |
// GenerateMessagesOffsetsTableName may be used to override how the messages/offsets table name is generated. | |
GenerateMessagesOffsetsTableName func(topic string) string | |
} | |
func (a DefaultSQLite3OffsetsAdapter) SchemaInitializingQueries(topic string) []string { | |
return []string{` | |
CREATE TABLE IF NOT EXISTS ` + a.MessagesOffsetsTable(topic) + ` ( | |
consumer_group VARCHAR(255) NOT NULL, | |
offset_acked INTEGER, | |
offset_consumed INTEGER NOT NULL, | |
PRIMARY KEY(consumer_group) | |
)`} | |
} | |
func (a DefaultSQLite3OffsetsAdapter) AckMessageQuery(topic string, offset int, consumerGroup string) (string, []interface{}) { | |
ackQuery := `UPDATE ` + a.MessagesOffsetsTable(topic) + ` SET offset_acked = ? WHERE consumer_group = ?` | |
return ackQuery, []interface{}{offset, consumerGroup} | |
} | |
func (a DefaultSQLite3OffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) (string, []interface{}) { | |
return `SELECT COALESCE( | |
(SELECT offset_acked | |
FROM ` + a.MessagesOffsetsTable(topic) + ` | |
WHERE consumer_group=? | |
), 0)`, | |
[]interface{}{consumerGroup} | |
} | |
func (a DefaultSQLite3OffsetsAdapter) MessagesOffsetsTable(topic string) string { | |
if a.GenerateMessagesOffsetsTableName != nil { | |
return a.GenerateMessagesOffsetsTableName(topic) | |
} | |
return fmt.Sprintf("`watermill_offsets_%s`", topic) | |
} | |
func (a DefaultSQLite3OffsetsAdapter) ConsumedMessageQuery( | |
topic string, | |
offset int, | |
consumerGroup string, | |
consumerULID []byte, | |
) (string, []interface{}) { | |
// offset_consumed is not queried anywhere, it's used only to detect race conditions with NextOffsetQuery. | |
ackQuery := `INSERT OR REPLACE INTO ` + a.MessagesOffsetsTable(topic) + ` (offset_consumed, consumer_group) | |
VALUES (?, ?);` | |
return ackQuery, []interface{}{offset, consumerGroup} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment