Skip to content

Instantly share code, notes, and snippets.

@dangdennis
Last active May 4, 2021 03:18
Show Gist options
  • Save dangdennis/c045206b44b75bfba001b84c9ad5b060 to your computer and use it in GitHub Desktop.
Save dangdennis/c045206b44b75bfba001b84c9ad5b060 to your computer and use it in GitHub Desktop.
Query Flow events concurrently
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/gin-gonic/gin"
"github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go-sdk/client"
"google.golang.org/grpc"
"gorm.io/gorm"
"github.com/NiftyNiche/o30/events-engine/database"
"github.com/NiftyNiche/o30/events-engine/schema"
)
func main() {
db := database.New(os.Getenv("SUPABASE_DB_HOST"), os.Getenv("SUPABASE_DB_PASSWORD"))
defer func() {
sqlDB, err := db.DB()
panicOnError(err)
sqlDB.Close()
}()
flowClient, err := client.New("127.0.0.1:3569", grpc.WithInsecure())
panicOnError(err)
go func() {
InitEventEngine(db, flowClient)
}()
r := gin.Default()
r.GET("/ping", func(c *gin.Context) {
c.JSON(200, gin.H{
"message": "pong",
})
})
r.Run(":8000")
}
// InitEventEngine starts up the listeners periodically to poll for new events
func InitEventEngine(db *gorm.DB, fc *client.Client) {
fmt.Println("Initiate events job")
maxConcurrency := 1
increment := uint64(200)
eventJobs := make(chan EventJob, maxConcurrency)
ctx := context.Background()
go func() {
EnqueueJob(db, fc, ctx, eventJobs, increment)
}()
for j := range eventJobs {
go func(job EventJob) {
DequeueJob(db, fc, ctx, job)
}(j)
}
}
// EventJob is a job for events processing
type EventJob struct {
FromBlock uint64
ToBlock uint64
BlockCursor schema.BlockCursor
}
func EnqueueJob(db *gorm.DB, fc *client.Client, ctx context.Context, eventJobs chan<- EventJob, increment uint64) {
for {
var cursors []schema.BlockCursor
err := db.Where("active = true and is_queued = false").Find(&cursors).Error
if err != nil {
fmt.Printf("Failed to get cursors %+v\n", err)
return
}
if len(cursors) == 0 {
fmt.Println("No active cursors to process")
return
}
latestBlock, err := fc.GetLatestBlock(ctx, true)
if err != nil {
fmt.Printf("Failed to latest block %+v\n", err)
return
}
fmt.Printf("Latest block height is %d\n", latestBlock.Height)
// collect ids to toggle queue status
var queuedCursorIDs []uint
for _, c := range cursors {
if latestBlock.Height <= c.CurrentBlockHeight {
fmt.Printf("Cannot query from latest block height. block_cursor id #%d\n", c.BlockCursorID)
continue
}
fromBlock := c.CurrentBlockHeight + 1
toBlock := c.CurrentBlockHeight + increment
if toBlock >= latestBlock.Height {
toBlock = latestBlock.Height
}
eventJobs <- EventJob{
FromBlock: fromBlock, // start at the next block because the query is inclusive-end,,
ToBlock: toBlock,
BlockCursor: c,
}
queuedCursorIDs = append(queuedCursorIDs, c.BlockCursorID)
}
err = db.Table("block_cursors").Where("block_cursor_id IN ?", queuedCursorIDs).Updates(map[string]interface{}{"is_queued": true}).Error
if err != nil {
fmt.Printf("Failed to to set block_cursors.is_queued to true %+v", err)
return
}
fmt.Printf("%d jobs are enqueued\n", len(cursors))
time.Sleep(2000 * time.Millisecond)
}
}
func DequeueJob(db *gorm.DB, fc *client.Client, ctx context.Context, job EventJob) {
dequeueSelf := func() error {
err := db.Table("block_cursors").Where("block_cursor_id = ?", job.BlockCursor.BlockCursorID).Updates(map[string]interface{}{"is_queued": false}).Error
if err != nil {
fmt.Printf("Failed to set block_cursor.is_queued to false %+v", err)
return err
}
return nil
}
fmt.Printf("%d-%d\n", job.FromBlock, job.ToBlock)
results, err := fc.GetEventsForHeightRange(ctx, client.EventRangeQuery{
Type: job.BlockCursor.EventName,
StartHeight: job.FromBlock, // start at the next block because the query is inclusive-end
EndHeight: job.ToBlock,
})
if err != nil {
fmt.Printf("Failed to query events %+v\n", err)
dequeueSelf()
return
}
for _, block := range results {
for _, event := range block.Events {
err = SaveFlowEvent(db, job.BlockCursor, block, event)
if err != nil {
fmt.Printf("Failed to save event %s %+v\n", event.ID(), err)
fmt.Printf("Failed event value %s", event.Value.String())
}
}
}
job.BlockCursor.CurrentBlockHeight = job.ToBlock
err = db.Save(&job.BlockCursor).Error
if err != nil {
fmt.Printf("Failed to update block_cursor #%d %+v\n", job.BlockCursor.BlockCursorID, err)
}
err = dequeueSelf()
if err != nil {
fmt.Println("Job failed")
}
fmt.Println("Job complete")
}
func SaveFlowEvent(db *gorm.DB, cursor schema.BlockCursor, block client.BlockEvents, event flow.Event) (err error) {
return db.Create(&schema.Event{
BlockHeight: block.Height,
EventName: cursor.EventName,
TransactionID: event.TransactionID.String(),
TransactionIndex: event.TransactionIndex,
EventIndex: event.EventIndex,
BlockCursorID: cursor.BlockCursorID,
Payload: event.Value.String(),
}).Error
}
func panicOnError(err error) {
if err != nil {
fmt.Println("err:", err.Error())
panic(err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment