Last active
April 5, 2016 01:19
-
-
Save tigrus/e5d9bed4cde33773284d2c7dc4468beb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/csv" | |
"fmt" | |
"github.com/docopt/docopt-go" | |
"io" | |
"log" | |
"os" | |
"strconv" | |
"strings" | |
) | |
// Coordinates is an (X, Y) integer pair. It is comparable, so main uses it as
// the key of the per-chunk aggregation map.
type Coordinates struct {
	X int
	Y int
}
// SConc holds the running totals accumulated for one Coordinates key: the
// summed Conc column and the summed ToxConc column.
type SConc struct {
	Conc    float64
	ToxConc float64
}
// Filter reports whether a CSV record (one string per column) matches a
// selection criterion. A nil Filter means "no criterion was requested".
type Filter func([]string) bool
func BuildFilter(option string, field int, arguments map[string]interface{}) Filter { | |
toadd := arguments[option] | |
if toadd == nil { | |
return nil | |
} | |
_numbers := strings.Split(toadd.(string), ",") | |
mapall := make(map[string]bool) | |
for _, val := range _numbers { | |
mapall[val] = true | |
} | |
return func(row []string) bool { | |
data := row[field] | |
return mapall[data] | |
} | |
} | |
func CheckFilter(filt Filter, filters []Filter) []Filter { | |
if filt != nil { | |
return append (filters, filt) | |
} | |
return filters | |
} | |
func KeepRecord(record []string, filters []Filter) bool { | |
if len(filters) == 0 { | |
return true | |
} | |
for _, filt := range filters { | |
if filt(record) { | |
return true; | |
} | |
} | |
return false | |
} | |
// makeWriter creates (or truncates) an output file and returns a CSV writer
// bound to it together with the file itself.
//
// When writer_number is -1 the file is created at fname unchanged; for any
// other value the file name is fname + ".temp." + writer_number, which is the
// naming main relies on when it reopens the chunk files.
//
// The caller must Flush the returned writer and then Close the returned file.
// If the file cannot be created the program exits with an error message;
// previously the error was only printed and the nil file was dereferenced.
func makeWriter(fname string, writer_number int) (csv.Writer, os.File) {
	if writer_number != -1 {
		fname = fname + ".temp." + strconv.Itoa(writer_number)
	}

	csvfile, err := os.Create(fname)
	if err != nil {
		log.Fatalf("Error: unable to create %q: %v", fname, err)
	}

	// Both values are returned by value to keep the existing signature; the
	// copies still refer to the same underlying file and buffer.
	return *csv.NewWriter(csvfile), *csvfile
}
func handleCommandLine() (map[string]interface{}, error) { | |
usage := `Chemical Sum. | |
Usage: | |
chemical | |
chemical <input_file> | |
chemical <input_file> <output_file> [--chemical=<ch>] [--facility=<fc>] [--media=<md>] [--restore=<file>] | |
chemical --help | |
Options: | |
--chemical=<ch> Query by chemical number | |
--facility=<fc> Query by facility number | |
--media=<md> Query by media | |
--restore=<file> Restore from file | |
--help Show this screen | |
` | |
arguments, err := docopt.Parse(usage, nil, true, "Chemical Sum 0.0.3", false) | |
return arguments, err | |
} | |
func main() { | |
arguments, err := handleCommandLine() | |
if err != nil { | |
log.Fatal(err) | |
} | |
countOfWriters := 20 | |
input_file := arguments["<input_file>"].(string) | |
output_file := arguments["<output_file>"].(string) | |
filters := []Filter{} | |
filters = CheckFilter(BuildFilter("--chemical", 4, arguments), filters) | |
filters = CheckFilter(BuildFilter("--facility", 5, arguments), filters) | |
filters = CheckFilter(BuildFilter("--media", 6, arguments), filters) | |
fmt.Println(arguments) | |
fmt.Println("Count of chunks ", countOfWriters) | |
rf, err := os.Open(input_file) | |
if err != nil { | |
log.Fatal("Error: %s", err) | |
} | |
r := csv.NewReader(rf) | |
writers := []csv.Writer{} | |
csvfiles := []os.File{} | |
for i := 1; i <= countOfWriters; i++ { | |
writer, csvfile := makeWriter(output_file, i) | |
writers = append(writers, writer) | |
csvfiles = append(csvfiles, csvfile) | |
} | |
fmt.Println() | |
if _, err := r.Read(); err != nil { | |
log.Fatalf("Unable to read the header: %v", err) | |
} | |
i := 0 | |
for { | |
record, err := r.Read() | |
i += 1 | |
fmt.Printf("\rProcessed %d\r", i) | |
if err == io.EOF { | |
break | |
} | |
if err != nil { | |
log.Fatal(err) | |
} | |
if !KeepRecord(record, filters) { | |
continue | |
} | |
X := record[1] | |
Y := record[2] | |
Conc := record[7] | |
ToxConc := record[8] | |
new_record := []string{X, Y, Conc, ToxConc} | |
writeNumber, _ := strconv.Atoi(X) | |
if writeNumber < 0 { | |
writeNumber = -writeNumber | |
} | |
writeNumber = writeNumber % countOfWriters | |
writer := writers[writeNumber] | |
writer.Write(new_record) | |
} | |
rf.Close() | |
for _, writer := range writers { | |
writer.Flush() | |
} | |
for _, outputfile := range csvfiles { | |
outputfile.Close() | |
} | |
fmt.Printf("\n") | |
fmt.Println("Preparings done.") | |
fmt.Println("Aggregating data..") | |
//readers := []csv.Readers{} | |
writer, outputFile := makeWriter(output_file, -1) | |
defer outputFile.Close() | |
for i := 1; i <= countOfWriters; i++ { | |
fmt.Println("Aggregating chunk ", i) | |
str := strconv.Itoa(i) | |
rf2, err := os.Open(output_file + ".temp." + str) | |
if err != nil { | |
log.Fatal("Error: %s", err) | |
} | |
r2 := csv.NewReader(rf2) | |
results := make(map[Coordinates]SConc) | |
for { | |
record, err := r2.Read() | |
if err == io.EOF { | |
break | |
} | |
X, _ := strconv.Atoi(record[0]) | |
Y, _ := strconv.Atoi(record[1]) | |
Conc, _ := strconv.ParseFloat(record[2], 32) | |
ToxConc, _ := strconv.ParseFloat(record[3], 32) | |
var key Coordinates | |
key.X = X | |
key.Y = Y | |
conc := results[key] | |
conc.Conc += Conc | |
conc.ToxConc += ToxConc | |
results[key] = conc | |
} | |
for k, v := range results { | |
new_record := []string{strconv.Itoa(k.X), | |
strconv.Itoa(k.Y), | |
strconv.FormatFloat(v.Conc, 'f', 10, 64), | |
strconv.FormatFloat(v.ToxConc, 'f', 10, 64)} | |
writer.Write(new_record) | |
} | |
rf2.Close() | |
} | |
writer.Flush() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment