Skip to content

Instantly share code, notes, and snippets.

@chancez
Last active August 29, 2015 14:03
Show Gist options
  • Save chancez/960fdae4bc34ecfff96e to your computer and use it in GitHub Desktop.
Save chancez/960fdae4bc34ecfff96e to your computer and use it in GitHub Desktop.
panic: runtime error: send on closed channel
goroutine 26 [running]:
runtime.panic(0x854940, 0x1017efe)
/usr/local/go/src/pkg/runtime/panic.c:266 +0xb6
github.com/thoj/go-ircevent.(*Connection).Privmsg(0xc210050640, 0x0, 0x0, 0x0, 0x0)
/heka/build/heka/src/github.com/thoj/go-ircevent/irc.go:226 +0x13f
github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Privmsg(0xc210090320, 0x0, 0x0, 0x0, 0x0, ...)
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:102 +0xab
github.com/mozilla-services/heka/plugins/irc.SendFromOutQueue(0xc210090320, 0x7f1195bf5060, 0xc21000fbe0, 0x0, 0x0, ...)
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:144 +0x49
github.com/mozilla-services/heka/plugins/irc.ProcessOutQueue(0xc210090320, 0x7f1195bf5060, 0xc21000fbe0)
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:205 +0xae
created by github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Run
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:312 +0x2e6
package irc
import (
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/mozilla-services/heka/pipeline"
"github.com/thoj/go-ircevent"
)
// Privmsg wraps the irc.Privmsg by accepting an ircMsg struct, and checking if
// we've joined a channel before trying to send a message to it. Returns whether
// or not the message was successfully sent.
func (output *IrcOutput) Privmsg(ircMsg *IrcMsg) bool {
idx := ircMsg.Idx
if atomic.LoadInt32(&output.JoinedChannels[idx]) == JOINED {
output.Conn.Privmsg(ircMsg.IrcChannel, string(ircMsg.Output))
} else {
return false
}
if output.JoinAndPart {
// Leave the channel if we're configured to part after sending messages.
output.Conn.Part(ircMsg.IrcChannel)
}
return true
}
// updateJoinList atomically updates our global slice of joined channels for a
// particular irc channel. It sets the irc channel's joined status to 'status'.
// Returns whether or not it found the irc channel in our slice.
func updateJoinList(output *IrcOutput, ircChan string, status int32) bool {
for i, channel := range output.Channels {
if ircChan == channel {
// Update if we have or haven't joined the channel
atomic.StoreInt32(&output.JoinedChannels[i], status)
return true
}
}
return false
}
// updateJoinListAll sets the status of all irc channels in our config to
// 'status'
func updateJoinListAll(output *IrcOutput, status int32) {
for channel := range output.Channels {
atomic.StoreInt32(&output.JoinedChannels[channel], status)
}
}
// sendFromOutQueue attempts to send a message to the irc channel specified in
// the ircMsg struct. If sending fails due to not being in the irc channel, it
// will put the message into that irc channel's backlog queue. If the queue is
// full it will drop the message and log an error.
// It returns whether or not a message was successfully delivered to an
// irc channel.
func sendFromOutQueue(output *IrcOutput, runner pipeline.OutputRunner,
ircMsg *IrcMsg) bool {
if output.Privmsg(ircMsg) {
return true
} else {
// We haven't joined this channel yet, so we need to send
// the message to the backlog queue of messages
// Get the proper Channel for the backlog
idx := ircMsg.Idx
backlogQueue := output.BacklogQueues[idx]
select {
// try to put the message into the backlog queue
case backlogQueue <- *ircMsg:
// Just putting
default:
// Failed to put, which means the backlog for this Irc
// channel is full. So drop it and log a message.
runner.LogError(fmt.Errorf("backlog queue for "+
"Irc Channel %s, full. Dropping message.",
ircMsg.IrcChannel))
}
return false
}
return false
}
// sendFromBacklogQueue attempts to send a message from the first backlog queue
// which has a message in it. It returns whether or not a message was
// successfully delivered to an irc channel.
func sendFromBacklogQueue(output *IrcOutput, runner pipeline.OutputRunner) bool {
var ircMsg IrcMsg
// No messages in the out queue, so lets try the backlog queue
for i, queue := range output.BacklogQueues {
if atomic.LoadInt32(&output.JoinedChannels[i]) != JOINED {
continue
}
select {
case ircMsg = <-queue:
if output.Privmsg(&ircMsg) {
return true
}
default:
// No backed up messages for this irc channel
}
}
return false
}
// processOutQueue attempts to send an Irc message from the OutQueue, or the
// BacklogQueue if nothing is in the OutQueue. It is throttled by a ticker to
// prevent flooding the Irc server.
func processOutQueue(output *IrcOutput, runner pipeline.OutputRunner) {
var delivered bool
var ircMsg IrcMsg
ok := true
ticker := runner.Ticker()
for ok {
delivered = false
<-ticker
select {
case ircMsg, ok = <-output.OutQueue:
if !ok {
// We havent actually delivered but we want to escape that
// loop.
delivered = true
// Time to cleanup, and close our chans
break
}
delivered = sendFromOutQueue(output, runner, &ircMsg)
default:
// Just here to prevent blocking
}
if !delivered {
sendFromBacklogQueue(output, runner)
}
}
// Cleanup heka
for _, queue := range output.BacklogQueues {
close(queue)
}
// Try to send the rest of our msgs in the backlog before quitting.
for _, queue := range output.BacklogQueues {
for msg := range queue {
output.Privmsg(&msg)
}
}
// Once we have no messages left, we can quit
output.Conn.Quit()
output.Conn.Disconnect()
}
// registerCallbacks sets up all the event handler callbacks for recieving
// particular irc events.
func registerCallbacks(output *IrcOutput, runner pipeline.OutputRunner) {
// add a callback to check if we've gotten successfully connected
output.Conn.AddCallback(CONNECTED, func(event *irc.Event) {
for _, ircChan := range output.Channels {
// Only join on connect if we aren't going to join whenever we send
// a message
if !output.JoinAndPart {
output.Conn.Join(ircChan)
}
}
})
// Once we've recieved the names list, we've successfully joined the channel
// And should begin processing Heka messages
output.Conn.AddCallback(IRC_RPL_ENDOFNAMES, func(event *irc.Event) {
// This is the actual irc channel name (ie: #heka)
ircChan := event.Arguments[1]
updateJoinList(output, ircChan, JOINED)
})
// We want to handle errors (disconnects) ourself.
output.Conn.ClearCallback(ERROR)
output.Conn.AddCallback(ERROR, func(event *irc.Event) {
updateJoinListAll(output, NOTJOINED)
runner.LogMessage("Disconnected from Irc. Retrying to connect in 3 seconds..")
time.Sleep(3 * time.Second)
err := output.Conn.Reconnect()
if err != nil {
runner.LogError(fmt.Errorf("Error reconnecting:", err))
output.Conn.Quit()
}
runner.LogMessage("Reconnected to Irc!")
})
output.Conn.AddCallback(KICK, func(event *irc.Event) {
ircChan := event.Arguments[0]
updateJoinList(output, ircChan, NOTJOINED)
if output.RejoinOnKick {
output.Conn.Join(ircChan)
}
})
// These next 2 events shouldn't really matter much, but we should update
// the JoinList anyways.
output.Conn.AddCallback(QUIT, func(event *irc.Event) {
updateJoinListAll(output, NOTJOINED)
})
output.Conn.AddCallback(PART, func(event *irc.Event) {
ircChan := event.Arguments[1]
updateJoinList(output, ircChan, NOTJOINED)
})
}
func (output *IrcOutput) Init(config interface{}) error {
conf := config.(*IrcOutputConfig)
output.IrcOutputConfig = conf
conn, err := output.InitIrcCon(conf)
if err != nil {
return fmt.Errorf("Error setting up Irc Connection: %s", err)
}
output.Conn = conn
// Create our chans for passing messages from the main runner InChan to
// the irc channels
numChannels := len(output.Channels)
output.JoinedChannels = make([]int32, numChannels)
output.OutQueue = make(IrcMsgQueue, output.QueueSize)
output.BacklogQueues = make([]IrcMsgQueue, numChannels)
for queue := range output.BacklogQueues {
output.BacklogQueues[queue] = make(IrcMsgQueue, output.QueueSize)
}
return nil
}
func (output *IrcOutput) Run(runner pipeline.OutputRunner,
helper pipeline.PluginHelper) error {
if runner.Encoder() == nil {
return errors.New("Encoder required.")
}
// Register callbacks to handle events
registerCallbacks(output, runner)
var err error
// Connect to the Irc Server
err = output.Conn.Connect(output.Server)
if err != nil {
return fmt.Errorf("Unable to connect to irc server %s: %s",
output.Server, err)
}
// Start a goroutine for recieving messages, and throttling before sending
// to the Irc Server
go processOutQueue(output, runner)
var outgoing []byte
for pack := range runner.InChan() {
outgoing, err = runner.Encode(pack)
if err != nil {
runner.LogError(err)
}
// Send the message to each irc channel.
// If the out queue is full, then we need to drop the message and log
// an error.
for i, ircChannel := range output.Channels {
ircMsg := IrcMsg{outgoing, ircChannel, i}
select {
case output.OutQueue <- ircMsg:
if output.JoinAndPart {
// We wont have joined on connect in this case.
output.Conn.Join(ircChannel)
}
default:
runner.LogError(errors.New("Dropped message. " +
"irc_output OutQueue is full."))
}
}
pack.Recycle()
}
close(output.OutQueue)
output.Conn.ClearCallback(ERROR)
return nil
}
func init() {
pipeline.RegisterPlugin("IrcOutput", func() interface{} {
output := new(IrcOutput)
output.InitIrcCon = NewIrcConn
return output
})
}
panic: runtime error: send on closed channel
goroutine 24 [running]:
runtime.panic(0x854940, 0x1017efe)
/usr/local/go/src/pkg/runtime/panic.c:266 +0xb6
github.com/mozilla-services/heka/plugins/irc.SendFromOutQueue(0xc2100a4410, 0x7fb519a51340, 0xc21000f0a0, 0x0, 0x0, ...)
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:155 +0xbf
github.com/mozilla-services/heka/plugins/irc.ProcessOutQueue(0xc2100a4410, 0x7fb519a51340, 0xc21000f0a0)
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:205 +0xae
created by github.com/mozilla-services/heka/plugins/irc.(*IRCOutput).Run
/heka/build/heka/src/github.com/mozilla-services/heka/plugins/irc/irc_output.go:312 +0x2e6
[100%] Built target flood
root@bcb1a1938d83:/heka/build# hekad -config /heka/config.toml
2014/07/08 22:32:43 Pre-loading: [Dashboard]
2014/07/08 22:32:43 Pre-loading: [CounterFilter]
2014/07/08 22:32:43 Pre-loading: [PayloadEncoder]
2014/07/08 22:32:43 Pre-loading: [test]
2014/07/08 22:32:43 Pre-loading: [TcpInput]
2014/07/08 22:32:43 Pre-loading: [StatAccumInput]
2014/07/08 22:32:43 Pre-loading: [LogOutput]
2014/07/08 22:32:43 Pre-loading: [IRCOutput]
2014/07/08 22:32:43 Pre-loading: [ProtobufDecoder]
2014/07/08 22:32:43 Pre-loading: [ProtobufEncoder]
2014/07/08 22:32:43 Loading: [test]
2014/07/08 22:32:43 Loading: [ProtobufDecoder]
2014/07/08 22:32:43 Loading: [PayloadEncoder]
2014/07/08 22:32:43 Loading: [ProtobufEncoder]
2014/07/08 22:32:43 Loading: [TcpInput]
2014/07/08 22:32:43 Loading: [StatAccumInput]
2014/07/08 22:32:43 Loading: [CounterFilter]
2014/07/08 22:32:43 Loading: [Dashboard]
2014/07/08 22:32:44 Loading: [LogOutput]
2014/07/08 22:32:44 Loading: [IRCOutput]
2014/07/08 22:32:44 Starting hekad...
2014/07/08 22:32:44 Output started: Dashboard
2014/07/08 22:32:44 Output started: LogOutput
2014/07/08 22:32:44 Output started: IRCOutput
2014/07/08 22:32:44 Filter started: CounterFilter
2014/07/08 22:32:44 MessageRouter started.
2014/07/08 22:32:44 Input started: TcpInput
2014/07/08 22:32:44 Input started: StatAccumInput
2014/07/08 22:32:44 Connected to irc.mozilla.org:6667 (63.245.216.214:6667)
Before okay: true
Okay1: false
Okay2: false
Sending from Backlog queue
Okay3: false
Out of loop
2014/07/08 22:32:46 Got 3 messages. 1.51 msg/sec
2014/07/08 22:32:47 Got 6 messages. 2.99 msg/sec
2014/07/08 22:32:48 Got 9 messages. 3.00 msg/sec
^C2014/07/08 22:32:49 Shutdown initiated.
2014/07/08 22:32:49 Stop message sent to input 'TcpInput'
2014/07/08 22:32:49 Stop message sent to input 'StatAccumInput'
2014/07/08 22:32:49 Input 'TcpInput': stopped
2014/07/08 22:32:49 Input 'StatAccumInput': stopped
2014/07/08 22:32:49 Waiting for decoders shutdown
2014/07/08 22:32:49 Decoders shutdown complete
2014/07/08 22:32:49 Stop message sent to filter 'CounterFilter'
2014/07/08 22:32:49 Plugin 'CounterFilter': stopped
2014/07/08 22:32:49 Stop message sent to output 'Dashboard'
2014/07/08 22:32:49 Plugin 'Dashboard': stopped
2014/07/08 22:32:49 Stop message sent to output 'LogOutput'
2014/07/08 22:32:49 Plugin 'LogOutput': stopped
2014/07/08 22:32:49 Stop message sent to output 'IRCOutput'
here
2014/07/08 22:32:49 Plugin 'IRCOutput': stopped
2014/07/08 22:32:49 Shutdown complete.
root@bcb1a1938d83:/heka/build#
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment