Skip to content

Instantly share code, notes, and snippets.

@leandrosilva
Created September 6, 2012 20:12
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leandrosilva/3660045 to your computer and use it in GitHub Desktop.
Save leandrosilva/3660045 to your computer and use it in GitHub Desktop.
My dumb Python script to stream Syslog messages to a Kafka server
# ==========================
# Getting Started with klogd
# ==========================
#
# Klogd is a dumb script to stream Syslog messages to a Kafka server.
#
# 1) Make sure you have all dependencies installed properly
#
# - Twisted
# - PyParsing
# - PyKafka
#
# 2) Make sure you have Kafka up and running
#
# http://incubator.apache.org/kafka/quickstart.html
#
# 3) Configure Syslog message routing (/etc/syslog.conf on Mac OS X)
#
# *.info;authpriv,remoteauth,ftp,install,internal.none @127.0.0.1:1514
#
# 4) Re-launch the Syslog daemon (on Mac OS X):
#
# $ launchctl unload /System/Library/LaunchDaemons/com.apple.syslogd.plist
# $ launchctl load /System/Library/LaunchDaemons/com.apple.syslogd.plist
#
# 5) Start klogd
#
# $ python klogd.py
#
# 6) Test
#
# $ logger -p local0.info -t test.app "bla bla bla info info info"
from pyparsing import Word, alphas, Suppress, Combine, nums, string, Optional, Regex
from twisted.internet import reactor
from twisted.internet.protocol import DatagramProtocol, Factory
from time import strftime
import json, kafka
# Syslog severity names, indexed by severity code 0-7 (PRI modulo 8).
severity = "emerg alert crit err warn notice info debug".split()

# Syslog facility names, indexed by facility code 0-23 (PRI divided by 8).
# NOTE(review): codes 13-15 are labelled audit/alert/at here; RFC 5424 names
# them "log audit", "log alert" and "clock daemon" -- confirm intended labels.
facility = (
    "kern user mail daemon auth syslog lpr news "
    "uucp cron authpriv ftp ntp audit alert at "
    "local0 local1 local2 local3 local4 local5 local6 local7"
).split()
class Parser(object):
    """Parse BSD-style syslog lines ("<PRI>Mon DD HH:MM:SS host app[pid]: msg")
    into JSON payloads.

    The pyparsing grammar is built once per instance and reused for every line.
    """

    def __init__(self):
        self.__pattern = self.__build_pattern()

    def __build_pattern(self):
        """Build the grammar for one syslog line.

        Every field the payload needs carries a results name, so ``parse`` can
        look fields up by name. Positional indexing breaks when the optional
        "[pid]" section is absent, because every later token shifts left.
        """
        # string.uppercase/lowercase are locale-dependent and do not exist on
        # Python 3; the ascii_* constants exist on both.
        from string import ascii_lowercase, ascii_uppercase

        ints = Word(nums)

        # priority, e.g. "<13>"
        priority = Suppress("<") + ints("priority") + Suppress(">")

        # timestamp, e.g. "Sep  6 20:12:01" (parsed but not kept in the payload)
        month = Word(ascii_uppercase, ascii_lowercase, exact=3)
        day = ints
        hour = Combine(ints + ":" + ints + ":" + ints)
        timestamp = month + day + hour

        # hostname
        hostname = Word(alphas + nums + "_" + "-" + ".")("hostname")

        # appname, optionally followed by "[pid]", terminated by a colon
        pid = Suppress("[") + ints("pid") + Suppress("]")
        appname = (Word(alphas + "/" + "-" + "_" + ".")("appname")
                   + Optional(pid)
                   + Suppress(":"))

        # message: the remainder of the line
        message = Regex(".*")("message")

        return priority + timestamp + hostname + appname + message

    def parse(self, line, address):
        """Parse one syslog line and return it as a JSON string.

        Args:
            line: the raw datagram contents, as a byte string or text. Byte
                strings are decoded as UTF-8 and invalid sequences are
                replaced. Undecodable bytes would otherwise make
                ``json.dumps`` fail.
            address: ``(host, port)`` tuple identifying the sender.

        Returns:
            A JSON object string with keys priority, facility, severity,
            timestamp, hostname, hostaddr, hostport, appname, pid and message.
            ``pid`` is null when the line has no "[pid]" section.

        Raises:
            pyparsing.ParseException: if the line does not match the grammar.
        """
        host, port = address
        if isinstance(line, bytes):
            line = line.decode("utf-8", "replace")

        parsed_line = self.__pattern.parseString(line)
        _priority = parsed_line["priority"]
        (_facility, _severity) = self.__get_level(_priority)

        payload = {
            "priority": _priority,
            "facility": _facility,
            "severity": _severity,
            # Time of receipt on this host, not the timestamp inside the line.
            "timestamp": strftime("%Y-%m-%d %H:%M:%S"),
            "hostname": parsed_line["hostname"],
            "hostaddr": host,
            "hostport": port,
            "appname": parsed_line["appname"],
            "pid": parsed_line.get("pid"),
            "message": parsed_line["message"],
        }
        return json.dumps(payload)

    def __get_level(self, priority):
        """Split a PRI value (numeric string) into (facility, severity) names.

        PRI encodes facility * 8 + severity. A facility code beyond the known
        table is reported as "unknown" instead of raising IndexError and
        losing the message.
        """
        # divmod keeps integer division on both Python 2 and 3; a plain "/"
        # yields a float index on Python 3.
        facility_code, severity_code = divmod(int(priority), 8)
        if facility_code < len(facility):
            facility_name = facility[facility_code]
        else:
            facility_name = "unknown"
        return (facility_name, severity[severity_code])
class Receiver(DatagramProtocol):
    """Twisted UDP protocol that parses each syslog datagram and publishes
    the resulting JSON payload to a Kafka topic.

    Args:
        topic: Kafka topic to publish to.
        kafka_host: Kafka broker host.
        kafka_port: Kafka broker port.
    """

    def __init__(self, topic="klog", kafka_host="127.0.0.1", kafka_port=9092):
        self.__parser = Parser()
        self.__topic = topic
        self.__kafka_host = kafka_host
        self.__kafka_port = kafka_port
        # Created on first send and reused afterwards, instead of building a
        # new producer (and broker connection) for every datagram.
        # NOTE(review): assumes the kafka Producer keeps a reusable
        # connection between send() calls -- confirm for the client in use.
        self.__producer = None

    def datagramReceived(self, data, address):
        """Parse one datagram from ``address`` ((host, port)) and publish it."""
        payload = self.__parser.parse(data, address)
        self.__send_to_kafka(payload)

    def __send_to_kafka(self, payload):
        """Publish ``payload`` (a JSON string) as one Kafka message.

        On an I/O error the cached producer is discarded, so the next message
        opens a fresh connection (e.g. after a broker restart). The error is
        then re-raised to the caller, as before.
        """
        if self.__producer is None:
            self.__producer = kafka.producer.Producer(
                self.__topic, host=self.__kafka_host, port=self.__kafka_port)
        message = kafka.message.Message(payload)
        try:
            self.__producer.send(message)
        except (IOError, OSError):
            self.__producer = None
            raise
# Let's kick off
def main(port=1514):
    """Listen for syslog datagrams on UDP ``port`` and run the Twisted reactor.

    Blocks inside ``reactor.run()`` until the reactor is stopped.
    """
    # Parenthesised single-argument print behaves the same on Python 2 and 3.
    print("Listening UDP on port %d" % port)
    reactor.listenUDP(port, Receiver())
    reactor.run()


if __name__ == "__main__":
    main()
#
# Consumes klog topic from Kafka, for test purpose.
#
import kafka


def consume(topic="klog"):
    """Print every message read from Kafka ``topic``.

    Keeps going for as long as ``consumer.loop()`` yields messages.
    """
    consumer = kafka.consumer.Consumer(topic)
    for message in consumer.loop():
        # One pre-formatted string, so the output matches on Python 2 and 3.
        print("received: %s" % (message,))


# Guarded so that importing this module does not start consuming.
if __name__ == "__main__":
    consume()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment