Skip to content

Instantly share code, notes, and snippets.

@fionera
Last active April 9, 2021 19:10
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fionera/fdc469b4ed608502227d252c6c03dbc5 to your computer and use it in GitHub Desktop.
Save fionera/fdc469b4ed608502227d252c6c03dbc5 to your computer and use it in GitHub Desktop.
Pr0gramm Stocks Crawler
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"time"
"github.com/dgraph-io/badger/v3"
"github.com/go-redis/redis/v8"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// addr is the address the HTTP server in main listens on; it is set with the
// -listen-address flag and defaults to ":8080".
var addr = flag.String("listen-address", ":8080", "The address to listen on for HTTP requests.")
// Names of the exported Prometheus metrics. Collect also uses the same strings
// as the Redis hash keys under which the per-stock values are written.
const (
stockPriceCurrentName = "stock_price_current"
stockPriceLastName = "stock_price_last"
stockCirculatingName = "stock_circulating"
)
// Metric descriptors for the values emitted by market.Collect. Each metric
// carries a single variable label, "stock", and no constant labels.
var (
stockPriceCurrent = prometheus.NewDesc(stockPriceCurrentName, "The current price of a Stock", []string{"stock"}, nil)
stockPriceLast = prometheus.NewDesc(stockPriceLastName, "The last price of a Stock", []string{"stock"}, nil)
stockCirculating = prometheus.NewDesc(stockCirculatingName, "The amount of circulating stocks", []string{"stock"}, nil)
)
// market is the Prometheus collector registered in main. It fetches stock data
// from the pr0gramm API on every collection, mirrors the values into Redis and
// stores the raw API responses in Badger.
type market struct {
// rdb receives the latest per-stock values via HSet in Collect.
rdb *redis.Client
// db stores the raw response bodies written by the request* methods.
db *badger.DB
}
// Describe implements prometheus.Collector by sending the descriptor of every
// metric that Collect can emit.
//
// All three descriptors must be announced here: Collect emits a metric for
// stockCirculating as well, and a metric whose descriptor was not described at
// registration time is rejected when the registry gathers.
func (m *market) Describe(descs chan<- *prometheus.Desc) {
	descs <- stockPriceCurrent
	descs <- stockPriceLast
	descs <- stockCirculating
}
// Collect implements prometheus.Collector. On every call it requests the
// current price list and, for each stock in it:
//
//   - writes the current price, last price and circulating amount into the
//     Redis hashes named after the metrics, using the stock name as field,
//   - emits the same three values as gauge metrics labelled with the stock name,
//   - requests the stock's order book and trades, whose results are discarded
//     here (those requests store the raw responses as a side effect).
//
// If the price list cannot be fetched the error is logged and no metrics are
// emitted. Failures of the Redis writes or of the order book / trades requests
// are logged and do not stop the collection.
func (m *market) Collect(metrics chan<- prometheus.Metric) {
	prices, err := m.requestPrices()
	if err != nil {
		log.Println(err)
		return
	}

	ctx := context.Background()
	for name, stock := range prices.Prices {
		gauges := []struct {
			key   string
			desc  *prometheus.Desc
			value float64
		}{
			{stockPriceCurrentName, stockPriceCurrent, stock.Cur},
			{stockPriceLastName, stockPriceLast, stock.Last},
			{stockCirculatingName, stockCirculating, stock.Circulating},
		}

		for _, g := range gauges {
			// The Redis mirror is best effort: report a failed write, but
			// still export the metric.
			if err := m.rdb.HSet(ctx, g.key, name, g.value).Err(); err != nil {
				log.Printf("redis hset %s %s: %v", g.key, name, err)
			}
			metrics <- prometheus.MustNewConstMetric(g.desc, prometheus.GaugeValue, g.value, name)
		}

		if _, err := m.requestOrderBook(name); err != nil {
			log.Println(err)
		}
		if _, err := m.requestTrades(name); err != nil {
			log.Println(err)
		}
	}
}
// Stock holds the per-stock values of a price list entry. Collect exports Cur
// as the current price, Last as the last price and Circulating as the amount
// of circulating stocks.
type Stock struct {
Cur float64 `json:"cur"`
Last float64 `json:"last"`
Circulating float64 `json:"circulating"`
}
// Prices is the decoded response of the prices endpoint.
type Prices struct {
// Prices maps a stock name to its values.
Prices map[string]Stock `json:"prices"`
// Ts is used as the key suffix when the raw response is stored
// (presumably a timestamp; confirm against the API).
Ts int `json:"ts"`
}
// Order is a single entry of an order book's buy/sell lists or of a trades
// list. The meaning and units of the fields are defined by the API and are not
// visible here (Created is presumably a creation timestamp; confirm).
type Order struct {
Created float64 `json:"created"`
Outstanding float64 `json:"outstanding"`
Price float64 `json:"price"`
}
// Spread holds the "bid", "ask" and "spread" values reported alongside order
// books and trades.
type Spread struct {
Bid float64 `json:"bid"`
Ask float64 `json:"ask"`
Spread float64 `json:"spread"`
}
// Trades is the decoded response of the trades endpoint for one symbol.
type Trades struct {
Spread Spread `json:"spread"`
Trades []Order `json:"trades"`
// Ts is used as the key suffix when the raw response is stored
// (presumably a timestamp; confirm against the API).
Ts int `json:"ts"`
}
// OrderBook is the decoded response of the orderbook endpoint for one symbol.
type OrderBook struct {
Spread Spread `json:"spread"`
// Orders groups the entries of the "orders" object by its "buy" and "sell" lists.
Orders struct {
Buy []Order `json:"buy"`
Sell []Order `json:"sell"`
} `json:"orders"`
// Ts is used as the key suffix when the raw response is stored
// (presumably a timestamp; confirm against the API).
Ts int `json:"ts"`
}
// requestPrices fetches the price list from the pr0gramm stocks API, stores
// the raw response body in Badger under the key "prices:<ts>" and returns the
// decoded result.
//
// A transport error, a non-200 status, a failed body read or a JSON decoding
// error is returned to the caller. A failure to store the raw body is only
// logged, so the decoded prices are still returned in that case.
func (m *market) requestPrices() (*Prices, error) {
	resp, err := http.Get("https://pr0gramm.com/api/stocks/prices")
	if err != nil {
		return nil, fmt.Errorf("requesting prices: %w", err)
	}
	defer resp.Body.Close()

	// Reject error responses up front: a JSON error body could otherwise
	// decode into a zero-valued Prices and be stored under "prices:0".
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("requesting prices: unexpected status %s", resp.Status)
	}

	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading prices response: %w", err)
	}

	var p Prices
	if err := json.NewDecoder(bytes.NewReader(data)).Decode(&p); err != nil {
		return nil, fmt.Errorf("decoding prices response: %w", err)
	}

	// Archiving the raw body is best effort and must not fail the request.
	err = m.db.Update(func(txn *badger.Txn) error {
		key := fmt.Sprintf("prices:%d", p.Ts)
		return txn.Set([]byte(key), data)
	})
	if err != nil {
		log.Println(err)
	}

	return &p, nil
}
// requestOrderBook fetches the order book of symbol s from the pr0gramm stocks
// API, stores the raw response body in Badger under the key
// "order_book:<s>:<ts>" and returns the decoded result.
//
// s is appended to the query string verbatim, so it must already be safe to
// use as a URL query value.
//
// A transport error, a non-200 status, a failed body read or a JSON decoding
// error is returned to the caller. A failure to store the raw body is only
// logged, so the decoded order book is still returned in that case.
func (m *market) requestOrderBook(s string) (*OrderBook, error) {
	resp, err := http.Get("https://pr0gramm.com/api/stocks/orderbook?symbol=" + s)
	if err != nil {
		return nil, fmt.Errorf("requesting order book for %q: %w", s, err)
	}
	defer resp.Body.Close()

	// Reject error responses up front: a JSON error body could otherwise
	// decode into a zero-valued OrderBook and be stored with ts 0.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("requesting order book for %q: unexpected status %s", s, resp.Status)
	}

	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading order book response for %q: %w", s, err)
	}

	var o OrderBook
	if err := json.NewDecoder(bytes.NewReader(data)).Decode(&o); err != nil {
		return nil, fmt.Errorf("decoding order book response for %q: %w", s, err)
	}

	// Archiving the raw body is best effort and must not fail the request.
	err = m.db.Update(func(txn *badger.Txn) error {
		key := fmt.Sprintf("order_book:%s:%d", s, o.Ts)
		return txn.Set([]byte(key), data)
	})
	if err != nil {
		log.Println(err)
	}

	return &o, nil
}
// requestTrades fetches the trades of symbol s from the pr0gramm stocks API,
// stores the raw response body in Badger under the key "trades:<s>:<ts>" and
// returns the decoded result.
//
// s is appended to the query string verbatim, so it must already be safe to
// use as a URL query value.
//
// A transport error, a non-200 status, a failed body read or a JSON decoding
// error is returned to the caller. A failure to store the raw body is only
// logged, so the decoded trades are still returned in that case.
func (m *market) requestTrades(s string) (*Trades, error) {
	resp, err := http.Get("https://pr0gramm.com/api/stocks/trades?symbol=" + s)
	if err != nil {
		return nil, fmt.Errorf("requesting trades for %q: %w", s, err)
	}
	defer resp.Body.Close()

	// Reject error responses up front: a JSON error body could otherwise
	// decode into a zero-valued Trades and be stored with ts 0.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("requesting trades for %q: unexpected status %s", s, resp.Status)
	}

	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading trades response for %q: %w", s, err)
	}

	var t Trades
	if err := json.NewDecoder(bytes.NewReader(data)).Decode(&t); err != nil {
		return nil, fmt.Errorf("decoding trades response for %q: %w", s, err)
	}

	// Archiving the raw body is best effort and must not fail the request.
	err = m.db.Update(func(txn *badger.Txn) error {
		key := fmt.Sprintf("trades:%s:%d", s, t.Ts)
		return txn.Set([]byte(key), data)
	})
	if err != nil {
		log.Println(err)
	}

	return &t, nil
}
// main parses the command line flags and runs the crawler until the HTTP
// server stops, then exits with the server's error.
func main() {
	flag.Parse()
	// The work lives in run so that its deferred cleanup executes before
	// log.Fatal terminates the process.
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run opens the Badger store in the current directory, starts the periodic
// value log GC, registers the market collector and serves HTTP on *addr:
//
//   - "/" lists all stored keys, one per line,
//   - "/<key>" returns the raw value stored under <key>,
//   - "/metrics" exposes the Prometheus metrics.
//
// It returns the first setup error or the error that stopped the HTTP server.
func run() error {
	db, err := badger.Open(badger.DefaultOptions("."))
	if err != nil {
		return err
	}
	defer db.Close()

	go gcValueLog(db)

	rdb := redis.NewClient(&redis.Options{
		Addr: "172.17.0.1:6379",
	})

	if err := prometheus.Register(&market{rdb: rdb, db: db}); err != nil {
		return err
	}

	http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
		if req.URL.Path == "/" {
			if err := listKeys(w, db); err != nil {
				log.Println(err)
			}
			return
		}
		serveValue(w, req, db)
	})
	http.Handle("/metrics", promhttp.Handler())

	return http.ListenAndServe(*addr, nil)
}

// gcValueLog triggers Badger's value log garbage collection every five
// minutes. On each tick it keeps calling RunValueLogGC for as long as a pass
// reports success, and waits for the next tick once a pass returns an error.
// It never returns.
func gcValueLog(db *badger.DB) {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		for db.RunValueLogGC(0.7) == nil {
		}
	}
}

// listKeys writes every key in db to w, one per line. It stops at and returns
// the first write error.
func listKeys(w http.ResponseWriter, db *badger.DB) error {
	return db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		// Only keys are written, so skip loading the values.
		opts.PrefetchValues = false

		it := txn.NewIterator(opts)
		defer it.Close()

		for it.Rewind(); it.Valid(); it.Next() {
			if _, err := w.Write(it.Item().Key()); err != nil {
				return err
			}
			if _, err := w.Write([]byte("\n")); err != nil {
				return err
			}
		}
		return nil
	})
}

// serveValue writes the value stored under the request path (without its
// leading slash) to w. An unknown key is answered with 404; any other lookup
// error is logged and answered with 500.
func serveValue(w http.ResponseWriter, req *http.Request, db *badger.DB) {
	var value []byte
	err := db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(req.URL.Path[1:]))
		if err != nil {
			return err
		}
		value, err = item.ValueCopy(nil)
		return err
	})

	switch {
	case err == badger.ErrKeyNotFound:
		http.NotFound(w, req)
	case err != nil:
		log.Println(err)
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
	default:
		if _, err := w.Write(value); err != nil {
			log.Println(err)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment