Skip to content

Instantly share code, notes, and snippets.

@rafrombrc
Forked from chancez/go-ircevent_panic
Last active August 29, 2015 14:03
Show Gist options
  • Save rafrombrc/52d610be2ab1bd10841c to your computer and use it in GitHub Desktop.
Save rafrombrc/52d610be2ab1bd10841c to your computer and use it in GitHub Desktop.
panic: runtime error: send on closed channel
goroutine 26 [running]:
runtime.panic(0x854940, 0x1017efe)
/usr/local/go/src/pkg/runtime/panic.c:266 +0xb6
github.com/thoj/go-ircevent.(*Connection).Privmsg(0xc210050640, 0x0, 0x0, 0x0, 0x0)
/heka/build/heka/src/github.com/thoj/go-ircevent/irc.go:226 +0x13f
github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Privmsg(0xc210090320, 0x0, 0x0, 0x0, 0x0, ...)
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:102 +0xab
github.com/mozilla-services/heka/plugins/irc.SendFromOutQueue(0xc210090320, 0x7f1195bf5060, 0xc21000fbe0, 0x0, 0x0, ...)
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:144 +0x49
github.com/mozilla-services/heka/plugins/irc.ProcessOutQueue(0xc210090320, 0x7f1195bf5060, 0xc21000fbe0)
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:205 +0xae
created by github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Run
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:312 +0x2e6
package irc
import (
"crypto/tls"
"errors"
"fmt"
"github.com/mozilla-services/heka/pipeline"
"github.com/mozilla-services/heka/plugins/tcp"
"github.com/thoj/go-ircevent"
"sync/atomic"
"time"
)
// IRCOutputConfig holds the settings for the IRC output plugin. The struct
// tags name the TOML keys each field is read from.
type IRCOutputConfig struct {
// Server is the address handed to Conn.Connect by Run. Must not be empty.
Server string `toml:"server"`
// Nick and Ident are passed to irc.IRC when the connection is created.
Nick string `toml:"nick"`
Ident string `toml:"ident"`
// NOTE(review): Password is not read anywhere in the visible code; confirm
// whether it is applied to the connection elsewhere.
Password string `toml:"password"`
// Channels lists the IRC channels every message is delivered to. At least
// one entry is required.
Channels []string `toml:"channels"`
// UseTLS is copied onto the connection's UseTLS flag.
UseTLS bool `toml:"use_tls"`
// Subsection for TLS configuration.
Tls tcp.TlsConfig
// Should we join and part an irc channel between sending messages?
JoinAndPart bool `toml:"join_and_part"`
// This controls the size of the OutQueue and of each per-channel backlog
// queue.
QueueSize int `toml:"queue_size"`
// RejoinOnKick makes the KICK callback re-join the channel it reports.
RejoinOnKick bool `toml:"rejoin_on_kick"`
// Default interval at which IRC messages will be sent is minimum of 2
// seconds between messages.
// NOTE(review): nothing in the visible code reads this field directly;
// presumably the plugin runner consumes it. Confirm.
TickerInterval uint `toml:"ticker_interval"`
}
// IrcMsgQueue is a channel of IrcMsg values. It is the type of the shared
// OutQueue and of every per-channel backlog queue.
type IrcMsgQueue chan IrcMsg
// IRCOutput is the IRC output plugin. It embeds its configuration and owns
// the IRC connection plus the queues used to hand messages to the sender
// goroutine.
type IRCOutput struct {
*IRCOutputConfig
// Conn is the IRC connection created by Init.
Conn *irc.Connection
// OutQueue carries freshly encoded messages from Run to ProcessOutQueue.
OutQueue IrcMsgQueue
// BacklogQueues holds one queue per configured channel for messages that
// could not be sent because the channel was not joined yet.
BacklogQueues []IrcMsgQueue
// JoinedChannels holds one JOINED/NOTJOINED flag per configured channel,
// at the same index as in Channels. It is read and written with sync/atomic.
JoinedChannels []int32
}
// IrcMsg is one encoded message destined for one IRC channel.
type IrcMsg struct {
// Output is the encoded payload; it is converted to a string when sent.
Output []byte
// IrcChannel is the name of the target IRC channel.
IrcChannel string
// Idx is the channel's position in Channels, and therefore also its index
// into JoinedChannels and BacklogQueues.
Idx int
}
const (
// These are replies from the IRC Server
CONNECTED = "001" // handled by joining the configured channels
ERROR = "ERROR" // This is what we get on a disconnect
QUIT = "QUIT"
PART = "PART"
KICK = "KICK"
IRC_RPL_ENDOFNAMES = "366" // treated as confirmation that a channel was joined
// These are to track our JoinedChannels slice of joined/not joined
NOTJOINED = 0
JOINED = 1
)
// ConfigStruct returns a new *IRCOutputConfig pre-populated with the plugin's
// default settings. Fields not assigned here keep their zero values.
func (output *IRCOutput) ConfigStruct() interface{} {
	defaults := new(IRCOutputConfig)
	defaults.Server = "irc.freenode.net"
	defaults.Nick = "heka_bot"
	defaults.Ident = "heka"
	defaults.Channels = []string{"#heka_bot"}
	defaults.QueueSize = 100
	defaults.TickerInterval = 2
	return defaults
}
// NewIRCConn builds an *irc.Connection from config. It rejects a nil
// connection from irc.IRC (reported as a blank nick or ident), an empty
// server and an empty channel list, in that order. It then uses Heka's tcp
// plugin to create the crypto/tls config that is attached to the connection.
//
// The TLS config is built whether or not UseTLS is set, so an invalid Tls
// section is reported as an error either way.
func NewIRCConn(config *IRCOutputConfig) (*irc.Connection, error) {
	conn := irc.IRC(config.Nick, config.Ident)
	switch {
	case conn == nil:
		return nil, errors.New("Nick or Ident cannot be blank")
	case config.Server == "":
		return nil, errors.New("IRC server cannot be blank.")
	case len(config.Channels) < 1:
		return nil, errors.New("Need at least 1 channel to join.")
	}

	var (
		tlsConf *tls.Config
		err     error
	)
	tlsConf, err = tcp.CreateGoTlsConfig(&config.Tls)
	if err != nil {
		return nil, fmt.Errorf("TLS init error: %s", err)
	}

	conn.TLSConfig = tlsConf
	conn.UseTLS = config.UseTLS
	return conn, nil
}
// Privmsg sends ircMsg to its IRC channel, but only if that channel is
// currently flagged JOINED in JoinedChannels. When JoinAndPart is configured
// the channel is parted again right after sending.
//
// It returns true if the message was handed to the connection and false if
// the channel has not been joined.
func (output *IRCOutput) Privmsg(ircMsg IrcMsg) bool {
	joined := atomic.LoadInt32(&output.JoinedChannels[ircMsg.Idx]) == JOINED
	if !joined {
		return false
	}

	target := ircMsg.IrcChannel
	output.Conn.Privmsg(target, string(ircMsg.Output))
	if output.JoinAndPart {
		// Leave again; the channel is joined afresh for the next message.
		output.Conn.Part(target)
	}
	return true
}
// UpdateJoinList atomically stores status as the joined flag of the first
// configured channel whose name equals ircChan. It returns true if such a
// channel exists and false otherwise.
func UpdateJoinList(output *IRCOutput, ircChan string, status int32) bool {
	names := output.Channels
	for idx := 0; idx < len(names); idx++ {
		if names[idx] != ircChan {
			continue
		}
		// The flag lives at the same index as the channel name.
		atomic.StoreInt32(&output.JoinedChannels[idx], status)
		return true
	}
	return false
}
// UpdateJoinListAll atomically stores status as the joined flag of every
// configured IRC channel.
func UpdateJoinListAll(output *IRCOutput, status int32) {
	total := len(output.Channels)
	for idx := 0; idx < total; idx++ {
		atomic.StoreInt32(&output.JoinedChannels[idx], status)
	}
}
// SendFromOutQueue tries to deliver ircMsg to its IRC channel. If the channel
// has not been joined, the message is parked in that channel's backlog queue
// instead. If the backlog queue is full, the message is dropped and an error
// is logged through runner.
//
// It returns true only when the message was actually sent.
func SendFromOutQueue(output *IRCOutput, runner pipeline.OutputRunner,
	ircMsg IrcMsg) bool {
	if output.Privmsg(ircMsg) {
		return true
	}

	// Not joined yet: park the message without blocking.
	pending := output.BacklogQueues[ircMsg.Idx]
	select {
	case pending <- ircMsg:
		// Queued for a later attempt.
	default:
		// The backlog for this IRC channel is full.
		runner.LogError(fmt.Errorf("backlog queue for "+
			"IRC Channel %s, full. Dropping message.",
			ircMsg.IrcChannel))
	}
	return false
}
// SendFromBacklogQueue walks the backlog queues in channel order. For each
// channel currently flagged JOINED it takes at most one waiting message and
// tries to send it. It returns true as soon as one message has been sent,
// and false if nothing was sent.
//
// The ircMsg argument is only used as scratch storage for the dequeued
// message; runner is not used.
//
// NOTE(review): if the channel's flag flips between the check here and the
// one inside Privmsg, the dequeued message is neither sent nor put back.
func SendFromBacklogQueue(output *IRCOutput, runner pipeline.OutputRunner,
	ircMsg IrcMsg) bool {
	queues := output.BacklogQueues
	for idx := range queues {
		if atomic.LoadInt32(&output.JoinedChannels[idx]) == JOINED {
			select {
			case ircMsg = <-queues[idx]:
				if output.Privmsg(ircMsg) {
					return true
				}
			default:
				// Nothing waiting for this IRC channel.
			}
		}
	}
	return false
}
// ProcessOutQueue attempts to send an IRC message from the OutQueue, or the
// BacklogQueue if nothing is in the OutQueue. It is throttled by a ticker to
// prevent flooding the IRC server.
func ProcessOutQueue(output *IRCOutput, runner pipeline.OutputRunner) {
var delivered bool
var ircMsg ircMsg
ok := true
// ticker := runner.Ticker()
for ok {
delivered = false
// <-ticker
select {
case ircMsg, ok = <-output.OutQueue:
if !ok {
// We havent actually delivered but we want to escape that
// loop
delivered = true
break
}
delivered = SendFromOutQueue(output, runner, ircMsg)
default:
// Nothing
}
if !delivered {
SendFromBacklogQueue(...)
}
}
for _, queue := range output.BacklogQueues {
close(queue)
}
for _, queue := range output.BacklogQueues {
// drain
}
}
// RegisterCallbacks installs the event handlers that keep JoinedChannels in
// step with the IRC connection:
//
//   - CONNECTED: joins every configured channel, unless JoinAndPart is set.
//   - IRC_RPL_ENDOFNAMES: marks the named channel JOINED.
//   - ERROR: marks everything NOTJOINED, waits 3 seconds and reconnects. If
//     reconnecting fails the error is logged and the connection is quit.
//   - KICK, PART, QUIT: mark the affected channel(s) NOTJOINED, but only when
//     the event is about this bot.
//
// Events with fewer arguments than expected are ignored.
func RegisterCallbacks(output *IRCOutput, runner pipeline.OutputRunner) {
	// isSelf reports whether nick is the nick this plugin was configured
	// with. KICK, PART and QUIT are also delivered for other users and must
	// not change our own joined state.
	// NOTE(review): this compares against the configured nick. If the IRC
	// library picks a different nick (e.g. after a collision) it will not
	// match; confirm how the library reports the nick actually in use.
	isSelf := func(nick string) bool { return nick == output.Nick }

	// Once connected, join the channels up front unless we join on demand.
	output.Conn.AddCallback(CONNECTED, func(event *irc.Event) {
		if output.JoinAndPart {
			return
		}
		for _, ircChan := range output.Channels {
			output.Conn.Join(ircChan)
		}
	})

	// Once we've received the names list, we've successfully joined the
	// channel and can begin sending Heka messages to it.
	output.Conn.AddCallback(IRC_RPL_ENDOFNAMES, func(event *irc.Event) {
		if len(event.Arguments) < 2 {
			return
		}
		// Arguments[1] is the actual IRC channel name (ie: #heka).
		UpdateJoinList(output, event.Arguments[1], JOINED)
	})

	// We want to handle errors (disconnects) ourself.
	output.Conn.ClearCallback(ERROR)
	output.Conn.AddCallback(ERROR, func(event *irc.Event) {
		UpdateJoinListAll(output, NOTJOINED)
		runner.LogMessage("Disconnected from IRC. Retrying to connect in 3 seconds..")
		time.Sleep(3 * time.Second)
		if err := output.Conn.Reconnect(); err != nil {
			runner.LogError(fmt.Errorf("Error reconnecting: %s", err))
			output.Conn.Quit()
			// Do not report success after a failed reconnect.
			return
		}
		runner.LogMessage("Reconnected to IRC!")
	})

	// KICK arguments are <channel> <kicked nick> [<comment>].
	output.Conn.AddCallback(KICK, func(event *irc.Event) {
		if len(event.Arguments) < 2 || !isSelf(event.Arguments[1]) {
			return
		}
		ircChan := event.Arguments[0]
		UpdateJoinList(output, ircChan, NOTJOINED)
		if output.RejoinOnKick {
			output.Conn.Join(ircChan)
		}
	})

	// These next 2 events shouldn't really matter much, but we should update
	// the JoinList anyways.
	output.Conn.AddCallback(QUIT, func(event *irc.Event) {
		if !isSelf(event.Nick) {
			return
		}
		UpdateJoinListAll(output, NOTJOINED)
	})

	// PART arguments are <channel> [<part message>]; the channel comes first.
	output.Conn.AddCallback(PART, func(event *irc.Event) {
		if len(event.Arguments) < 1 || !isSelf(event.Nick) {
			return
		}
		UpdateJoinList(output, event.Arguments[0], NOTJOINED)
	})
}
// Init stores the supplied *IRCOutputConfig on the plugin, creates the IRC
// connection from it and allocates the per-channel state: one joined flag
// and one backlog queue per configured channel, plus the shared OutQueue.
// All queues are buffered to QueueSize.
//
// It returns an error if the connection cannot be set up. config must be a
// *IRCOutputConfig; any other type makes the type assertion panic.
func (output *IRCOutput) Init(config interface{}) error {
	output.IRCOutputConfig = config.(*IRCOutputConfig)

	conn, err := NewIRCConn(output.IRCOutputConfig)
	if err != nil {
		return fmt.Errorf("Error setting up IRC Connection: %s", err)
	}
	output.Conn = conn

	// Queues that carry messages from the runner's InChan to the IRC
	// channels.
	count := len(output.Channels)
	size := output.QueueSize
	output.JoinedChannels = make([]int32, count)
	output.OutQueue = make(IrcMsgQueue, size)
	backlogs := make([]IrcMsgQueue, count)
	for idx := 0; idx < count; idx++ {
		backlogs[idx] = make(IrcMsgQueue, size)
	}
	output.BacklogQueues = backlogs
	return nil
}
// Run is the plugin's main loop. It requires an encoder on runner, registers
// the IRC callbacks, connects to the configured server and starts
// ProcessOutQueue in its own goroutine. Every pack read from the runner's
// InChan is then encoded and queued once per configured IRC channel. A pack
// that fails to encode is logged and skipped. A message that does not fit in
// the OutQueue is dropped and logged.
//
// When InChan is closed, Run closes the OutQueue, waits for the sender
// goroutine to finish and only then quits and disconnects. Tearing the
// connection down while the sender may still be calling Conn.Privmsg is
// presumably what produced the "send on closed channel" panic seen with the
// previous version.
//
// It returns an error if no encoder is configured or the connection fails.
func (output *IRCOutput) Run(runner pipeline.OutputRunner,
	helper pipeline.PluginHelper) error {
	if runner.Encoder() == nil {
		return errors.New("Encoder required.")
	}

	// Register callbacks to handle events
	RegisterCallbacks(output, runner)

	// Connect to the IRC Server
	if err := output.Conn.Connect(output.Server); err != nil {
		return fmt.Errorf("Unable to connect to irc server %s: %s",
			output.Server, err)
	}

	// Start the sender goroutine; done is closed when it has returned.
	done := make(chan struct{})
	go func() {
		defer close(done)
		ProcessOutQueue(output, runner)
	}()

	for pack := range runner.InChan() {
		outgoing, err := runner.Encode(pack)
		if err != nil {
			// Nothing valid to send for this pack.
			runner.LogError(err)
			pack.Recycle()
			continue
		}
		// Send the message to each IRC Channel.
		// If the out queue is full, then we need to drop the message and log
		// an error.
		for i, ircChannel := range output.Channels {
			ircMsg := IrcMsg{outgoing, ircChannel, i}
			select {
			case output.OutQueue <- ircMsg:
				if output.JoinAndPart {
					// We wont have joined on connect in this case.
					output.Conn.Join(ircChannel)
				}
			default:
				runner.LogError(errors.New("Dropped message. " +
					"irc_output OutQueue is full."))
			}
		}
		pack.Recycle()
	}

	close(output.OutQueue)
	// Let the sender finish with the connection before it is torn down.
	<-done
	output.Conn.ClearCallback(ERROR)
	output.Conn.Quit()
	output.Conn.Disconnect()
	return nil
}
func init() {
pipeline.RegisterPlugin("IRCOutput", func() interface{} {
return new(IRCOutput)
})
}
panic: runtime error: send on closed channel
goroutine 24 [running]:
runtime.panic(0x854940, 0x1017efe)
/usr/local/go/src/pkg/runtime/panic.c:266 +0xb6
github.com/mozilla-services/heka/plugins/irc.SendFromOutQueue(0xc2100a4410, 0x7fb519a51340, 0xc21000f0a0, 0x0, 0x0, ...)
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:155 +0xbf
github.com/mozilla-services/heka/plugins/irc.ProcessOutQueue(0xc2100a4410, 0x7fb519a51340, 0xc21000f0a0)
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:205 +0xae
created by github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Run
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:312 +0x2e6
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment