Skip to content

Instantly share code, notes, and snippets.

@maxbeutel
Created October 31, 2018 07:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maxbeutel/4b445cbf8289f1b4a540f18c34a0fc84 to your computer and use it in GitHub Desktop.
Save maxbeutel/4b445cbf8289f1b4a540f18c34a0fc84 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/chrislusf/glow/flow"
)
import (
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
// splunkQuery is a single query definition loaded from a query file.
type splunkQuery struct {
	name        string // base name of the file the query was read from
	measurement string // first line of the query file
	query       string // remaining lines of the file, joined with "\n"
}
// newCmdSource returns a source function that runs the command name with the
// given arguments and sends each line of its standard output to out.
//
// Leading and trailing whitespace of the whole output is trimmed before it is
// split on "\n". If the trimmed output is empty, nothing is sent; splitting an
// empty string would otherwise yield a single bogus empty line.
//
// If the command cannot be run or exits unsuccessfully, the program is
// terminated via log.Fatalf. The message includes whatever the command wrote
// to standard error, when available.
//
// The returned function does not close out.
func newCmdSource(name string, arg ...string) func(out chan string) {
	return func(out chan string) {
		log.Println("About to run cmd source")
		res, err := exec.Command(name, arg...).Output()
		if err != nil {
			// Output populates ExitError.Stderr when the command's Stderr is
			// unset, so the real failure reason can be reported.
			if exitErr, ok := err.(*exec.ExitError); ok && len(exitErr.Stderr) > 0 {
				log.Fatalf("running %s: %v: %s", name, err, strings.TrimSpace(string(exitErr.Stderr)))
			}
			log.Fatalf("running %s: %v", name, err)
		}
		log.Println("Done executing command")

		// Trim result to ignore trailing newlines
		trimmed := strings.TrimSpace(string(res))
		if trimmed == "" {
			return
		}
		for _, line := range strings.Split(trimmed, "\n") {
			log.Println("Current line is", line)
			out <- line
		}
	}
}
// jq runs the external "jq" program with the given arguments, feeds data to
// its standard input, and returns what jq wrote to standard output.
//
// Only standard output is returned, so diagnostics that jq prints to standard
// error cannot end up mixed into the result. When jq fails, the returned error
// contains the exit error and, when available, jq's standard error text.
func jq(data string, arg ...string) (string, error) {
	cmd := exec.Command("jq", arg...)
	stdin, err := cmd.StdinPipe()
	if err != nil {
		return "", fmt.Errorf("running jq: %v", err)
	}
	// Feed the input from a separate goroutine so a large payload cannot
	// block this goroutine while jq is still being started and drained.
	go func() {
		defer stdin.Close()
		// A failed write (e.g. jq exited early) is reported through jq's
		// own exit status below, so the error is intentionally dropped here.
		_, _ = io.WriteString(stdin, data)
	}()
	res, err := cmd.Output()
	if err != nil {
		// Output populates ExitError.Stderr when the command's Stderr is unset.
		if exitErr, ok := err.(*exec.ExitError); ok && len(exitErr.Stderr) > 0 {
			return "", fmt.Errorf("running jq: %v: %s", err, strings.TrimSpace(string(exitErr.Stderr)))
		}
		return "", fmt.Errorf("running jq: %v", err)
	}
	return string(res), nil
}
func main() {
// Connect to local db
db, err := sqlx.Connect("mysql", "root:@/analytics")
if err != nil {
log.Fatal(err)
}
// Setup new flow for splunk queries
flag.Parse()
f1 := flow.New()
concurrency := 1
f1.Source(
newCmdSource(
"find", "/var/tmp/splunk-queries",
"-type", "f",
"-name", "*.txt",
),
concurrency,
).Map(func(path string) splunkQuery {
log.Println("Processing path", path)
lines, err := readLines(path)
if err != nil {
// @TODO handle error
}
return splunkQuery{
name: filepath.Base(path),
measurement: lines[0],
query: strings.Join(lines[1:], "\n"),
}
//return !strings.HasPrefix(line, "#")
}).Map(func(q splunkQuery) queryResult {
log.Println("Running query", q.name)
resp, err := http.Get(
fmt.Sprintf(
"https://www.flickr.com/services/feeds/photos_public.gne?tags=%s&format=json&nojsoncallback=1",
q.query,
),
)
if err != nil {
// @TODO: handle error
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
log.Println("Got result for", q.query)
log.Println(string(body[0:100]), "...")
return queryResult{
query: q,
res: string(body),
}
}).Map(func(r queryResult) []map[string]interface{} {
log.Println("Extracting item names")
//log.Println(string(r.res[0:100]), "...")
dat, err := jq(r.res, "[.items[] | {title: .title, author: .author}]", "--raw-output")
if err != nil {
// @TODO handle error
}
var mapp []map[string]interface{}
if err := json.Unmarshal([]byte(dat), &mapp); err != nil {
panic(err)
}
//log.Println(dat)
return mapp
}).MergeReduce(func(a, b []map[string]interface{}) []map[string]interface{} {
return append(a, b...)
}).Map(func(rows []map[string]interface{}) {
log.Println("Accumulated rows:")
log.Println(len(rows))
// @TODO use transaction
// @TODO do batch insert
for _, row := range rows {
// @TODO this is not very efficient to do for every row,
// but be aware that the order of keys is not the same
// for every row!
// @see https://github.com/iancoleman/orderedmap/blob/master/orderedmap.go maybe?
var keys []string
var values []interface{}
var placeholders []string
for k, v := range row {
keys = append(keys, k)
values = append(values, v)
placeholders = append(placeholders, "?")
}
sql := fmt.Sprintf(
"INSERT INTO query_results(%s) VALUES(%s)",
strings.Join(keys, ","),
strings.Join(placeholders, ","),
)
db.MustExec(sql, values...)
}
log.Println("Inserts done")
}).Run()
}
// queryResult pairs a query with the raw response obtained for it.
type queryResult struct {
	query splunkQuery // the query that produced this result
	res   string      // response body, stored as a string
}
/*
flag.Parse()
flow.New().TextFile(
"/etc/passwd", 3,
).Filter(func(line string) bool {
return !strings.HasPrefix(line, "#")
}).Map(func(line string, ch chan string) {
println("got line, but ignoring, haha")
//for _, token := range strings.Split(line, ":") {
/// ch <- token
//}
}).Map(func(key string) int {
return 1
}).Reduce(func(x int, y int) int {
return x + y
}).Map(func(x int) {
println("count:", x)
}).Run()
*/
// readLines opens the file at path and returns its contents split into
// lines, without line terminators.
//
// If the file cannot be opened, it returns nil and the open error. Otherwise
// it returns the lines scanned so far together with any error reported by
// the scanner (nil when the whole file was read).
func readLines(path string) ([]string, error) {
	f, openErr := os.Open(path)
	if openErr != nil {
		return nil, openErr
	}
	defer f.Close()

	var (
		collected []string
		sc        = bufio.NewScanner(f)
	)
	for {
		if !sc.Scan() {
			break
		}
		collected = append(collected, sc.Text())
	}
	return collected, sc.Err()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment