Skip to content

Instantly share code, notes, and snippets.

@alexzorin
Created December 17, 2014 02:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexzorin/38c6dfe8705aa5f72d76 to your computer and use it in GitHub Desktop.
Save alexzorin/38c6dfe8705aa5f72d76 to your computer and use it in GitHub Desktop.
// imap2heka
//
// Take things out of IMAP and emit them to heka.
// Keep track of the last message id so we don't miss anything after the first run.
//
// i.e.
// $ ./imap2heka
// 2014/12/17 13:26:03 Resuming from message 26166
// 2014/12/17 13:27:09 Delivered-To: user@domain.com
// Received: by 10.170.99.195 with SMTP id q186csp1295558yka; Tue, 16 Dec 2014
package main
import (
"encoding/json"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"code.google.com/p/go-imap/go1/imap"
)
// config holds the program settings. It is decoded from the JSON config file
// by loadConfig and encoded back to the same file by saveConfig.
var config struct {
// IMAPHost is the address handed to imap.DialTLS.
IMAPHost string
// IMAPUser and IMAPPass are the credentials handed to the client's Login.
IMAPUser string
IMAPPass string
// LastCount is the message number just below the range poll fetches
// (poll requests LastCount+1 up to the mailbox's message count). When it
// is zero, poll initializes it from the mailbox's current message count.
LastCount uint32
}
// headers is the element type of the message channel; poll sends the string
// form of each fetched message's RFC822.HEADER attribute as a headers value.
type headers string
// main loads the configuration, connects to the IMAP server and then logs the
// headers of incoming messages until SIGINT or SIGTERM arrives, at which point
// the configuration (including LastCount) is written back to disk.
func main() {
	if err := loadConfig(); err != nil {
		log.Fatal(err)
	}
	if config.LastCount != 0 {
		log.Println("Resuming from message", config.LastCount)
	}

	// Set up the IMAP client.
	cl, err := connect()
	if err != nil {
		// Report the error itself; cl is not a usable client on failure.
		log.Fatal(err)
	}
	defer cl.Close(false)

	msgCh := make(chan headers)

	// Poll the mailbox. poll closes msgCh when it returns.
	go func() {
		if err := poll(cl, msgCh); err != nil {
			log.Fatal(err)
		}
	}()

	// Receive the headers poll produces.
	go func() {
		for msg := range msgCh {
			// I guess, parse and emit json/protobufs to heka?
			// or do it raw and let heka deconstruct it
			log.Println(msg)
		}
	}()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	// Unregister the channel instead of closing it: the signal package would
	// panic sending on a closed channel if another signal arrived during
	// shutdown.
	defer signal.Stop(sigCh)

	// Block until asked to terminate.
	<-sigCh

	if err := saveConfig(); err != nil {
		log.Fatal(err)
	}
}
// connect dials config.IMAPHost over TLS, logs in with config.IMAPUser and
// config.IMAPPass, and selects INBOX (the second Select argument is true,
// presumably requesting read-only access -- confirm against the imap package).
//
// It returns the ready client, or a nil client and the error from whichever
// step failed. Failures are returned rather than treated as fatal here so the
// caller decides how to handle them.
func connect() (*imap.Client, error) {
	cl, err := imap.DialTLS(config.IMAPHost, nil)
	if err != nil {
		return nil, err
	}
	// The server must be waiting for a login before credentials are sent.
	if state := cl.State(); state != imap.Login {
		return nil, errors.New(fmt.Sprintf("Wanted imap.Login, got %#v", state))
	}
	if _, err := cl.Login(config.IMAPUser, config.IMAPPass); err != nil {
		return nil, err
	}
	if _, err := cl.Select("INBOX", true); err != nil {
		return nil, err
	}
	return cl, nil
}
// poll loops forever: it enters IMAP IDLE, blocks until the server sends
// something, leaves IDLE, and then fetches the RFC822.HEADER of every message
// numbered above config.LastCount, sending each one on ch. After a successful
// fetch config.LastCount is advanced so the same messages are not delivered
// again and the value saved at shutdown reflects what has been processed.
//
// ch is closed when poll returns. poll only returns on an Idle, Recv or
// IdleTerm error (or a failure to build the sequence set); a failed fetch is
// logged and retried on the next wake-up.
//
// NOTE(review): config.LastCount is written here without synchronization while
// main reads config when saving -- confirm whether that needs a lock.
func poll(cl *imap.Client, ch chan headers) error {
	defer close(ch)

	set, err := imap.NewSeqSet("")
	if err != nil {
		return err
	}

	for {
		if config.LastCount == 0 {
			// will pick up any unread messages on the next new message
			config.LastCount = cl.Mailbox.Messages
		}

		if _, err := cl.Idle(); err != nil {
			return err
		}
		if err := cl.Recv(-1); err != nil {
			return err
		}
		if _, err := cl.IdleTerm(); err != nil {
			return err
		}

		// Snapshot the count so the fetched range and the new LastCount agree.
		latest := cl.Mailbox.Messages
		if latest <= config.LastCount {
			// Nothing new (the wake-up was for something other than a new
			// message). If the mailbox shrank, follow it down so later
			// arrivals are not skipped.
			config.LastCount = latest
			continue
		}

		set.Clear()
		set.AddRange(config.LastCount+1, latest)
		cmd, err := imap.Wait(cl.Fetch(set, "RFC822.HEADER"))
		if err != nil {
			log.Println(err)
			continue // don't advance LastCount, get it next time around
		}
		for _, v := range cmd.Data {
			ch <- headers(imap.AsString(v.MessageInfo().Attrs["RFC822.HEADER"]))
		}
		config.LastCount = latest
	}
}
// checkOk aborts the program through log.Fatal when e is non-nil and does
// nothing otherwise. The command argument c is accepted but never inspected
// (presumably so a call returning a command and an error can be passed
// straight in -- no caller is visible in this file).
func checkOk(c *imap.Command, e error) {
	if e == nil {
		return
	}
	log.Fatal(e)
}
// loadConfig decodes the JSON configuration file into the package-level
// config value. The file path comes from the CONF_FILE environment variable,
// falling back to "config.json" when that variable is empty or unset.
// It returns any error from opening or decoding the file.
func loadConfig() error {
	path := "config.json"
	if fromEnv := os.Getenv("CONF_FILE"); fromEnv != "" {
		path = fromEnv
	}

	file, openErr := os.Open(path)
	if openErr != nil {
		return openErr
	}
	defer file.Close()

	return json.NewDecoder(file).Decode(&config)
}
// saveConfig encodes the package-level config value as JSON into the
// configuration file, replacing its previous contents. The file path comes
// from the CONF_FILE environment variable, falling back to "config.json"
// when that variable is empty or unset.
//
// It returns any error from opening, encoding to, or closing the file.
func saveConfig() error {
	confFile := os.Getenv("CONF_FILE")
	if confFile == "" {
		confFile = "config.json"
	}
	// The file holds the IMAP password, so a newly created file is readable
	// by the owner only. An existing file keeps its current mode.
	f, err := os.OpenFile(confFile, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return err
	}
	if err := json.NewEncoder(f).Encode(&config); err != nil {
		f.Close() // best effort; the encode error is the one worth reporting
		return err
	}
	// Close can be where a write failure finally surfaces, so report it.
	return f.Close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment