Skip to content

Instantly share code, notes, and snippets.

@tigrus
Last active April 5, 2016 01:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tigrus/e5d9bed4cde33773284d2c7dc4468beb to your computer and use it in GitHub Desktop.
Save tigrus/e5d9bed4cde33773284d2c7dc4468beb to your computer and use it in GitHub Desktop.
package main
import (
"encoding/csv"
"fmt"
"github.com/docopt/docopt-go"
"io"
"log"
"os"
"strconv"
"strings"
)
// Coordinates identifies one grid cell by its integer X and Y values.
// It is comparable, so it can be used directly as a map key.
type Coordinates struct {
	X, Y int
}
// SConc holds a pair of running totals: the concentration and the
// toxicity-weighted concentration.
type SConc struct {
	Conc, ToxConc float64
}
// Filter reports whether a CSV record (one string per column) matches a
// selection criterion.
type Filter func(record []string) bool
// BuildFilter turns a comma-separated command-line option into a Filter.
//
// option is the key to look up in arguments (for example "--chemical") and
// field is the zero-based column of a record that the filter inspects. The
// returned Filter accepts a record when the text in that column is exactly
// equal to one of the comma-separated values (no trimming is applied).
//
// BuildFilter returns nil when the option is absent, nil, or not a string, so
// the caller can tell "no filter requested" apart from a real filter. A record
// that is too short to contain the column is rejected rather than causing an
// index-out-of-range panic.
func BuildFilter(option string, field int, arguments map[string]interface{}) Filter {
	// The comma-ok form covers a missing key, an explicit nil and any
	// non-string value in one step.
	list, ok := arguments[option].(string)
	if !ok {
		return nil
	}

	allowed := make(map[string]struct{})
	for _, value := range strings.Split(list, ",") {
		allowed[value] = struct{}{}
	}

	return func(row []string) bool {
		if field < 0 || field >= len(row) {
			return false
		}
		_, found := allowed[row[field]]
		return found
	}
}
// CheckFilter appends filt to filters unless filt is nil, and returns the
// resulting slice. A nil filt leaves filters unchanged.
func CheckFilter(filt Filter, filters []Filter) []Filter {
	if filt == nil {
		return filters
	}
	return append(filters, filt)
}
// KeepRecord decides whether record should be processed.
//
// With no filters every record is kept. Otherwise the filters are tried in
// order and the record is kept as soon as one of them accepts it; later
// filters are not called.
//
// NOTE(review): filters are combined with OR, so supplying several options
// widens the selection instead of narrowing it — confirm this is intended.
func KeepRecord(record []string, filters []Filter) bool {
	keep := len(filters) == 0
	for idx := 0; idx < len(filters) && !keep; idx++ {
		keep = filters[idx](record)
	}
	return keep
}
// makeWriter creates (truncating if it exists) a CSV output file and returns
// a csv.Writer bound to it together with the file itself.
//
// When writerNumber is -1 the file is created at fname. For any other value
// the file is a temporary chunk named fname + ".temp." + writerNumber.
//
// If the file cannot be created the program exits through log.Fatalf with the
// offending path. Previously the error was only printed and the following
// dereference of the nil *os.File panicked.
//
// Both results are returned by value because existing callers store them in
// []csv.Writer and []os.File. The copies still refer to the same underlying
// buffer and file descriptor. The caller must Flush the writer and Close the
// file.
func makeWriter(fname string, writerNumber int) (csv.Writer, os.File) {
	if writerNumber != -1 {
		fname = fname + ".temp." + strconv.Itoa(writerNumber)
	}

	csvfile, err := os.Create(fname)
	if err != nil {
		log.Fatalf("unable to create %q: %v", fname, err)
	}

	return *csv.NewWriter(csvfile), *csvfile
}
// handleCommandLine parses the process arguments against the program's
// docopt usage text and returns the resulting option map together with any
// parse error. The argv parameter is passed as nil, help handling is enabled,
// and "Chemical Sum 0.0.3" is supplied as the version string.
func handleCommandLine() (map[string]interface{}, error) {
	const usage = `Chemical Sum.
Usage:
chemical
chemical <input_file>
chemical <input_file> <output_file> [--chemical=<ch>] [--facility=<fc>] [--media=<md>] [--restore=<file>]
chemical --help
Options:
--chemical=<ch> Query by chemical number
--facility=<fc> Query by facility number
--media=<md> Query by media
--restore=<file> Restore from file
--help Show this screen
`
	return docopt.Parse(usage, nil, true, "Chemical Sum 0.0.3", false)
}
// main aggregates concentration values per (X, Y) cell in two phases.
//
// Phase 1 streams the input CSV (the first row is treated as a header and
// skipped), applies the optional --chemical/--facility/--media filters on
// columns 4, 5 and 6, and writes the columns X (1), Y (2), Conc (7) and
// ToxConc (8) of every kept row into one of countOfWriters temporary chunk
// files chosen by |X| modulo countOfWriters. All rows for a given X therefore
// land in the same chunk.
//
// Phase 2 reads each chunk in turn, sums Conc and ToxConc per (X, Y) in
// memory, and appends the totals to the output file. Row order inside a chunk
// follows map iteration and is unspecified. The temporary chunk files are
// left on disk, and the --restore option is accepted but not read here.
//
// Any I/O or parse failure terminates the program through the log package.
func main() {
	arguments, err := handleCommandLine()
	if err != nil {
		log.Fatal(err)
	}

	const countOfWriters = 20

	// The usage text allows running without positional arguments, in which
	// case these entries do not hold strings. A bare type assertion would
	// panic, so fail with a readable message instead.
	inputPath, ok := arguments["<input_file>"].(string)
	if !ok {
		log.Fatal("an input file is required; run with --help for usage")
	}
	outputPath, ok := arguments["<output_file>"].(string)
	if !ok {
		log.Fatal("an output file is required; run with --help for usage")
	}

	var filters []Filter
	filters = CheckFilter(BuildFilter("--chemical", 4, arguments), filters)
	filters = CheckFilter(BuildFilter("--facility", 5, arguments), filters)
	filters = CheckFilter(BuildFilter("--media", 6, arguments), filters)

	fmt.Println(arguments)
	fmt.Println("Count of chunks ", countOfWriters)

	rf, err := os.Open(inputPath)
	if err != nil {
		log.Fatalf("Error: %s", err)
	}
	r := csv.NewReader(rf)

	// Chunk i (1-based file suffix) is stored at index i-1.
	writers := make([]csv.Writer, 0, countOfWriters)
	csvfiles := make([]os.File, 0, countOfWriters)
	for i := 1; i <= countOfWriters; i++ {
		w, f := makeWriter(outputPath, i)
		writers = append(writers, w)
		csvfiles = append(csvfiles, f)
	}
	fmt.Println()

	header, err := r.Read()
	if err != nil {
		log.Fatalf("Unable to read the header: %v", err)
	}
	// csv.Reader requires every later record to have as many fields as the
	// first one, so checking the header once protects the record[8] access.
	if len(header) < 9 {
		log.Fatalf("input has %d columns, need at least 9", len(header))
	}

	processed, skipped := 0, 0
	for {
		record, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		// Count only real records; the EOF read is no longer included.
		processed++
		fmt.Printf("\rProcessed %d\r", processed)

		if !KeepRecord(record, filters) {
			continue
		}

		x, y := record[1], record[2]
		xi, err := strconv.Atoi(x)
		if err != nil {
			// A row without an integer X cannot be attributed to a cell.
			skipped++
			continue
		}
		if _, err := strconv.Atoi(y); err != nil {
			skipped++
			continue
		}

		// Same mapping as |X| % countOfWriters, but taking the remainder
		// first avoids negating the most negative int.
		shard := xi % countOfWriters
		if shard < 0 {
			shard = -shard
		}
		if err := writers[shard].Write([]string{x, y, record[7], record[8]}); err != nil {
			log.Fatal(err)
		}
	}
	rf.Close()

	for i := range writers {
		writers[i].Flush()
		if err := writers[i].Error(); err != nil {
			log.Fatal(err)
		}
	}
	for i := range csvfiles {
		if err := csvfiles[i].Close(); err != nil {
			log.Fatal(err)
		}
	}

	fmt.Printf("\n")
	if skipped > 0 {
		log.Printf("warning: skipped %d records with non-integer X or Y", skipped)
	}
	fmt.Println("Preparings done.")
	fmt.Println("Aggregating data..")

	out, outFile := makeWriter(outputPath, -1)

	// Unparseable concentration values contribute zero to the totals and are
	// reported once at the end instead of being silently ignored.
	badValues := 0
	parseConc := func(s string) float64 {
		// Parse at 64-bit precision: the totals are float64, and parsing
		// at 32 bits would round every value to float32 first.
		v, err := strconv.ParseFloat(s, 64)
		if err != nil {
			badValues++
			return 0
		}
		return v
	}

	for i := 1; i <= countOfWriters; i++ {
		fmt.Println("Aggregating chunk ", i)
		chunkPath := outputPath + ".temp." + strconv.Itoa(i)
		rf2, err := os.Open(chunkPath)
		if err != nil {
			log.Fatalf("Error: %s", err)
		}
		r2 := csv.NewReader(rf2)

		results := make(map[Coordinates]SConc)
		for {
			record, err := r2.Read()
			if err == io.EOF {
				break
			}
			if err != nil {
				// Without this check a failed read leaves record nil and
				// the indexing below panics.
				log.Fatalf("reading %s: %v", chunkPath, err)
			}

			x, errX := strconv.Atoi(record[0])
			y, errY := strconv.Atoi(record[1])
			if errX != nil || errY != nil {
				log.Fatalf("reading %s: invalid coordinates %q, %q", chunkPath, record[0], record[1])
			}

			key := Coordinates{X: x, Y: y}
			sum := results[key]
			sum.Conc += parseConc(record[2])
			sum.ToxConc += parseConc(record[3])
			results[key] = sum
		}
		rf2.Close()

		for k, v := range results {
			row := []string{
				strconv.Itoa(k.X),
				strconv.Itoa(k.Y),
				strconv.FormatFloat(v.Conc, 'f', 10, 64),
				strconv.FormatFloat(v.ToxConc, 'f', 10, 64),
			}
			if err := out.Write(row); err != nil {
				log.Fatal(err)
			}
		}
	}

	out.Flush()
	if err := out.Error(); err != nil {
		log.Fatal(err)
	}
	if err := outFile.Close(); err != nil {
		log.Fatal(err)
	}
	if badValues > 0 {
		log.Printf("warning: %d concentration values could not be parsed and were counted as 0", badValues)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment