Created
October 31, 2018 07:31
-
-
Save maxbeutel/4b445cbf8289f1b4a540f18c34a0fc84 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"log" | |
"net/http" | |
"os" | |
"os/exec" | |
"path/filepath" | |
"strings" | |
"github.com/chrislusf/glow/flow" | |
) | |
import ( | |
_ "github.com/go-sql-driver/mysql" | |
"github.com/jmoiron/sqlx" | |
) | |
// splunkQuery is a single query definition: an identifying name, the
// measurement it belongs to, and the query text itself. All three fields are
// plain strings and are declared in the order name, measurement, query.
type splunkQuery struct {
	name, measurement, query string
}
// newCmdSource returns a source function that runs the external command name
// with the given arguments and sends each line of its standard output, in
// order, on the out channel.
//
// Surrounding whitespace (including trailing newlines) is trimmed from the
// output before it is split on "\n". A command that prints nothing produces no
// items at all rather than a single empty string.
//
// If the command cannot be started or exits unsuccessfully, the process is
// terminated via log.Fatalf; whatever the command wrote to standard error is
// included in the message.
func newCmdSource(name string, arg ...string) func(out chan string) {
	return func(out chan string) {
		log.Println("About to run cmd source")

		res, err := exec.Command(name, arg...).Output()
		if err != nil {
			// Output stores the command's stderr in ExitError when it exits non-zero.
			if exitErr, ok := err.(*exec.ExitError); ok && len(exitErr.Stderr) > 0 {
				log.Fatalf("running %s: %v: %s", name, err, strings.TrimSpace(string(exitErr.Stderr)))
			}
			log.Fatalf("running %s: %v", name, err)
		}
		log.Println("Done executing command")

		trimmed := strings.TrimSpace(string(res))
		if trimmed == "" {
			// strings.Split("", "\n") yields [""]; do not emit that phantom line.
			return
		}

		for _, line := range strings.Split(trimmed, "\n") {
			log.Println("Current line is", line)
			out <- line
		}
	}
}
// jq pipes data into the external jq program, invoked with the given
// arguments, and returns what jq wrote to standard output.
//
// Only standard output is returned, so diagnostics jq prints to standard error
// can never end up inside the result. When jq cannot be run or exits
// unsuccessfully, an empty string and a non-nil error are returned; the error
// message includes jq's standard error text when there is any.
func jq(data string, arg ...string) (string, error) {
	cmd := exec.Command("jq", arg...)

	stdin, err := cmd.StdinPipe()
	if err != nil {
		return "", fmt.Errorf("opening jq stdin: %v", err)
	}

	// The write runs in its own goroutine because nothing reads from the pipe
	// until Output has started the process.
	go func() {
		defer stdin.Close()
		// A failed write means jq went away early; Output reports that failure.
		_, _ = io.WriteString(stdin, data)
	}()

	res, err := cmd.Output()
	if err != nil {
		// Output captures stderr in ExitError when the process exits non-zero.
		if exitErr, ok := err.(*exec.ExitError); ok {
			if msg := strings.TrimSpace(string(exitErr.Stderr)); msg != "" {
				return "", fmt.Errorf("jq: %v: %s", err, msg)
			}
		}
		return "", fmt.Errorf("jq: %v", err)
	}
	return string(res), nil
}
func main() { | |
// Connect to local db | |
db, err := sqlx.Connect("mysql", "root:@/analytics") | |
if err != nil { | |
log.Fatal(err) | |
} | |
// Setup new flow for splunk queries | |
flag.Parse() | |
f1 := flow.New() | |
concurrency := 1 | |
f1.Source( | |
newCmdSource( | |
"find", "/var/tmp/splunk-queries", | |
"-type", "f", | |
"-name", "*.txt", | |
), | |
concurrency, | |
).Map(func(path string) splunkQuery { | |
log.Println("Processing path", path) | |
lines, err := readLines(path) | |
if err != nil { | |
// @TODO handle error | |
} | |
return splunkQuery{ | |
name: filepath.Base(path), | |
measurement: lines[0], | |
query: strings.Join(lines[1:], "\n"), | |
} | |
//return !strings.HasPrefix(line, "#") | |
}).Map(func(q splunkQuery) queryResult { | |
log.Println("Running query", q.name) | |
resp, err := http.Get( | |
fmt.Sprintf( | |
"https://www.flickr.com/services/feeds/photos_public.gne?tags=%s&format=json&nojsoncallback=1", | |
q.query, | |
), | |
) | |
if err != nil { | |
// @TODO: handle error | |
} | |
defer resp.Body.Close() | |
body, err := ioutil.ReadAll(resp.Body) | |
log.Println("Got result for", q.query) | |
log.Println(string(body[0:100]), "...") | |
return queryResult{ | |
query: q, | |
res: string(body), | |
} | |
}).Map(func(r queryResult) []map[string]interface{} { | |
log.Println("Extracting item names") | |
//log.Println(string(r.res[0:100]), "...") | |
dat, err := jq(r.res, "[.items[] | {title: .title, author: .author}]", "--raw-output") | |
if err != nil { | |
// @TODO handle error | |
} | |
var mapp []map[string]interface{} | |
if err := json.Unmarshal([]byte(dat), &mapp); err != nil { | |
panic(err) | |
} | |
//log.Println(dat) | |
return mapp | |
}).MergeReduce(func(a, b []map[string]interface{}) []map[string]interface{} { | |
return append(a, b...) | |
}).Map(func(rows []map[string]interface{}) { | |
log.Println("Accumulated rows:") | |
log.Println(len(rows)) | |
// @TODO use transaction | |
// @TODO do batch insert | |
for _, row := range rows { | |
// @TODO this is not very efficient to do for every row, | |
// but be aware that the order of keys is not the same | |
// for every row! | |
// @see https://github.com/iancoleman/orderedmap/blob/master/orderedmap.go maybe? | |
var keys []string | |
var values []interface{} | |
var placeholders []string | |
for k, v := range row { | |
keys = append(keys, k) | |
values = append(values, v) | |
placeholders = append(placeholders, "?") | |
} | |
sql := fmt.Sprintf( | |
"INSERT INTO query_results(%s) VALUES(%s)", | |
strings.Join(keys, ","), | |
strings.Join(placeholders, ","), | |
) | |
db.MustExec(sql, values...) | |
} | |
log.Println("Inserts done") | |
}).Run() | |
} | |
type queryResult struct { | |
query splunkQuery | |
res string | |
} | |
/* | |
flag.Parse() | |
flow.New().TextFile( | |
"/etc/passwd", 3, | |
).Filter(func(line string) bool { | |
return !strings.HasPrefix(line, "#") | |
}).Map(func(line string, ch chan string) { | |
println("got line, but ignoring, haha") | |
//for _, token := range strings.Split(line, ":") { | |
/// ch <- token | |
//} | |
}).Map(func(key string) int { | |
return 1 | |
}).Reduce(func(x int, y int) int { | |
return x + y | |
}).Map(func(x int) { | |
println("count:", x) | |
}).Run() | |
*/ | |
// readLines opens the file at path and returns its content as a slice with one
// entry per line, line terminators removed.
//
// If the file cannot be opened, the result is nil together with the open
// error. If scanning stops because of an error, the lines collected up to that
// point are returned alongside the scanner's error. A file without any lines
// yields a nil slice and a nil error.
func readLines(path string) ([]string, error) {
	src, openErr := os.Open(path)
	if openErr != nil {
		return nil, openErr
	}
	defer src.Close()

	var collected []string
	sc := bufio.NewScanner(src)
	for more := sc.Scan(); more; more = sc.Scan() {
		collected = append(collected, sc.Text())
	}
	return collected, sc.Err()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment