Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Sample for Post
package repository
import (
"context"
"errors"
"fmt"
"time"
"github.com/go-pg/pg/v10"
"github.com/go-pg/pg/v10/pgjson"
"github.com/google/uuid"
"github.com/wenweih/utxo-cqrs/app/service/block_parser/pkg/model"
rpcproto "github.com/wenweih/bitcoin-rpc-golang/proto"
)
// BlockExistErr is a custom error reported when an event for a block is
// already stored and the caller did not ask to rely on the database constraint.
type BlockExistErr struct {
	// Msg is the complete, human-readable error text.
	Msg string
}

// Error implements the error interface by returning Msg unchanged.
func (e BlockExistErr) Error() string {
	msg := e.Msg
	return msg
}
// BlockGenerated stores a "BlockGenerated" event for the given block and then
// sets the "next_hash" attribute on the stored event of the previous block.
//
// Before inserting, it counts the stored events whose event_json contains the
// block's hash and height. When constraintEvent is false and such an event
// exists, a BlockExistErr is returned and nothing is written. When
// constraintEvent is true the pre-check result is ignored and the insert is
// attempted anyway, so a duplicate surfaces as the database's insert error.
//
// The insert and the update of the previous block run in one transaction that
// is rolled back on every error path. event.Tx is cleared only while the block
// payload is marshalled (the transactions are stored separately, hex only, in
// event_meta) and is restored before the function returns.
//
// All values are passed to PostgreSQL as go-pg placeholders rather than being
// formatted into the SQL text.
func (repo *repo) BlockGenerated(ctx context.Context, event *rpcproto.GetBlockVerboseTxResult, constraintEvent bool) error {
	if event == nil {
		return errors.New("fail to query block before insert new block: event is nil")
	}

	conn := repo.db.Conn()
	defer conn.Close()

	// query with block hash and height
	// block-parser=# explain (analyze,verbose,timing,costs,buffers)select count(*) from events where event_json @> '{"hash":"0000000033a27c42a6d51c428de207c78007492be46d5488e636121e84161a18", "height":"5247"}';
	// QUERY PLAN
	// ----------------------------------------------------------------------------------------------------------------------------------------------------------
	// Aggregate (cost=191.65..191.66 rows=1 width=8) (actual time=0.238..0.239 rows=1 loops=1)
	// Output: count(*)
	// Buffers: shared hit=34
	// -> Bitmap Heap Scan on public.events (cost=120.15..191.60 rows=19 width=0) (actual time=0.237..0.237 rows=0 loops=1)
	// Recheck Cond: (events.event_json @> '{"hash": "0000000033a27c42a6d51c428de207c78007492be46d5488e636121e84161a18", "height": "5247"}'::jsonb)
	// Buffers: shared hit=34
	// -> Bitmap Index Scan on idx_events (cost=0.00..120.14 rows=19 width=0) (actual time=0.236..0.236 rows=0 loops=1)
	// Index Cond: (events.event_json @> '{"hash": "0000000033a27c42a6d51c428de207c78007492be46d5488e636121e84161a18", "height": "5247"}'::jsonb)
	// Buffers: shared hit=34
	// Planning Time: 0.084 ms
	// Execution Time: 0.257 ms
	// (11 rows)
	//
	// The containment document is marshalled instead of hand-formatted so the
	// hash is always correctly escaped JSON; it is then bound as a parameter.
	blockFilter, err := pgjson.Marshal(map[string]interface{}{
		"hash":   event.Hash,
		"height": event.Height,
	})
	if err != nil {
		return fmt.Errorf("fail to query block before insert new block: %w", err)
	}
	var num int
	// Alternative without the containment operator:
	// select count(*) from events where event_json->>'hash'=? and (event_json->>'height')::bigint=?
	const existsSQL = `select count(*) from events where event_json @> ?::jsonb`
	if _, err := conn.QueryContext(ctx, pg.Scan(&num), existsSQL, string(blockFilter)); err != nil {
		return fmt.Errorf("fail to query block before insert new block: %w", err)
	}
	if !constraintEvent && num > 0 {
		return BlockExistErr{Msg: "block exist: " + event.Hash}
	}

	// event_meta keeps only the raw hex of every transaction.
	txsWithHex := make([]rpcproto.Transaction, len(event.Tx))
	for i, tx := range event.Tx {
		txsWithHex[i] = rpcproto.Transaction{Hex: tx.Hex}
	}
	eventMeta, err := pgjson.Marshal(&txsWithHex)
	if err != nil {
		return fmt.Errorf("fail to marshal block tx: %w", err)
	}

	// Marshal the block without its transactions, then hand the caller's
	// message back exactly as it was received.
	txs := event.Tx
	event.Tx = nil
	eventJSON, err := pgjson.Marshal(event)
	event.Tx = txs
	if err != nil {
		return fmt.Errorf("fail to marshal block payload: %w", err)
	}

	nextHashPatch, err := pgjson.Marshal(map[string]interface{}{"next_hash": event.Hash})
	if err != nil {
		return fmt.Errorf("fail to update previous nex_block attribute on event_json column: %w", err)
	}
	prevFilter, err := pgjson.Marshal(map[string]interface{}{"hash": event.Previousblockhash})
	if err != nil {
		return fmt.Errorf("fail to update previous nex_block attribute on event_json column: %w", err)
	}

	// Another optimization I learned about recently: avoid having any DEFAULTs on columns which are non-constant (e.g. SERIAL or DEFAULT uuid_generate_v4()) -- pre-generating IDs is much faster.
	// https://gist.github.com/valyala/ae3cbfa4104f1a022a2af9b8656b1131#gistcomment-3049363
	m := model.Events{
		ID:             uuid.New(),
		AggregateID:    uuid.New(),
		SequenceNumber: 1,
		CreatedAt:      time.Now().UTC(),
		EventType:      "BlockGenerated",
		EventJSON:      eventJSON,
		EventMeta:      eventMeta,
	}

	tx, err := conn.BeginContext(ctx)
	if err != nil {
		return fmt.Errorf("fail to insert block: %w", err)
	}
	// Close rolls the transaction back unless it has already been committed,
	// so every early return below leaves no transaction open. Its error is
	// deliberately ignored: the primary error is the one worth reporting.
	defer tx.Close()

	if _, err := tx.Model(&m).Insert(); err != nil {
		return fmt.Errorf("fail to insert block: %w", err)
	}

	// UPDATE events SET event_json=event_json||'{"next_hash":"0000000000000a3290f20e75860d505ce0e948a1d1d846bec7e39015d242884b"}' WHERE event_json->>'hash' = '00000000000008df4269884f1d3bfc2aed3ea747292abb89be3dc3faa8c5d26f';
	// https://stackoverflow.com/a/53280508
	// https://stackoverflow.com/a/38045827
	// block-parser=# explain (analyze,verbose,timing,costs,buffers) UPDATE events SET event_json=event_json||'{"next_hash":"00000000931d4e729a9c90626f5a0306864c14a9456ec1379205a30b9260704a"}' WHERE event_json @> '{"hash":"00000000931d4e729a9c90626f5a0306864c14a9456ec1379205a30b9260704a"}';
	// QUERY PLAN
	// --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
	// Update on public.events (cost=188.24..304.97 rows=31 width=209) (actual time=0.357..0.357 rows=0 loops=1)
	// Buffers: shared hit=53
	// -> Bitmap Heap Scan on public.events (cost=188.24..304.97 rows=31 width=209) (actual time=0.339..0.340 rows=1 loops=1)
	// Output: id, aggregate_id, sequence_number, created_at, domain_type, domain_id, event_type, (event_json || '{"next_hash": "00000000931d4e729a9c90626f5a0306864c14a9456ec1379205a30b9260704a"}'::jsonb), event_meta, command_id, stream_id, ctid
	// Recheck Cond: (events.event_json @> '{"hash": "00000000931d4e729a9c90626f5a0306864c14a9456ec1379205a30b9260704a"}'::jsonb)
	// Rows Removed by Index Recheck: 2
	// Heap Blocks: exact=1
	// Buffers: shared hit=52
	// -> Bitmap Index Scan on idx_events (cost=0.00..188.23 rows=31 width=0) (actual time=0.326..0.326 rows=4 loops=1)
	// Index Cond: (events.event_json @> '{"hash": "00000000931d4e729a9c90626f5a0306864c14a9456ec1379205a30b9260704a"}'::jsonb)
	// Buffers: shared hit=51
	// Planning Time: 0.069 ms
	// Execution Time: 0.378 ms
	// (13 rows)
	const updateSQL = `UPDATE events SET event_json=event_json||?::jsonb WHERE event_json @> ?::jsonb`
	if _, err := tx.ExecContext(ctx, updateSQL, string(nextHashPatch), string(prevFilter)); err != nil {
		return fmt.Errorf("fail to update previous nex_block attribute on event_json column: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return err
	}
	return nil
}
package repository
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/suite"
rpcproto "github.com/wenweih/bitcoin-rpc-golang/proto"
)
// BlockProjectorTestSuite is the testify suite holding the BlockGenerated
// repository tests. It embeds RepoSuite.
// NOTE(review): the s.repository handle used by the test methods is presumably
// provided by RepoSuite — confirm against its definition.
type BlockProjectorTestSuite struct {
RepoSuite
}
// TestBlockGenerated runs BlockGenerated against a sequence of cases that
// share one block hash. The cases are order-dependent: the first inserts the
// block, the second re-inserts it with constraintEvent=true and expects the
// database's unique-constraint error, and the third re-inserts it with
// constraintEvent=false and expects BlockExistErr from the pre-insert check.
func (s *BlockProjectorTestSuite) TestBlockGenerated() {
	cases := []struct {
		BlockCmd        *rpcproto.GetBlockVerboseTxResult
		constraintEvent bool
		err             error
	}{
		{
			BlockCmd: &rpcproto.GetBlockVerboseTxResult{
				Hash:   "0000000000000000000aafc247e4f795e1de32f20356bac70ce39e7eb9d53ae1",
				Height: 655613,
				Tx: []*rpcproto.Transaction{
					{
						Txid: "1cc8fa58bb861824b8e80eea60e6526838e1b92ef7514fef3e83d5967b13e075",
						Hex:  "010000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff6403fd000a2cfabe6d6d88a36a2ae4eb8c8bfd831393862debe424f48e68979dc0b51918e349bccf06b710000000f09f909f082f4632506f6f6c2f104d696e6564206279206d746168657269000000000000000000000000000000000000000500621500000000000004f7063928000000001976a914c825a1ecf2a6830c4401620c3a16f1995057c2ab88ac00000000000000002f6a24aa21a9edf58b86b0e263edcee21ad2a4201de872b47c429680f7658430aef70f6195aba908000000000000000000000000000000002c6a4c2952534b424c4f434b3a1a3ed706fe5ff23a589606f352cf0c6cf3e46b7230ba37076906bb21002b547f0000000000000000266a24b9e11b6d7878f988604e099193510765be8678222bb37e34e6107c0facc59e1c2d97794c012000000000000000000000000000000000000000000000000000000000000000000c5bcb3d",
					},
					{
						Txid: "1fa019b32056054d99274348b6a7f1fddaeb0c68a8ae4af12e0ed3cf7a800878",
						Hex:  "01000000025867051e9262a2191c6627cca7c65a44a1b9080156af96b1f6eaa383dc99a242010000006a47304402205b418030b700877e71aceb1f2fa8c2fd3a6dcfebd5ba295a8e4010dae20d28b102207b704cc6b9f35129e61a22011ea10d4a02ac85af3f33f6d857df49894d15fa66012102308ee57e052de55432e36323203e1e095b0bc173dd98b22c861a82c150c96c80ffffffff2e3e700b6fcd8b3fbe9f7ccec49a93134dee2c3e91d802fcfb8ed1c2140e9f52000000006a473044022024be94286c5c81e529720751d1722ccb1ed7bc2e7b85277558b4b736c4295d0b02201291b52ab6c537f529d3c5ec93f5c482ea5fd7d67ea8ee056ab67fc029377a40012102308ee57e052de55432e36323203e1e095b0bc173dd98b22c861a82c150c96c80ffffffff02001a7118020000001976a91469fe5f802fe34c3e16806f21c8b4229a5b9f518e88ac437c0200000000001976a914bb1c7be480165a2246fa0666eb4e13d8a409f75f88ac00000000",
					},
				},
			},
			constraintEvent: false,
			err:             nil,
		},
		{
			BlockCmd: &rpcproto.GetBlockVerboseTxResult{
				Hash:   "0000000000000000000aafc247e4f795e1de32f20356bac70ce39e7eb9d53ae1",
				Height: 655613,
			},
			constraintEvent: true,
			err:             fmt.Errorf("%s", `fail to insert block: ERROR #23505 duplicate key value violates unique constraint "events_md5_idx"`),
		},
		{
			BlockCmd: &rpcproto.GetBlockVerboseTxResult{
				Hash:   "0000000000000000000aafc247e4f795e1de32f20356bac70ce39e7eb9d53ae1",
				Height: 655613,
			},
			constraintEvent: false,
			err:             BlockExistErr{Msg: "block exist: 0000000000000000000aafc247e4f795e1de32f20356bac70ce39e7eb9d53ae1"},
		},
	}
	for _, tc := range cases {
		err := s.repository.BlockGenerated(context.Background(), tc.BlockCmd, tc.constraintEvent)
		if tc.err == nil {
			s.Assert().NoError(err)
			continue
		}
		// EqualError takes the actual error first and the expected text second.
		// It reports a failed assertion (rather than panicking) when the call
		// unexpectedly returned nil.
		s.Assert().EqualError(err, tc.err.Error())
	}
}
// TestBlockProjectTestSuite is the `go test` entry point for
// BlockProjectorTestSuite. It configures the embedded RepoSuite with the
// "events.yml" fixture file and the "events" table, then hands the suite to
// testify's runner.
func TestBlockProjectTestSuite(t *testing.T) {
	var blockSuite BlockProjectorTestSuite
	blockSuite.RepoSuite.fixtureFiles = []string{"events.yml"}
	blockSuite.RepoSuite.tables = []string{"events"}
	suite.Run(t, &blockSuite)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment