Skip to content

Instantly share code, notes, and snippets.

@miku
Last active February 1, 2016 21:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miku/4f4eef1e58995515d23b to your computer and use it in GitHub Desktop.
Save miku/4f4eef1e58995515d23b to your computer and use it in GitHub Desktop.
Irrational Writers
package main
import (
"bufio"
"bytes"
"io"
"log"
"strings"
"github.com/miku/esbulk"
)
type eswriter struct {
opts esbulk.Options
buf bytes.Buffer
}
func NewElasticsearchWriter(opts esbulk.Options) *eswriter {
return &eswriter{opts: opts}
}
func (w *eswriter) Write(p []byte) (int, error) {
return w.buf.Write(p)
}
func (w *eswriter) Close() error {
br := bufio.NewReader(&w.buf)
var docs []string
for {
line, err := br.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
return err
}
line = strings.TrimSpace(line)
if line == "" {
continue
}
docs = append(docs, line)
}
if err := esbulk.BulkIndex(docs, w.opts); err != nil {
return err
}
return nil
}
func main() {
opts := esbulk.Options{Host: "localhost", Port: 9200}
w := NewElasticsearchWriter(opts)
docs := `
{"name": "alice"}
{"name": "bob"}
{"name": "yella"}
`
if _, err := io.WriteString(w, docs); err != nil {
log.Fatal(err)
}
if err := w.Close(); err != nil {
log.Fatal(err)
}
}
package main
import "io"
// Exister is for things, that support checks like on a check mark.
type Exister interface {
Exists() bool
}
// Target can be anything, but usually it is some sort of file.
type Target interface {
io.ReadWriteCloser
Exister
}
// TaskMap holds a set of tasks, indexed by a string key.
type TaskMap map[string]Task
// Requirer can be implemented by tasks, which need other tasks.
type Requirer interface {
Requires() TaskMap
}
// ExternalTask is something, that does not need to be created.
type ExternalTask interface {
Output() Target
}
// Task has an output, which can be assembled via Run().
type Task interface {
Run() error
Output() Target
}
type ExternalFile struct {
Path string
}
func (ext ExternalFile) Output() {
return LocalTarget{Path: ext.Path}
}
type myTask ...
func (t myTask) Requires() TaskMap {
return TaskMap{"p": ExternalFile{Path: "/etc/passwd"}}
}
// Target can be local file, HDFS file, S3, redis KV, SQL rows in database, search index.
// Same access patterns.
//
// defer task.Output().Close()
// io.WriteString(task.Output(), "my output")
//
package main
import (
"bytes"
"io"
"log"
"os"
)
// rot13writer
type rot13writer struct {
w io.Writer
}
func New(w io.Writer) *rot13writer {
return &rot13writer{w: w}
}
func (w *rot13writer) Write(p []byte) (int, error) {
var buf bytes.Buffer
for _, b := range p {
if (b > 'A' && b < 'N') || (b > 'a' && b < 'n') {
if _, err := buf.Write([]byte{b + 13}); err != nil {
return 0, err
}
} else if (b > 'M' && b < '[') || (b > 'm' && b < '{') {
if _, err := buf.Write([]byte{b - 13}); err != nil {
return 0, err
}
} else {
if _, err := buf.Write([]byte{b}); err != nil {
return 0, err
}
}
}
if _, err := io.Copy(w.w, &buf); err != nil {
return 0, err
}
return len(p), nil
}
func main() {
w := New(os.Stdout)
if _, err := io.WriteString(w, "Hello World 123\n"); err != nil {
log.Fatal(err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment