Skip to content

Instantly share code, notes, and snippets.

@dustin
Created July 19, 2011 20:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dustin/1093582 to your computer and use it in GitHub Desktop.
Save dustin/1093582 to your computer and use it in GitHub Desktop.
bitcoin market watcher
*.6
*~
watch-exchange
/bitcoin-exchange
package main
import (
"log"
"runtime"
)
// Fatal logs a message built from format and v through the standard logger
// and then ends the calling goroutine with runtime.Goexit.
//
// Only the current goroutine stops; the process keeps running. Deferred
// calls of that goroutine still execute, and Fatal never returns to its
// caller.
func Fatal(format string, v ...interface{}) {
	log.Printf(format, v...)

	// Goexit (not os.Exit): abandon this unit of work, keep the program alive.
	runtime.Goexit()
}
package main
import (
"bufio"
"encoding/json"
"log"
"net"
"time"
)
// Trade is a single trade as decoded from one line of the exchange feed.
type Trade struct {
	// Timestamp is the trade time in seconds since the Unix epoch.
	Timestamp int64
	// Price and Volume are the numeric "price" and "volume" feed fields.
	Price, Volume float64
	// Currency is the "currency" feed field.
	Currency string
	// TID is the feed's trade id, or -1 when the feed supplied none.
	TID int64
	// Symbol is the "symbol" feed field.
	Symbol string
}
// handleData decodes one JSON-encoded trade line and sends the resulting
// Trade on ch.
//
// A line that is not valid JSON, or whose timestamp, price, volume,
// currency or symbol is missing or of the wrong JSON type, is reported
// through Fatal and nothing is sent. A missing or null "tid" is delivered
// as TID -1.
func handleData(ch chan Trade, line []byte) {
	var doc map[string]interface{}
	if err := json.Unmarshal(line, &doc); err != nil {
		Fatal("Error parsing ``%s'': %s", line, err)
		return
	}

	// Checked assertions: an unchecked doc[key].(T) panics on a missing or
	// mistyped field, and a panic in this goroutine would bring down the
	// whole process instead of dropping one bad line.
	timestamp, okTimestamp := doc["timestamp"].(float64)
	price, okPrice := doc["price"].(float64)
	volume, okVolume := doc["volume"].(float64)
	currency, okCurrency := doc["currency"].(string)
	symbol, okSymbol := doc["symbol"].(string)
	if !okTimestamp || !okPrice || !okVolume || !okCurrency || !okSymbol {
		Fatal("Error parsing ``%s'': missing or mistyped trade field", line)
		return
	}

	t := Trade{
		Timestamp: int64(timestamp),
		Price:     price,
		Volume:    volume,
		Currency:  currency,
		Symbol:    symbol,
	}

	switch tid := doc["tid"].(type) {
	case nil:
		// Absent or JSON null: the feed supplied no trade id.
		t.TID = -1
	case float64:
		t.TID = int64(tid)
	default:
		Fatal("Error parsing ``%s'': tid has unexpected type %T", line, tid)
		return
	}

	ch <- t
}
// watchExchange connects to the bitcoincharts streaming endpoint, reads it
// line by line with NUL bytes filtered out, and starts one handleData
// goroutine per line to decode it and publish the trade on ch.
//
// Connection and read failures end the program via log.Fatalf. Because
// log.Fatalf exits the process, the deferred close(ch) does not run on
// those paths.
func watchExchange(ch chan Trade) {
	defer close(ch)
	conn, err := net.Dial("tcp", "bitcoincharts.com:27007")
	if err != nil {
		log.Fatalf("Error connecting to the place with the things: %s", err)
	}
	r := bufio.NewReader(MakeNullEater(conn))
	for {
		// Five minute read timeout
		if derr := conn.SetReadDeadline(time.Now().Add(5 * time.Minute)); derr != nil {
			log.Printf("Error setting read deadline: %s", derr)
		}

		// ReadLine returns a view into the reader's internal buffer that is
		// overwritten by the next read, and it hands back long lines in
		// pieces. Assemble every line into its own freshly allocated slice
		// so the handler goroutine owns data that cannot change under it.
		var line []byte
		for {
			chunk, more, lerr := r.ReadLine()
			if lerr != nil {
				log.Fatalf("Error reading a line: %s", lerr)
			}
			line = append(line, chunk...)
			if !more {
				break
			}
		}
		go handleData(ch, line)
	}
}
// WatchExchange creates an unbuffered Trade channel, starts watchExchange
// on it in a new goroutine, and returns the channel to the caller.
func WatchExchange() chan Trade {
	trades := make(chan Trade)
	go watchExchange(trades)
	return trades
}
package main
import (
"bytes"
"io"
)
// ByteEater is an io.Reader that removes every occurrence of a single byte
// value from the data produced by an underlying reader.
type ByteEater struct {
	eat        byte      // byte value to strip from the stream
	underlying io.Reader // source of unfiltered data
	buffer     []byte    // scratch space handed to underlying.Read
	available  []byte    // filtered bytes not yet returned to the caller
	pending    error     // error from underlying.Read, held until available is drained
}

// Read fills p with filtered bytes from the underlying reader.
//
// Bytes that the underlying reader returns together with an error are
// delivered first; the error is reported by a later call once those bytes
// have been consumed. A chunk that consists only of eaten bytes does not
// produce a zero-length result: Read keeps reading until it has data or an
// error to report. It returns (0, nil) only when p is empty or the
// underlying reader itself returned (0, nil).
func (be *ByteEater) Read(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	for len(be.available) == 0 {
		if be.pending != nil {
			err = be.pending
			be.pending = nil
			return 0, err
		}
		sz, e := be.underlying.Read(be.buffer)
		be.available = bytes.Replace(be.buffer[0:sz],
			[]byte{be.eat}, []byte{}, -1)
		be.pending = e
		if sz == 0 && e == nil {
			// The source made no progress; report that instead of spinning.
			return 0, nil
		}
	}
	n = copy(p, be.available)
	be.available = be.available[n:]
	return n, nil
}
// MakeByteEater wraps r in a ByteEater that drops every byte equal to eat,
// using an 8192-byte scratch buffer for reads from r.
func MakeByteEater(r io.Reader, eat byte) io.Reader {
	eater := new(ByteEater)
	eater.underlying = r
	eater.eat = eat
	eater.buffer = make([]byte, 8192)
	return eater
}
// MakeNullEater wraps r in a reader that strips NUL (zero) bytes, by
// delegating to MakeByteEater.
func MakeNullEater(r io.Reader) io.Reader {
	const nul byte = 0
	return MakeByteEater(r, nul)
}
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"math/rand"
"net/http"
"time"
)
var (
	// ISO8601 is the time.Format layout used for the "date" field of stored
	// documents: date and time to the second, with no zone suffix.
	ISO8601 = "2006-01-02T15:04:05"

	// couchURL is the -couch flag: the base URL of the CouchDB database
	// that documents are PUT into.
	couchURL = flag.String("couch", "http://localhost:5984/btc-test",
		"URL of the CouchDB")
)
// store PUTs data as the document named id under the CouchDB database URL
// given by the -couch flag.
//
// Any failure (building the request, performing it, or a response status
// other than 201) is reported through Fatal.
func store(id string, data []byte) {
	var client http.Client
	docUrl := fmt.Sprintf("%s/%s", *couchURL, id)
	req, rerr := http.NewRequest("PUT", docUrl, bytes.NewBuffer(data))
	if rerr != nil {
		Fatal("Error making HTTP request: %s", rerr)
		return
	}
	// Ask for the connection to be closed after this single request.
	req.Close = true
	res, reqerr := client.Do(req)
	if reqerr != nil {
		Fatal("Error issuing request: %s", reqerr)
		return
	}
	defer res.Body.Close()
	if res.StatusCode != 201 {
		// Log the status line; formatting the whole *http.Response with %s
		// yields unreadable output.
		Fatal("Error storing stuff: %s", res.Status)
	}
}
// jsonify builds the document for trade t and returns its id together with
// the JSON encoding of the document.
//
// The id is "<symbol>-<tid>" when the trade has a non-negative TID. For a
// trade without one it is "<symbol>-<timestamp>-<random int31>", and the
// document's "tid" is stored as null. The document also carries "_id" and a
// "date" string: the timestamp rendered in UTC with the ISO8601 layout.
// A marshalling failure is reported through Fatal.
func jsonify(t Trade) (id string, data []byte) {
	doc := map[string]interface{}{
		"price":    t.Price,
		"volume":   t.Volume,
		"currency": t.Currency,
		"symbol":   t.Symbol,
	}

	var keydata string
	if t.TID < 0 {
		// Timestamp is an int64, so it must be formatted with %d; %s would
		// render it as "%!s(int64=...)" inside the document id.
		keydata = fmt.Sprintf("%d-%d", t.Timestamp, rand.Int31())
		doc["tid"] = nil
	} else {
		keydata = fmt.Sprintf("%d", t.TID)
		doc["tid"] = t.TID
	}

	id = fmt.Sprintf("%s-%s", t.Symbol, keydata)
	doc["_id"] = id
	doc["date"] = time.Unix(t.Timestamp, 0).UTC().Format(ISO8601)

	outdoc, merr := json.Marshal(doc)
	if merr != nil {
		Fatal("Error marashalling doc: %s", merr)
	}
	return id, outdoc
}
// main parses the command-line flags and then, for every trade received
// from WatchExchange, starts a goroutine that encodes the trade and stores
// it.
func main() {
	flag.Parse()
	for t := range WatchExchange() {
		// Pass the trade as an argument so each goroutine works on its own
		// copy. Before Go 1.22 a closure capturing the loop variable shares
		// it with later iterations and may store the wrong trade.
		go func(t Trade) {
			id, data := jsonify(t)
			store(id, data)
		}(t)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment