Skip to content

Instantly share code, notes, and snippets.

@JustinAzoff
Created October 11, 2018 21:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JustinAzoff/4086a0a0a84f2e5b85d15dafcc83cb45 to your computer and use it in GitHub Desktop.
Save JustinAzoff/4086a0a0a84f2e5b85d15dafcc83cb45 to your computer and use it in GitHub Desktop.
bro zeromq examples
# Countdown event: reports the current value of n and, unless n has reached
# zero, re-schedules itself one second later with n - 1.
event log_one(n: count)
	{
	Reporter::info(fmt("Hello %d", n));

	# Zero is the last value reported; nothing further is scheduled.
	if ( n == 0 )
		return;

	schedule 1sec { log_one(n - 1) };
	}
# Start the countdown at startup: raise log_one with an initial value of
# 10000000.
event bro_init()
{
event log_one(10000000);
}
@load NCSA/ZeroMQWriter

# Endpoint setting of the ZeroMQ writer package loaded above.
redef LogZeroMQ::endpoint = "tcp://127.0.0.1:9999";

# Scope separator for log field names, set to "_" here.
# NOTE(review): presumably this is what turns nested fields such as
# id$orig_h into id_orig_h - confirm against the logging framework docs.
redef Log::default_scope_sep = "_";

# At startup, remove the default filter from every active log stream.
event bro_init()
	{
	for ( sid in Log::active_streams )
		{
		Log::remove_default_filter(sid);
		}
	}
# Record returned by add_extension(), which is installed as
# Log::default_ext_func. Every field is marked &log.
type Extension: record {
# Filled with network_time() by add_extension().
write_ts: time &log;
# Disabled field: would hold the path argument passed to add_extension().
#stream: string &log;
# Filled with peer_description by add_extension().
system_name: string &log;
};
# Builds the Extension record for a log write: the current network time and
# this peer's description. The path argument is currently unused (the
# $stream = path assignment is disabled, matching the commented-out field).
function add_extension(path: string): Extension
	{
	# Disabled: $stream = path
	local ext: Extension = [$write_ts = network_time(),
	                        $system_name = peer_description];
	return ext;
	}

# Install add_extension as the default log extension function.
redef Log::default_ext_func = add_extension;
# Print the id field of the connection record each time new_connection is
# raised.
event new_connection(c: connection) {
print c$id;
}
#!/usr/bin/env python
from __future__ import print_function
import json
import sys
import zmq
def sub(bind="tcp://*:9999"):
    """Bind a ZeroMQ SUB socket and print one summary line per http/ssl record.

    The socket subscribes to the "http" and "ssl" prefixes. Each message is
    read as two string frames: the stream name followed by a JSON document.

    Records that cannot be formatted (for example because a field is missing)
    are printed as the decoded object; entries that are not valid JSON are
    printed raw. Neither case stops the loop.

    :param bind: ZeroMQ endpoint to bind the SUB socket to.
    :returns: never returns; loops until the process is interrupted.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.bind(bind)
    socket.setsockopt_string(zmq.SUBSCRIBE, u"http")
    socket.setsockopt_string(zmq.SUBSCRIBE, u"ssl")
    while True:
        stream = socket.recv_string()
        entry = socket.recv_string()
        try:
            rec = json.loads(entry)
        except ValueError:
            # Malformed JSON: show the raw payload and keep consuming.
            print(entry)
            continue
        try:
            if stream == "http":
                print("{id_orig_h} {method} http://{host}:{id_resp_p}{uri}".format(**rec))
            elif stream == "ssl":
                print("{id_orig_h} unknown https://{server_name}:{id_resp_p} unknown".format(**rec))
        except Exception:
            # Best-effort fallback for records lacking an expected field.
            # Catching Exception (not a bare except) lets KeyboardInterrupt
            # and SystemExit stop the program.
            print(rec)
if __name__ == "__main__":
    # The optional first command-line argument overrides the bind endpoint.
    bind = sys.argv[1] if len(sys.argv) > 1 else "tcp://*:9999"
    sub(bind=bind)
#!/usr/bin/env python
from __future__ import print_function
import psycopg2
import sys
import zmq
def sub(bind="tcp://*:9999", topics=None):
    """Yield ``(stream, entry)`` string pairs read from a bound ZeroMQ SUB socket.

    :param bind: ZeroMQ endpoint to bind the SUB socket to.
    :param topics: comma-separated subscription prefixes. ``None`` or an
        empty string results in a single subscription to the empty prefix.
    :returns: an endless generator; each item is the two string frames read
        for one message, in order.
    """
    ctx = zmq.Context()
    receiver = ctx.socket(zmq.SUB)
    receiver.bind(bind)
    # A falsy topic list collapses to u"", which splits into [u""].
    for prefix in (topics or u"").split(u","):
        receiver.setsockopt_string(zmq.SUBSCRIBE, prefix)
    while True:
        name = receiver.recv_string()
        payload = receiver.recv_string()
        yield name, payload
def insert(conn, records):
    """Store ``(stream, rec)`` pairs in the ``logs`` table.

    Creates the table and its index if needed, prepares an insert statement
    and executes it once per record. The transaction is committed after
    every 10th record and once more after the iterable is exhausted, so a
    finite input is always stored completely. Progress lines report the
    number of records inserted so far.

    :param conn: DB-API connection (psycopg2 in this script); only
        ``cursor()`` and ``commit()`` are used.
    :param records: iterable of ``(stream, rec)`` pairs, where ``rec`` is a
        JSON document as text.
    """
    batch_size = 10
    cur = conn.cursor()
    cur.execute('''
        create table if not exists logs (
            stream varchar(30),
            rec jsonb
        )
    ''')
    cur.execute(' create index if not exists logs_stream on logs(stream) ')
    conn.commit()
    cur.execute('''
        prepare insert as
        insert into logs (stream, rec) values ($1, $2)
    ''')
    count = 0
    # Count from 1 so a commit closes each full batch and the progress line
    # shows how many rows were inserted rather than the last index.
    for count, (stream, rec) in enumerate(records, 1):
        cur.execute("execute insert (%s, %s)", (stream, rec))
        if count % batch_size == 0:
            conn.commit()
            print("Inserted", count)
    # Flush a trailing partial batch when the input ends.
    if count % batch_size:
        conn.commit()
        print("Inserted", count)
if __name__ == "__main__":
    # Positional arguments, all optional: bind endpoint, libpq connection
    # string, comma-separated topic prefixes.
    bind = sys.argv[1] if len(sys.argv) > 1 else "tcp://*:9999"
    pg = sys.argv[2] if len(sys.argv) > 2 else "dbname='bro' user='postgres' host='192.168.99.100'"
    topics = sys.argv[3] if len(sys.argv) > 3 else ""
    conn = psycopg2.connect(pg)
    records = sub(bind=bind, topics=topics)
    insert(conn, records)
#!/usr/bin/env python
from __future__ import print_function
import psycopg2
import sys
import zmq
def sub(bind="tcp://*:9999", topics=None):
    """Yield ``(stream, entry)`` string pairs read from a bound ZeroMQ SUB socket.

    :param bind: ZeroMQ endpoint to bind the SUB socket to.
    :param topics: comma-separated subscription prefixes. ``None`` or an
        empty string results in a single subscription to the empty prefix.
    :returns: an endless generator; each item is the two string frames read
        for one message, in order.
    """
    ctx = zmq.Context()
    receiver = ctx.socket(zmq.SUB)
    receiver.bind(bind)
    # A falsy topic list collapses to u"", which splits into [u""].
    for prefix in (topics or u"").split(u","):
        receiver.setsockopt_string(zmq.SUBSCRIBE, prefix)
    while True:
        name = receiver.recv_string()
        payload = receiver.recv_string()
        yield name, payload
def insert(conn, records):
    """Write ``(stream, rec)`` pairs into the ``logs`` stream.

    Creates the stream if needed, prepares an insert statement and executes
    it once per record. The transaction is committed after every 10th record
    and once more after the iterable is exhausted, so a finite input is
    always written completely. Progress lines report the number of records
    inserted so far.

    :param conn: DB-API connection (psycopg2 in this script); only
        ``cursor()`` and ``commit()`` are used.
    :param records: iterable of ``(stream, rec)`` pairs, where ``rec`` is a
        JSON document as text.
    """
    batch_size = 10
    cur = conn.cursor()
    cur.execute('''
        create stream if not exists logs (
            log varchar(30),
            rec jsonb
        )
    ''')
    #cur.execute(' create index if not exists logs_log on logs(log) ')
    #conn.commit()
    cur.execute('''
        prepare insert as
        insert into logs (log, rec) values ($1, $2)
    ''')
    count = 0
    # Count from 1 so a commit closes each full batch and the progress line
    # shows how many rows were inserted rather than the last index.
    for count, (stream, rec) in enumerate(records, 1):
        cur.execute("execute insert (%s, %s)", (stream, rec))
        if count % batch_size == 0:
            conn.commit()
            print("Inserted", count)
    # Flush a trailing partial batch when the input ends.
    if count % batch_size:
        conn.commit()
        print("Inserted", count)
if __name__ == "__main__":
    # Positional arguments, all optional: bind endpoint, libpq connection
    # string, comma-separated topic prefixes.
    bind = sys.argv[1] if len(sys.argv) > 1 else "tcp://*:9999"
    pg = sys.argv[2] if len(sys.argv) > 2 else "dbname='pipeline' user='pipeline' password='pipeline' host='192.168.99.100'"
    topics = sys.argv[3] if len(sys.argv) > 3 else ""
    conn = psycopg2.connect(pg)
    records = sub(bind=bind, topics=topics)
    insert(conn, records)
-- Recreate top_dns: per DNS query value, the number of matching rows (c)
-- and the number of distinct id_orig_h values (sources), over rows of
-- the logs stream where log = 'dns'. The view is declared with
-- sw = '2 minute'.
DROP CONTINUOUS VIEW top_dns;

CREATE CONTINUOUS VIEW top_dns WITH (sw = '2 minute') AS
    SELECT
        rec->'query'                     AS query,
        count(*)                         AS c,
        count(DISTINCT rec->'id_orig_h') AS sources
    FROM logs
    WHERE log = 'dns'
    GROUP BY rec->'query';
-- Recreate top_dns_clients: per id_orig_h value, the number of matching
-- rows (c) and the number of distinct query values (queries), over rows of
-- the logs stream where log = 'dns'. The view is declared with
-- sw = '2 minute'. Note that the grouping column is aliased "query" even
-- though it holds id_orig_h.
DROP CONTINUOUS VIEW top_dns_clients;

CREATE CONTINUOUS VIEW top_dns_clients WITH (sw = '2 minute') AS
    SELECT
        rec->'id_orig_h'             AS query,
        count(*)                     AS c,
        count(DISTINCT rec->'query') AS queries
    FROM logs
    WHERE log = 'dns'
    GROUP BY rec->'id_orig_h';
package main
import (
"context"
"encoding/json"
"flag"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/go-zeromq/zmq4"
)
var (
// zmqBind is the -zmq command-line flag; recv passes its value to
// sub.Listen. It defaults to tcp://127.0.0.1:9999.
zmqBind = flag.String("zmq", "tcp://127.0.0.1:9999", "endpoint to bind the zmq sub socket to")
)
// Conn holds the two fields decoded from a JSON log record: id_orig_h and
// id_resp_h (presumably the originator and responder host addresses -
// confirm against the log schema).
type Conn struct {
Orig string `json:"id_orig_h"`
Resp string `json:"id_resp_h"`
}
// Data is the state shared between the receiver, the HTTP handler and main.
// The embedded mutex is locked around every access to conns.
type Data struct {
// conns maps a Conn.Orig value to the Conn.Resp values recorded for it;
// recv trims each list so it never holds more than 10 entries.
conns map[string][]string
sync.Mutex
}
// recv binds a ZeroMQ SUB socket to *zmqBind, subscribes to the "conn"
// prefix and records, per originator, the most recent responders seen.
//
// Each message is expected to carry at least two frames; the second frame is
// decoded as a JSON Conn record. Messages with fewer frames, undecodable
// payloads and records without an originator are skipped. At most 10
// responders are kept per originator, oldest dropped first.
//
// Failing to bind, subscribe or receive terminates the process.
func recv(data *Data) {
	// maxRecent is the number of responders kept per originator.
	const maxRecent = 10

	sub := zmq4.NewSub(context.Background())
	defer sub.Close()
	log.Printf("Binding zmq SUB socket to %q", *zmqBind)
	// A failed bind would otherwise go unnoticed and Recv would wait on a
	// socket that nobody can connect to.
	if err := sub.Listen(*zmqBind); err != nil {
		log.Fatalf("could not bind to %q: %v", *zmqBind, err)
	}
	if err := sub.SetOption(zmq4.OptionSubscribe, "conn"); err != nil {
		log.Fatalf("could not subscribe: %v", err)
	}
	for {
		// Read envelope with address
		msg, err := sub.Recv()
		if err != nil {
			log.Fatal(err)
		}
		// Guard the index below: a message without a payload frame must not
		// panic the receiver.
		if len(msg.Frames) < 2 {
			log.Printf("Ignoring message with %d frame(s)", len(msg.Frames))
			continue
		}
		contents := msg.Frames[1]
		var record Conn
		if err := json.Unmarshal(contents, &record); err != nil {
			log.Printf("Can't decode %q", contents)
			continue
		}
		if record.Orig == "" {
			continue
		}
		log.Printf("%v", record)
		data.Lock()
		data.conns[record.Orig] = append(data.conns[record.Orig], record.Resp)
		if len(data.conns[record.Orig]) > maxRecent {
			data.conns[record.Orig] = data.conns[record.Orig][1:]
		}
		data.Unlock()
	}
}
// web serves GET /recent?q=<originator> on :8080 and answers with the JSON
// list of responders recorded for that originator ("null" when none are
// known). A missing q parameter yields 400 Bad Request.
//
// web blocks in ListenAndServe and terminates the process if the server
// stops.
func web(data *Data) {
	http.HandleFunc("/recent", func(w http.ResponseWriter, r *http.Request) {
		query := r.FormValue("q")
		if query == "" {
			http.Error(w, "Missing parameter: q", http.StatusBadRequest)
			return
		}
		// Copy the list under the lock and write the response afterwards,
		// so a slow client cannot hold the mutex and stall the receiver.
		// The copy is required because recv appends to the stored slice.
		data.Lock()
		conns := append([]string(nil), data.conns[query]...)
		data.Unlock()

		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(conns); err != nil {
			log.Printf("could not write response: %v", err)
		}
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}
// main parses the flags, starts the ZeroMQ receiver and the HTTP server in
// their own goroutines and then reports, every two seconds, how many
// originators are being tracked.
func main() {
	// Usage output is limited to the flag defaults.
	flag.Usage = flag.PrintDefaults
	flag.Parse()

	state := &Data{conns: map[string][]string{}}
	go recv(state)
	go web(state)

	for {
		state.Lock()
		tracked := len(state.conns)
		state.Unlock()
		log.Printf("Tracking %d sources", tracked)
		time.Sleep(2 * time.Second)
	}
	// Never reached: the loop above does not terminate.
	os.Exit(0)
}
package main
import (
"context"
"flag"
"log"
"os"
"github.com/go-zeromq/zmq4"
)
var (
// zmqBind is the -zmq command-line flag; main passes its value to
// sub.Listen. It defaults to tcp://127.0.0.1:9999.
zmqBind = flag.String("zmq", "tcp://127.0.0.1:9999", "endpoint to bind the zmq sub socket to")
)
// main binds a ZeroMQ SUB socket to *zmqBind, subscribes to the "conn"
// prefix and logs the first two frames of every message received.
//
// Messages with fewer than two frames are reported and skipped. Failing to
// bind, subscribe or receive terminates the process.
func main() {
	flag.Usage = func() {
		flag.PrintDefaults()
	}
	flag.Parse()
	sub := zmq4.NewSub(context.Background())
	defer sub.Close()
	log.Printf("Binding zmq SUB socket to %q", *zmqBind)
	// A failed bind would otherwise go unnoticed and Recv would wait on a
	// socket that nobody can connect to.
	if err := sub.Listen(*zmqBind); err != nil {
		log.Fatalf("could not bind to %q: %v", *zmqBind, err)
	}
	if err := sub.SetOption(zmq4.OptionSubscribe, "conn"); err != nil {
		log.Fatalf("could not subscribe: %v", err)
	}
	for {
		msg, err := sub.Recv()
		if err != nil {
			log.Fatal(err)
		}
		// Guard the indexing below against short messages.
		if len(msg.Frames) < 2 {
			log.Printf("Ignoring message with %d frame(s)", len(msg.Frames))
			continue
		}
		log.Printf("%s %s", msg.Frames[0], msg.Frames[1])
	}
	// Never reached: the loop above does not terminate.
	os.Exit(0)
}
#!/usr/bin/env python
from __future__ import print_function
import json
import pprint
import sys
import time
import zmq
import pprint
def sub(bind="tcp://*:9999", topics=None):
    """Print the raw JSON entry of every message received on a bound SUB socket.

    :param bind: ZeroMQ endpoint to bind the SUB socket to.
    :param topics: comma-separated subscription prefixes. ``None`` or an
        empty string results in a single subscription to the empty prefix.
    :returns: never returns; an entry that is not valid JSON raises from
        ``json.loads``.
    """
    ctx = zmq.Context()
    receiver = ctx.socket(zmq.SUB)
    receiver.bind(bind)
    # A falsy topic list collapses to u"", which splits into [u""].
    for prefix in (topics or u"").split(u","):
        receiver.setsockopt_string(zmq.SUBSCRIBE, prefix)
    while True:
        stream = receiver.recv_string()
        entry = receiver.recv_string()
        # The decoded record is only needed by the disabled pprint below;
        # the text that gets printed is the entry exactly as received.
        rec = json.loads(entry)
        #pprint.pprint((stream, rec))
        print(entry)
if __name__ == "__main__":
    # Positional arguments, both optional: bind endpoint, comma-separated
    # topic prefixes.
    bind = sys.argv[1] if len(sys.argv) > 1 else "tcp://*:9999"
    topics = sys.argv[2] if len(sys.argv) > 2 else ""
    sub(bind=bind, topics=topics)
from __future__ import print_function
import time
import zmq
# Throughput monitor: binds a SUB socket on 127.0.0.1, receives every
# two-frame message and prints roughly once per second how many messages and
# payload bytes arrived since the previous report.
port = "9999"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.bind("tcp://127.0.0.1:%s" % port)
# setsockopt() needs a bytes value on Python 3; a str is only accepted by
# setsockopt_string(). The empty prefix is the subscription used originally.
socket.setsockopt(zmq.SUBSCRIBE, b"")

# nbytes (not "bytes") so the builtin type is not shadowed.
x = nbytes = lines = 0
last = time.time()
while True:
    stream, entry = socket.recv_multipart()
    lines += 1
    x += 1
    nbytes += len(entry)
    # Only look at the clock every 20 messages.
    if x % 20 == 0:
        x = 0
        now = time.time()
        dur = now - last
        if dur >= 1:
            last = now
            # Float divisors keep the megabyte figure fractional on Python 2.
            print("{}s lines={} bytes={} mbytes={}".format(dur, lines, nbytes, nbytes / 1024.0 / 1024.0))
            lines = nbytes = 0
# Open a psql session to the "pipeline" database as user "pipeline" on the
# docker-machine host. The substitution is quoted so that an empty result
# cannot shift the remaining arguments (unquoted, -h would consume "-U").
psql -h "$(docker-machine ip)" -U pipeline pipeline
# Run the pipelinedb/pipelinedb image interactively with a TTY, publish
# port 5432 on the host and remove the container when it exits.
docker run --tty --interactive --rm --publish 5432:5432 pipelinedb/pipelinedb
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment