Skip to content

Instantly share code, notes, and snippets.

@Ja7ad
Last active October 20, 2023 06:59
Show Gist options
  • Save Ja7ad/09be29eb26b98f1a0077443f63d7e7ad to your computer and use it in GitHub Desktop.
Save Ja7ad/09be29eb26b98f1a0077443f63d7e7ad to your computer and use it in GitHub Desktop.
package core
import (
"context"
"encoding/json"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/google/uuid"
"strings"
"time"
)
// Retry tuning constants.
//
// NOTE(review): neither constant is referenced by the code visible in this
// file (the recovery loop in Run sleeps for a hard-coded 2 seconds) — confirm
// whether they are used elsewhere in the package or are leftovers.
const (
// maxRetries is presumably the upper bound on retry attempts — TODO confirm.
maxRetries = 5
// interval is presumably the delay between retry attempts — TODO confirm.
interval = 1 * time.Second
)
// Subscriber is the lifecycle interface returned by NewSubscription.
type Subscriber interface {
// Run starts streaming contract logs. In the Subscription implementation it
// blocks the calling goroutine while it supervises the streams.
Run()
// Shutdown asks the running streams to stop and releases subscriptions and
// clients. The struct{} argument carries no data; it is only a signal value.
Shutdown(shutdownSignal struct{})
}
// Subscription is the Subscriber implementation that listens to contract logs
// on one or more chains and publishes them through an AMQP publisher.
//
// NOTE(review): the two maps below are read and written from several
// goroutines (Run, recovery, Shutdown) without a lock in the visible code.
type Subscription struct {
// logger receives the informational, warning and error messages of the
// subscriber and of its streams.
logger logger.Logger
// publisher is handed to every stream and used to publish received events.
publisher amqp.Publisher
// project is the configuration the subscriber was built from; its Chains
// and Service fields are read by this file.
project *config.Project
// subscriptions maps a contract address to its active subscription and the
// channel that subscription delivers logs on.
subscriptions map[string]map[ethereum.Subscription]chan types.Log
// networkClients maps a contract address to the client dialed for it.
networkClients map[string]*ethclient.Client
// shutdownSignal is the channel every stream selects on to learn that it
// must stop; Shutdown is the only place that signals it.
shutdownSignal chan struct{}
}
// NewSubscription builds a Subscription covering every chain listed in
// project.
//
// It first obtains a publisher from rabbitmq. Then, for each entry of
// project.Chains, it dials chain.Address and subscribes to the logs emitted by
// chain.ContractAddress, passing ctx to SubscribeFilterLogs. Clients and
// subscriptions are indexed by contract address, so two chain entries sharing
// a contract address overwrite each other in the maps.
//
// If dialing or subscribing fails for any chain, every client and subscription
// opened so far is released before the error is returned, so a failed
// construction does not leak connections. The returned Subscriber is nil
// whenever the error is non-nil.
func NewSubscription(ctx context.Context, rabbitmq amqp.Initiator, logger logger.Logger, project *config.Project) (Subscriber, error) {
	pub, err := rabbitmq.Publisher(amqp.PublisherConfig{})
	if err != nil {
		return nil, err
	}

	sub := &Subscription{
		publisher:      pub,
		logger:         logger,
		project:        project,
		subscriptions:  make(map[string]map[ethereum.Subscription]chan types.Log),
		networkClients: make(map[string]*ethclient.Client),
		shutdownSignal: make(chan struct{}),
	}

	// Everything opened below is tracked in local slices (not only in the maps,
	// whose entries can be overwritten) so release can undo all of it.
	var (
		openedClients []*ethclient.Client
		openedSubs    []ethereum.Subscription
	)
	release := func() {
		for _, s := range openedSubs {
			s.Unsubscribe()
		}
		for _, c := range openedClients {
			c.Close()
		}
	}

	for _, chain := range sub.project.Chains {
		chainClient, err := sub.createClient(chain.Address)
		if err != nil {
			release()
			return nil, err
		}
		openedClients = append(openedClients, chainClient)
		sub.networkClients[chain.ContractAddress] = chainClient

		query := ethereum.FilterQuery{
			Addresses: []common.Address{
				common.HexToAddress(chain.ContractAddress),
			},
		}
		logCh := make(chan types.Log)
		subscription, err := chainClient.SubscribeFilterLogs(ctx, query, logCh)
		if err != nil {
			release()
			return nil, err
		}
		openedSubs = append(openedSubs, subscription)
		sub.subscriptions[chain.ContractAddress] = map[ethereum.Subscription]chan types.Log{
			subscription: logCh,
		}
	}

	return sub, nil
}
// Shutdown stops every running stream and releases the subscriptions and
// clients held by o.
//
// The shutdown channel is closed rather than sent on. A close is observed by
// every goroutine selecting on the channel (each stream, Run, and any recovery
// loop), whereas a single send on the unbuffered channel wakes only one of
// them and blocks forever when nobody is listening. Calling Shutdown again
// after it has returned is safe. The signal argument is kept for interface
// compatibility and carries no information.
//
// NOTE(review): the subscription and client maps are not guarded by a lock, so
// Shutdown can still race with a recovery that is already in flight.
func (o *Subscription) Shutdown(signal struct{}) {
	select {
	case <-o.shutdownSignal:
		// Already closed by an earlier call; closing twice would panic.
	default:
		close(o.shutdownSignal)
	}

	for _, subs := range o.subscriptions {
		for sub := range subs {
			sub.Unsubscribe()
		}
	}
	for _, client := range o.networkClients {
		client.Close()
	}
}

// Run starts one stream goroutine per registered subscription and then
// supervises them: whenever a stream reports its contract as unhealthy, a
// recovery goroutine re-creates the subscription and resumes streaming.
//
// Run blocks until Shutdown is called.
func (o *Subscription) Run() {
	// Never closed on purpose: stream goroutines may still try to send on it
	// while Run is returning, and a send on a closed channel panics.
	unhealthyContractCh := make(chan string)

	// One-slot channel used as a lock so that concurrent recoveries do not
	// mutate the subscription/client maps at the same time.
	recoveryLock := make(chan struct{}, 1)

	for contract, sub := range o.subscriptions {
		for subscription, logs := range sub {
			go stream(contract, o.project.Service, o.project.GetChainByContractAddress(contract), o.logger, o.publisher, subscription, logs, unhealthyContractCh, o.shutdownSignal)
			o.logger.Info(false, "stream has been started", "service", o.project.Service, "contract", contract)
		}
	}

	for {
		select {
		case <-o.shutdownSignal:
			return
		case contract := <-unhealthyContractCh:
			go o.resubscribe(contract, recoveryLock, unhealthyContractCh)
		}
	}
}

// resubscribe retries recovery for contract every two seconds until it
// succeeds or shutdown is signalled, then runs the recovered stream on the
// calling goroutine.
func (o *Subscription) resubscribe(contract string, recoveryLock chan struct{}, unhealthyContractCh chan<- string) {
	for {
		// Take the recovery lock, giving up if shutdown arrives first.
		select {
		case <-o.shutdownSignal:
			return
		case recoveryLock <- struct{}{}:
		}
		sub, logCh, err := o.recovery(contract)
		<-recoveryLock

		if err == nil {
			// Logged before streaming: stream only returns once the recovered
			// subscription has ended again.
			o.logger.Info(false, "recovery subscription has been completed", "contract", contract)
			stream(contract, o.project.Service, o.project.GetChainByContractAddress(contract), o.logger, o.publisher, sub, logCh, unhealthyContractCh, o.shutdownSignal)
			return
		}

		o.logger.Error(true, "failed to recovery",
			"service", o.project.Service,
			"contract", contract,
			"err", err)

		// Back off before the next attempt, but stop promptly on shutdown.
		select {
		case <-o.shutdownSignal:
			return
		case <-time.After(2 * time.Second):
		}
	}
}
// createClient opens a client connection to the node reachable at address.
// It returns whatever the dial returns: the client, or a non-nil error.
func (o *Subscription) createClient(address string) (*ethclient.Client, error) {
	// ethclient.Dial is DialContext with a background context.
	client, err := ethclient.DialContext(context.Background(), address)
	return client, err
}
// stream consumes logs from one subscription until it is told to stop.
//
// It loops over three event sources:
//   - shutdownSignal: stream returns.
//   - subscription.Err(): if the channel was closed or delivered a nil error
//     (the subscription was unsubscribed or its client was closed on purpose),
//     stream returns quietly. On a real error it logs a warning, reports
//     contractAddress on unhealthyContractCh so the subscription can be
//     recovered, and returns.
//   - logCh: logs whose address does not match chain.ContractAddress
//     (case-insensitively) are skipped. Every other log is marshalled into a
//     goTypes.Event and published on its own goroutine to config.ExchangeName
//     with routing key serviceName+config.QueuePostfix.
//
// Marshal and publish failures are logged and do not stop the stream.
func stream(contractAddress string, serviceName string, chain *config.Chain, logger logger.Logger, pub amqp.Publisher,
	subscription ethereum.Subscription, logCh chan types.Log, unhealthyContractCh chan<- string, shutdownSignal <-chan struct{}) {
	for {
		select {
		case <-shutdownSignal:
			return

		case err, ok := <-subscription.Err():
			if !ok || err == nil {
				// Deliberate teardown, not a failure: nothing to recover, and
				// calling err.Error() on a nil error would panic.
				return
			}
			logger.Warn(false, "subscription got error, trying recovery subscription...",
				"service", serviceName,
				"network", chain.Network,
				"err", err.Error(),
				"contract", contractAddress)
			// logCh is intentionally not closed here: this goroutine is its
			// receiver, and only a sender may safely close a channel.
			//
			// Do not block forever on the report if the supervisor is gone.
			select {
			case unhealthyContractCh <- contractAddress:
			case <-shutdownSignal:
			}
			return

		case entry := <-logCh:
			if !strings.EqualFold(entry.Address.Hex(), chain.ContractAddress) {
				continue
			}
			txHash := entry.TxHash.Hex()
			logger.Debug(false, "new event has been received",
				"service", serviceName,
				"block", entry.BlockNumber,
				"transaction", txHash)

			b, err := json.Marshal(&goTypes.Event{
				ContractAddress: contractAddress,
				TransactionHash: txHash,
				Data:            entry.Data,
				Topics:          entry.Topics,
			})
			if err != nil {
				logger.Error(true, "failed json marshal",
					"service", serviceName,
					"transaction", txHash,
					"err", err.Error())
				continue
			}

			// Publishing runs on its own goroutine so a slow or retrying
			// publish does not stall the consumption of further logs.
			go func() {
				err := pub.PublishWithRetry(
					context.Background(),
					config.ExchangeName,
					serviceName+config.QueuePostfix,
					false,
					false,
					amqp.Publishing{
						DeliveryMode: amqp.Persistent,
						MessageId:    uuid.New().String(),
						Body:         b,
					},
				)
				if err != nil {
					logger.Error(true, err.Error())
				}
			}()
		}
	}
}
// recovery re-creates the log subscription for contract after its previous
// subscription failed.
//
// The stale client registered for contract is closed and both map entries are
// removed. The node address is then looked up in o.project.Chains (the last
// entry whose ContractAddress equals contract wins), a fresh client is dialed,
// and a new log subscription is opened with a background context. On success
// the new client and subscription are registered under contract and the
// subscription is returned with its log channel.
//
// On failure the returned subscription and channel are nil. A client that was
// dialed but could not subscribe is closed, so repeated retries do not pile up
// open connections.
//
// recovery does no locking of its own; callers must not run it concurrently
// with other code touching o.subscriptions or o.networkClients.
func (o *Subscription) recovery(contract string) (ethereum.Subscription, chan types.Log, error) {
	o.logger.Warn(false, "trying to recovery subscription...", "contract", contract)

	// Close the stale connection before forgetting it; dropping the map entry
	// alone would leak it.
	if stale := o.networkClients[contract]; stale != nil {
		stale.Close()
	}
	delete(o.networkClients, contract)
	delete(o.subscriptions, contract)

	chainAddress := ""
	for _, chain := range o.project.Chains {
		if chain.ContractAddress == contract {
			chainAddress = chain.Address
		}
	}

	chainClient, err := o.createClient(chainAddress)
	if err != nil {
		return nil, nil, err
	}

	query := ethereum.FilterQuery{
		Addresses: []common.Address{
			common.HexToAddress(contract),
		},
	}
	logCh := make(chan types.Log)
	subscription, err := chainClient.SubscribeFilterLogs(context.Background(), query, logCh)
	if err != nil {
		// The client is useless without its subscription and is not stored
		// anywhere, so it must be closed here.
		chainClient.Close()
		return nil, nil, err
	}

	o.networkClients[contract] = chainClient
	o.subscriptions[contract] = map[ethereum.Subscription]chan types.Log{
		subscription: logCh,
	}
	return subscription, logCh, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment