Skip to content

Instantly share code, notes, and snippets.

@nfroidure
Last active June 27, 2018 09:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nfroidure/2030aa0f709a0a3995d5a36c0318dd4a to your computer and use it in GitHub Desktop.
Save nfroidure/2030aa0f709a0a3995d5a36c0318dd4a to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"fmt"
"log"
"math"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
type FilesMap struct {
daMap map[string]*os.File
mux sync.RWMutex
}
type Message struct {
serial string
t time.Time
data string
signal float64
}
func readMessage(text string) (Message, error) {
fields := strings.Split(text, ",")
timestamp, err := strconv.ParseUint(fields[1], 10, 64)
t := time.Unix(int64(timestamp), 0).UTC()
if err != nil {
return Message{}, err
}
signal, err := strconv.ParseFloat(fields[3], 64)
if err != nil {
return Message{}, err
}
return Message{serial: fields[0], t: t, data: fields[2], signal: signal}, nil
}
func ensureFileIsOpen(filesMap *FilesMap, path string) error {
(*filesMap).mux.RLock()
_, entryExists := (*filesMap).daMap[path]
(*filesMap).mux.RUnlock()
if entryExists {
return nil
}
err := os.MkdirAll(path, 0744)
if err != nil {
return err
}
file, err := os.OpenFile(path+"/data", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
(*filesMap).mux.Lock()
mapIsHeavy := len((*filesMap).daMap) > 4000
if mapIsHeavy {
closeFiles(filesMap)
}
(*filesMap).daMap[path] = file
(*filesMap).mux.Unlock()
return nil
}
func closeFiles(filesMap *FilesMap) error {
for k, v := range (*filesMap).daMap {
err := v.Close()
delete((*filesMap).daMap, k)
if err != nil {
return err
}
}
return nil
}
func processFile(filesMap *FilesMap, path string) error {
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if "" == line {
continue
}
msg, err := readMessage(line)
if err != nil {
return err
}
path := fmt.Sprintf(
"./%02d/%02d/%02d/%02d",
msg.t.Year(),
msg.t.Month(),
msg.t.Day(),
msg.t.Hour(),
)
err = ensureFileIsOpen(filesMap, path)
if err != nil {
return err
}
(*filesMap).mux.RLock()
fmt.Fprintf((*filesMap).daMap[path], "%s\n", line)
(*filesMap).mux.RUnlock()
}
if err := scanner.Err(); err != nil {
return err
}
return nil
}
func main() {
filesMap := FilesMap{
daMap: make(map[string]*os.File),
}
paths := make([]string, 0)
root := "./backup"
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if root == path {
return nil
}
paths = append(paths, path)
return nil
})
var jobs sync.WaitGroup
totalSize := len(paths)
batchSize := 400
for i := int(math.Floor(float64((totalSize)/batchSize))) + 1; i > 0; i-- {
for k := int(math.Min(float64(batchSize*i), float64(totalSize))) - 1; k >= batchSize*(i-1); k-- {
jobs.Add(1)
go func(i int) {
fmt.Printf("Processing: %s # %d (%d/%d)\n", paths[k], i, k, totalSize)
err = processFile(&filesMap, paths[k])
if err != nil {
fmt.Println(fmt.Errorf("error during my impressive processing : %v", err))
}
jobs.Done()
}(i)
jobs.Wait()
}
}
closeFiles(&filesMap)
if err != nil {
fmt.Printf("Got error: %s\n", err)
log.Fatal(err)
panic(err)
}
}
@nfroidure
Copy link
Author

go run bin/explode.go 92.12s user 94.08s system 99% cpu 3:06.50 total with go routines
go run bin/explode.go 99.40s user 114.79s system 99% cpu 3:34.78 total without

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment