Skip to content

Instantly share code, notes, and snippets.

@mh-cbon
Created July 1, 2016 20:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mh-cbon/765e80126a12357f37889db15ed7d3a5 to your computer and use it in GitHub Desktop.
Save mh-cbon/765e80126a12357f37889db15ed7d3a5 to your computer and use it in GitHub Desktop.
Poking at streams in Go
package main
import (
"fmt"
"os"
// "bytes"
// "encoding/base64"
"github.com/mh-cbon/stream/stream3"
)
// main copies standard input to standard output through a pipeline that
// gzip-encodes and then gzip-decodes the data.
//
// Failures are reported on standard error and turn into a non-zero exit
// status: standard output is the pipeline's data sink, so printing the error
// there would mix the message into the produced data.
func main() {
	// src, _ := os.Open("empty")
	stdin := os.Stdin
	s := stream.CreateStreamReader(stdin)
	// s.Pipe(stream.B64Encode(base64.StdEncoding)).Pipe(stream.CreateStreamWriter(os.Stdout))
	s.Pipe(stream.GzipEncode()).Pipe(stream.GzipDecode()).Pipe(stream.CreateStreamWriter(os.Stdout))

	err := s.Consume()

	// The process is about to exit and nothing more is read from stdin, so a
	// failure to close it is deliberately ignored.
	_ = stdin.Close()

	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
package stream
import (
"io"
// "fmt"
"bytes"
"encoding/base64"
"compress/gzip"
"github.com/mh-cbon/verbose"
)
// logger is the package-level logger shared by the stream stages in this
// file, which report errors through its Printf method. It is whatever
// verbose.Auto() returns; how that logger is configured is decided by the
// verbose package and is not visible here.
var logger = verbose.Auto()
// Callback signatures that pipeline stages are built from. Chunks travel
// between stages as pointers to byte slices.
type (
	// DataPull produces the next chunk of a stream. It receives a size value
	// from its caller and returns the chunk together with any error.
	DataPull func(size int) (*[]byte, error)

	// DataHandler receives one chunk and returns the chunk to hand on to the
	// next stages, together with any error.
	DataHandler func(*[]byte) (*[]byte, error)

	// DataFlusher is called once no more chunks will arrive and returns a
	// final chunk, together with any error.
	DataFlusher func() (*[]byte, error)
)
// StreamReader is the head of a pipeline. Consume pulls chunks through
// handler and passes them to every consumer registered with Pipe.
type StreamReader struct {
	// size is the value handed to handler on every pull.
	size int
	// handler produces the next chunk of the stream.
	handler DataPull
	// pipes are the consumers fed directly by this reader.
	pipes []*StreamConsumer
}
// StreamConsumer is one stage of a pipeline. Write runs each incoming chunk
// through handler, Flush asks flusher for a final chunk, and both forward the
// result to the stages registered with Pipe.
type StreamConsumer struct {
	// size is a chunk-size value set by some of the constructors.
	size int
	// handler transforms one incoming chunk.
	handler DataHandler
	// flusher produces the final chunk when the stream ends.
	flusher DataFlusher
	// pipes are the stages fed directly by this one.
	pipes []*StreamConsumer
}
// Consume drives the pipeline: it pulls chunks from the reader's handler and
// writes each one to every registered pipe, then flushes every pipe.
//
// Pulling stops at the first empty chunk, at io.EOF, or at the first read or
// write error. The pipes are flushed in every case so downstream stages can
// finish their work, but a flush result never replaces an earlier error.
//
// It returns the first error met while reading, writing or flushing. io.EOF
// is the normal end of the stream and is reported as nil.
func (s *StreamReader) Consume() error {
	// err is declared once and only ever assigned with "=", so a failure
	// inside the loop cannot be lost to a shadowed variable.
	var err error

pump:
	for {
		buf, readErr := s.handler(s.size)
		empty := buf == nil || len(*buf) == 0

		// A pull may deliver data together with an error (io.EOF included),
		// so the data is forwarded before the error is examined.
		if !empty {
			for _, p := range s.pipes {
				if err = p.Write(buf); err != nil {
					break pump
				}
			}
		}

		switch {
		case readErr == io.EOF:
			break pump
		case readErr != nil:
			err = readErr
			break pump
		case empty:
			// An empty chunk without an error also ends the stream.
			break pump
		}
	}

	for _, p := range s.pipes {
		if flushErr := p.Flush(); flushErr != nil {
			if err == nil {
				err = flushErr
			}
			break
		}
	}

	if err == io.EOF {
		return nil
	}
	return err
}
// Pipe registers d as a direct consumer of this reader and returns d, so
// that further stages can be chained onto it.
func (s *StreamReader) Pipe(d *StreamConsumer) *StreamConsumer {
	consumers := append(s.pipes, d)
	s.pipes = consumers
	return d
}
// Write runs in through this stage's handler and forwards the handler's
// result to every downstream pipe, in registration order.
//
// It returns the handler's error without forwarding anything, or the first
// error returned by a downstream Write; later pipes are then skipped.
func (s *StreamConsumer) Write(in *[]byte) error {
	out, err := s.handler(in)
	if err != nil {
		return err
	}

	for _, next := range s.pipes {
		if werr := next.Write(out); werr != nil {
			return werr
		}
	}
	return nil
}
// Flush ends the stream for this stage. It asks the flusher for the final
// chunk, writes that chunk to every downstream pipe, and then flushes every
// downstream pipe in turn.
//
// It returns the flusher's error without touching the downstream pipes, or
// the first error returned by a downstream Write or Flush.
func (s *StreamConsumer) Flush() error {
	tail, err := s.flusher()
	if err != nil {
		return err
	}

	// Every pipe receives the final chunk before any of them is flushed.
	for _, next := range s.pipes {
		if werr := next.Write(tail); werr != nil {
			return werr
		}
	}
	for _, next := range s.pipes {
		if ferr := next.Flush(); ferr != nil {
			return ferr
		}
	}
	return nil
}
// Pipe registers d as a direct downstream stage of this consumer and returns
// d, so that calls can be chained to build a pipeline.
func (s *StreamConsumer) Pipe(d *StreamConsumer) *StreamConsumer {
	downstream := append(s.pipes, d)
	s.pipes = downstream
	return d
}
// CreateStreamReader returns a StreamReader that pulls its chunks from r.
//
// Every pull performs one r.Read into a single 16384-byte buffer allocated
// here and returns the filled part of it together with the Read error. The
// size argument of the pull is not used. Because the buffer is shared, a
// returned chunk is overwritten by the next pull.
func CreateStreamReader(r io.Reader) *StreamReader {
	const chunkSize = 16384

	scratch := make([]byte, chunkSize)
	reader := &StreamReader{size: chunkSize}

	reader.handler = func(size int) (*[]byte, error) {
		n, err := r.Read(scratch)
		if err != nil {
			logger.Printf("CreateStreamReader %s", err)
		}
		chunk := scratch[:n]
		return &chunk, err
	}

	return reader
}
// B64Encode returns a stage that base64-encodes the stream with enc.
//
// Each Write returns the encoded bytes that became available for that chunk.
// Flush closes the encoder, which emits any partially written final block,
// and returns those last bytes.
//
// Returned chunks are private copies. bytes.Buffer.Bytes is only valid until
// the buffer is next modified, so handing out the buffer's own storage would
// let the next write overwrite a chunk a downstream stage still holds.
func B64Encode(enc *base64.Encoding) *StreamConsumer {
	s := StreamConsumer{}
	s.size = 16384
	buf := new(bytes.Buffer)
	encoder := base64.NewEncoder(enc, buf)

	// drain copies out everything encoded so far and empties buf.
	drain := func() *[]byte {
		out := make([]byte, buf.Len())
		copy(out, buf.Bytes())
		buf.Reset()
		return &out
	}

	s.handler = func(b *[]byte) (*[]byte, error) {
		// A nil chunk carries no data; there is nothing to encode.
		if b == nil {
			return drain(), nil
		}
		_, err := encoder.Write(*b)
		if err != nil {
			logger.Printf("B64Encode %s", err)
		}
		return drain(), err
	}
	s.flusher = func() (*[]byte, error) {
		err := encoder.Close()
		if err != nil {
			logger.Printf("B64Encode flush %s", err)
		}
		return drain(), err
	}
	return &s
}
// GzipEncode returns a stage that gzip-compresses the stream.
//
// Each Write compresses the chunk and flushes the compressor, so the
// compressed bytes for that chunk are returned immediately instead of sitting
// in the compressor. Flush closes the compressor and returns what Close
// produced: any remaining data and the gzip footer.
//
// Returned chunks are private copies. bytes.Buffer.Bytes is only valid until
// the buffer is next modified, so handing out the buffer's own storage would
// let the next write overwrite a chunk a downstream stage still holds.
func GzipEncode() *StreamConsumer {
	s := StreamConsumer{}
	s.size = 16384
	buf := new(bytes.Buffer)
	encoder := gzip.NewWriter(buf)

	// drain copies out everything compressed so far and empties buf.
	drain := func() *[]byte {
		out := make([]byte, buf.Len())
		copy(out, buf.Bytes())
		buf.Reset()
		return &out
	}

	s.handler = func(b *[]byte) (*[]byte, error) {
		empty := make([]byte, 0)
		// A nil chunk carries no data; there is nothing to compress.
		if b == nil {
			return &empty, nil
		}
		if _, err := encoder.Write(*b); err != nil {
			logger.Printf("GzipEncode Write %s", err)
			return &empty, err
		}
		if err := encoder.Flush(); err != nil {
			logger.Printf("GzipEncode Flush %s", err)
			return &empty, err
		}
		return drain(), nil
	}
	s.flusher = func() (*[]byte, error) {
		// Close must run before buf is drained: it is Close that writes the
		// remaining data and the gzip footer into buf. Draining first would
		// drop them and leave the stream truncated.
		err := encoder.Close()
		if err != nil {
			logger.Printf("GzipEncode Close %s", err)
		}
		return drain(), err
	}
	return &s
}
// GzipDecode returns a stage that gzip-decompresses the stream.
//
// Write only stores the compressed chunk and returns an empty chunk. The
// whole stream is decompressed in one pass by Flush, which returns the
// decoded data.
//
// Decoding is deferred because the stage is fed in pieces while gzip.Reader
// pulls from an io.Reader: a reader that runs out of input between two
// chunks makes the decoder fail with io.ErrUnexpectedEOF. The cost is that
// the complete compressed stream is held in memory until Flush.
//
// Flush returns no data and no error when nothing was written, and an error
// when the stored data is not a complete, valid gzip stream.
func GzipDecode() *StreamConsumer {
	s := StreamConsumer{}
	s.size = 16384

	// pending holds the compressed bytes received so far.
	pending := new(bytes.Buffer)

	s.handler = func(b *[]byte) (*[]byte, error) {
		out := make([]byte, 0)
		if b != nil {
			// bytes.Buffer.Write always returns a nil error.
			pending.Write(*b)
		}
		return &out, nil
	}

	s.flusher = func() (*[]byte, error) {
		out := make([]byte, 0)
		if pending.Len() == 0 {
			return &out, nil
		}
		// Whatever the outcome, the stored input is spent after this call.
		defer pending.Reset()

		decoder, err := gzip.NewReader(pending)
		if err != nil {
			logger.Printf("GzipDecode NewReader %s", err)
			return &out, err
		}

		// decoded is local to this call, so the returned chunk is not
		// shared with any buffer that is written to later.
		var decoded bytes.Buffer
		if _, err = decoded.ReadFrom(decoder); err != nil {
			logger.Printf("GzipDecode flush Read %s", err)
			return &out, err
		}
		if err = decoder.Close(); err != nil {
			logger.Printf("GzipDecode flush Close %s", err)
			return &out, err
		}

		out = decoded.Bytes()
		return &out, nil
	}
	return &s
}
// CreateStreamWriter returns a stage that writes every chunk it receives to
// w. The handler returns the incoming chunk unchanged together with the
// error from w.Write. Flushing writes nothing to w and yields an empty chunk.
func CreateStreamWriter(w io.Writer) *StreamConsumer {
	nothing := make([]byte, 0)
	sink := &StreamConsumer{}

	sink.handler = func(chunk *[]byte) (*[]byte, error) {
		_, err := w.Write(*chunk)
		if err != nil {
			logger.Printf("CreateStreamWriter Write %s", err)
		}
		return chunk, err
	}
	sink.flusher = func() (*[]byte, error) {
		return &nothing, nil
	}

	return sink
}
// license DWTFYW
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment