Skip to content

Instantly share code, notes, and snippets.

@ilyaglow
Last active February 5, 2018 08:46
Show Gist options
  • Save ilyaglow/7efb59d6522b8a7838344147fda9153f to your computer and use it in GitHub Desktop.
Save ilyaglow/7efb59d6522b8a7838344147fda9153f to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"database/sql"
"flag"
"log"
"os"
"path/filepath"
"strings"
"sync"
_ "github.com/mailru/go-clickhouse"
)
// record holds one credential line as split by normalize and written by send
// into the username, domain and password columns of the leaks table.
type record struct {
	Username, Domain, Password string
}
// main imports a BreachCompilation dump into ClickHouse.
//
// Flags:
//
//	-d      root directory to walk (default ./BreachCompilation)
//	-churl  URL handed to the "clickhouse" database/sql driver
//
// It creates the leaks table if it does not exist, walks every file under the
// root, parses each line with normalize and streams the resulting records to
// send, which runs in its own goroutine. Entries that cannot be read are
// logged and skipped instead of being silently ignored.
func main() {
	dir := flag.String("d", "./BreachCompilation", "BreachCompilation directory")
	ch := flag.String("churl", "http://127.0.0.1:8123/default", "Clickhouse URL")
	flag.Parse()

	// Fail fast on a bad root: the walk callback below only logs errors, so a
	// typo in -d would otherwise end in a "successful" run that imported nothing.
	if _, err := os.Stat(*dir); err != nil {
		log.Fatal(err)
	}

	conn, err := sql.Open("clickhouse", *ch)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	if err := conn.Ping(); err != nil {
		log.Fatal(err)
	}

	_, err = conn.Exec(`
CREATE TABLE IF NOT EXISTS leaks (
username String,
domain String,
password String,
date Date DEFAULT today()
) engine=MergeTree(date,username,8192)
`)
	if err != nil {
		log.Fatal(err)
	}

	rchan := make(chan record)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		send(conn, rchan)
	}()

	walkErr := filepath.Walk(*dir, func(path string, fi os.FileInfo, err error) error {
		if err != nil {
			// fi may be nil when err is set, so it must not be touched here.
			// One unreadable entry should not abort the whole import.
			log.Printf("skipping %s: %v", path, err)
			return nil
		}
		if fi.IsDir() {
			return nil
		}
		if err := importFile(path, rchan); err != nil {
			log.Printf("reading %s: %v", path, err)
		}
		return nil
	})

	// Closing the channel ends send's receive loop so it commits the final
	// partial batch; wait for that before exiting.
	close(rchan)
	wg.Wait()
	if walkErr != nil {
		log.Fatal(walkErr)
	}
}

// importFile feeds every line of the file at path through normalize, which
// sends the parsed records on out. It returns the open error, an error from
// normalize, or the scanner's read error (e.g. bufio.ErrTooLong for a line
// longer than the scanner's buffer), whichever occurs first.
func importFile(path string, out chan record) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		if err := normalize(scanner.Text(), out); err != nil {
			return err
		}
	}
	return scanner.Err()
}
// send inserts every record received on input into the leaks table through a
// statement prepared inside a transaction. After every 100000 rows the
// transaction is committed and a fresh transaction and statement are opened;
// rows still pending when input is closed are committed at the end.
//
// During batching, a Commit error whose message contains "Transaction" is
// only logged. Every other database error, including any Commit error for
// the final batch, terminates the program via log.Fatal.
func send(conn *sql.DB, input <-chan record) {
	const insertQuery = "INSERT INTO leaks (username, domain, password) VALUES (?, ?, ?)"
	const batchSize = 100000

	// begin opens a new transaction and prepares the insert statement in it.
	begin := func() (*sql.Tx, *sql.Stmt) {
		t, err := conn.Begin()
		if err != nil {
			log.Fatal(err)
		}
		s, err := t.Prepare(insertQuery)
		if err != nil {
			log.Fatal(err)
		}
		return t, s
	}

	tx, stmt := begin()
	var pending uint
	for rec := range input {
		_, err := stmt.Exec(rec.Username, rec.Domain, rec.Password)
		if err != nil {
			log.Fatal(err)
		}
		pending++
		if pending != batchSize {
			continue
		}

		log.Println("Commit transaction with 100 000 entries")
		pending = 0
		switch err := tx.Commit(); {
		case err == nil:
		case strings.Contains(err.Error(), "Transaction"):
			// NOTE(review): presumably a benign driver-specific error; confirm
			// why it is tolerated rather than treated as fatal.
			log.Println(err)
		default:
			log.Println("tx.Commit() failed")
			log.Fatal(err)
		}
		tx, stmt = begin()
	}

	log.Println("Committing the tail")
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
}
// normalize parses one line of the form "username@domain<sep>password", where
// <sep> is ':' or ';', and sends the resulting record on output.
//
// The username is everything before the first '@'. The domain runs from there
// up to the first ':' or ';' (domain names contain neither). The password is
// everything after that separator and may itself contain '@', ':' or ';'.
// Lines with no '@', or with no separator after it, are skipped. The returned
// error is currently always nil.
func normalize(line string, output chan record) error {
	at := strings.IndexByte(line, '@')
	if at < 0 {
		return nil
	}
	username, rest := line[:at], line[at+1:]

	// Split on whichever separator appears first. Trying ':' before ';'
	// regardless of position mis-parsed "u@example.com;pa:ss" as domain
	// "example.com;pa" with password "ss".
	sep := strings.IndexAny(rest, ":;")
	if sep < 0 {
		return nil
	}

	output <- record{
		Username: username,
		Domain:   rest[:sep],
		Password: rest[sep+1:],
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment