Skip to content

Instantly share code, notes, and snippets.

@navono
Created February 5, 2021 11:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save navono/dec41669fb9e2386ea161cc8273e9111 to your computer and use it in GitHub Desktop.
Save navono/dec41669fb9e2386ea161cc8273e9111 to your computer and use it in GitHub Desktop.
watermill sqlite3
package main
import (
stdSQL "database/sql"
"encoding/json"
"fmt"
"strings"
"github.com/pkg/errors"
"github.com/ThreeDotsLabs/watermill-sql/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
// DefaultSQLite3Schema builds the SQL statements (table creation, insert,
// select) and the row decoding used to store watermill messages in SQLite3.
// NOTE(review): presumably meant to be plugged in as a watermill-sql schema
// adapter — confirm against the publisher/subscriber configuration.
type DefaultSQLite3Schema struct {
// GenerateMessagesTableName may be used to override how the messages table name is generated.
// When it is nil, MessagesTable falls back to the backtick-quoted name
// "watermill_<topic>".
GenerateMessagesTableName func(topic string) string
}
// SchemaInitializingQueries returns the statements that must be executed
// before messages for topic can be stored: a single CREATE TABLE IF NOT EXISTS
// for the table named by MessagesTable(topic).
//
// The table has an auto-incrementing integer primary key (offset), the message
// UUID, a creation timestamp, and the payload and metadata columns.
//
// created_at defaults to CURRENT_TIMESTAMP because InsertQuery only supplies
// uuid, payload and metadata; without a default the column would always be
// NULL. Tables that already exist are left untouched by IF NOT EXISTS.
func (s DefaultSQLite3Schema) SchemaInitializingQueries(topic string) []string {
	createMessagesTable := `
CREATE TABLE IF NOT EXISTS ` + s.MessagesTable(topic) + ` (
offset INTEGER PRIMARY KEY AUTOINCREMENT,
uuid VARCHAR(36) NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
payload TEXT DEFAULT NULL,
metadata TEXT DEFAULT NULL
)`

	return []string{createMessagesTable}
}
// InsertQuery builds one multi-row INSERT statement that stores every message
// in msgs in the messages table of topic.
//
// It returns the query text, the positional arguments (uuid, payload and
// JSON-encoded metadata for each message, in message order) and an error if
// msgs is empty or a message's metadata cannot be marshalled.
func (s DefaultSQLite3Schema) InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error) {
	// With no messages the VALUES list would be empty, which is not valid SQL;
	// fail here with a clear message instead of at the database.
	if len(msgs) == 0 {
		return "", nil, errors.New("could not build insert query: no messages to insert")
	}

	// One "(?,?,?)" tuple per message, comma separated.
	placeholders := strings.TrimSuffix(strings.Repeat(`(?,?,?),`, len(msgs)), ",")
	insertQuery := fmt.Sprintf(
		`INSERT INTO %s (uuid, payload, metadata) VALUES %s`,
		s.MessagesTable(topic),
		placeholders,
	)

	args, err := defaultInsertArgs(msgs)
	if err != nil {
		return "", nil, err
	}

	return insertQuery, args, nil
}
// SelectQuery builds the statement that fetches the next message of topic for
// consumerGroup.
//
// The offsets adapter supplies a sub-query (and its arguments) that is
// embedded in the WHERE clause; the statement selects the row with the lowest
// offset that is strictly greater than the sub-query's result. The returned
// arguments are exactly those of the sub-query.
func (s DefaultSQLite3Schema) SelectQuery(topic string, consumerGroup string, offsetsAdapter sql.OffsetsAdapter) (string, []interface{}) {
	offsetSubquery, offsetArgs := offsetsAdapter.NextOffsetQuery(topic, consumerGroup)

	// The leading empty element reproduces the newline the statement starts with.
	lines := []string{
		"",
		"SELECT offset, uuid, payload, metadata FROM " + s.MessagesTable(topic),
		"WHERE",
		"offset > (" + offsetSubquery + ")",
		"ORDER BY",
		"offset ASC",
		"LIMIT 1",
	}

	return strings.Join(lines, "\n"), offsetArgs
}
// UnmarshalMessage decodes one row produced by SelectQuery into the row's
// offset and a message. It delegates to unmarshalDefaultMessage and passes its
// results through unchanged.
func (s DefaultSQLite3Schema) UnmarshalMessage(row *stdSQL.Row) (offset int, msg *message.Message, err error) {
	offset, msg, err = unmarshalDefaultMessage(row)
	return offset, msg, err
}
// MessagesTable returns the name of the table that stores the messages of
// topic, in the form it is spliced into the SQL statements.
//
// If GenerateMessagesTableName is set, its result is returned verbatim and the
// callback is responsible for any quoting. Otherwise the name is
// "watermill_<topic>" wrapped in backticks; backticks inside topic are doubled
// so the topic cannot close the quoted identifier early and change the
// surrounding statement.
func (s DefaultSQLite3Schema) MessagesTable(topic string) string {
	if s.GenerateMessagesTableName != nil {
		return s.GenerateMessagesTableName(topic)
	}

	escapedTopic := strings.ReplaceAll(topic, "`", "``")
	return fmt.Sprintf("`watermill_%s`", escapedTopic)
}
// defaultSchemaRow is the scan target for one row of the messages table, with
// fields in the order unmarshalDefaultMessage scans them.
type defaultSchemaRow struct {
	// Offset is the row's offset column value.
	Offset int64

	// UUID, Payload and Metadata hold the raw bytes of the corresponding
	// columns; Metadata is decoded as JSON by unmarshalDefaultMessage.
	UUID, Payload, Metadata []byte
}
// defaultInsertArgs flattens msgs into the positional arguments of a multi-row
// insert: for every message, in order, its UUID, its payload and its metadata
// encoded as JSON.
//
// It returns nil arguments for an empty msgs, and an error naming the message
// UUID if a message's metadata cannot be marshalled.
func defaultInsertArgs(msgs message.Messages) ([]interface{}, error) {
	if len(msgs) == 0 {
		return nil, nil
	}

	// Exactly three arguments per message, so size the slice once up front.
	args := make([]interface{}, 0, 3*len(msgs))
	for _, msg := range msgs {
		metadata, err := json.Marshal(msg.Metadata)
		if err != nil {
			return nil, errors.Wrapf(err, "could not marshal metadata into JSON for message %s", msg.UUID)
		}

		args = append(args, msg.UUID, msg.Payload, metadata)
	}

	return args, nil
}
// unmarshalDefaultMessage scans row (offset, uuid, payload, metadata — in that
// order) and builds a message from it.
//
// It returns the row's offset converted to int and the decoded message. On a
// scan failure or when the metadata column is not valid JSON it returns a zero
// offset, a nil message and a wrapped error.
func unmarshalDefaultMessage(row *stdSQL.Row) (offset int, msg *message.Message, err error) {
	var r defaultSchemaRow
	if scanErr := row.Scan(&r.Offset, &r.UUID, &r.Payload, &r.Metadata); scanErr != nil {
		return 0, nil, errors.Wrap(scanErr, "could not scan message row")
	}

	msg = message.NewMessage(string(r.UUID), r.Payload)

	// A NULL column scans as a nil slice and an empty value as an empty one;
	// neither carries metadata, and an empty value is not valid JSON, so both
	// are skipped rather than handed to the decoder.
	if len(r.Metadata) > 0 {
		if jsonErr := json.Unmarshal(r.Metadata, &msg.Metadata); jsonErr != nil {
			return 0, nil, errors.Wrap(jsonErr, "could not unmarshal metadata as JSON")
		}

		// A stored JSON null (what marshalling a nil metadata map produces)
		// resets the map to nil; restore an empty map so the message's
		// metadata stays writable for consumers.
		if msg.Metadata == nil {
			msg.Metadata = message.Metadata{}
		}
	}

	return int(r.Offset), msg, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment