Skip to content

Instantly share code, notes, and snippets.

@ema
Created March 8, 2016 15:57
Show Gist options
  • Save ema/180f5e6ff680ee143071 to your computer and use it in GitHub Desktop.
Save ema/180f5e6ff680ee143071 to your computer and use it in GitHub Desktop.
varnishkafka prototype in python
#!/usr/bin/python
# Usage: vkncsa [conffile]
"""varnishkafka prototype using varnishncsa"""
import re
import os
import sys
import shlex
import socket
# import json
import cjson
from subprocess import Popen, PIPE
from rd_kafka import Producer
def parse_conffile(filename):
"""varnishkafka config file parser.
Returns a dictionary like this:
{
'varnish.arg.m': 'RxRequest:^(?!PURGE$)',
'varnish.arg.n': 'frontend',
[...]
}
"""
lexer = shlex.shlex(file(filename).read())
lexer.wordchars += ' .%{}-@:?!=,/^()$'
lexer.whitespace = '\t\r\n'
elems = [
token.rsplit('=') for token in lexer if re.match('[a-z]', token)]
return dict([(key.rstrip(), value.lstrip()) for key, value in elems])
def vncsa_arg(elem):
"""
Transform a varnishkafka format parameter into a (key, value) tuple. Key is
the JSON hash key, value the varnishncsa format string.
vncsa_arg('%{Referer@referer}i') -> ('"referer"', '"%{Referer}i"')
Note that if the argument includes "num", the second item won't be quoted:
vncsa_arg('%{Varnish:time_firstbyte@time_firstbyte!num?0.0}x') ->
('"time_firstbyte"', '%{Varnish:time_firstbyte}x')
"""
split_el = elem.split('@')
field = re.sub("!.*", "", split_el[1].split('}')[0])
suffix = split_el[1][-2:]
if split_el[0] == "%{":
value = "%" + suffix.replace('}', '')
else:
value = split_el[0] + split_el[1][-2:]
if '!num' not in split_el[1]:
value = '"%s"' % value
return '"%s"' % field, value
def varnishncsa_version():
p = Popen(["varnishncsa", "-V"], stdout=PIPE, stderr=PIPE)
_, out = p.communicate()
vstr = re.match("varnishncsa \(varnish-([0-9])", out).group(1)
return int(vstr)
class VarnishKafka(object):
def __init__(self, conffile='/etc/varnishkafka/webrequest.conf'):
self.hostname = socket.getfqdn()
self.config = parse_conffile(conffile)
buffering_max_messages = self.config.get(
"kafka.queue.buffering.max.messages", "100000")
kafka_config = {
"metadata.broker.list": self.config["kafka.metadata.broker.list"],
"queue.buffering.max.ms": buffering_max_messages,
}
topic_config = {
# "kafka.topic": self.config["kafka.topic"],
# XXX
}
self.producer = Producer(kafka_config)
self.producer_topic = self.producer.open_topic(
self.config["kafka.topic"], topic_config)
def produce(self, data):
self.producer_topic.produce(cjson.encode(data)) # , key="whatever")
self.producer.poll()
def build_vncsa_args(self):
version = varnishncsa_version()
vncsa_fields = []
for elem in self.config['format'].split():
if 'hostname' in elem:
continue
if 'sequence' in elem:
continue
vncsa_fields.append("%s: %s" % vncsa_arg(elem))
vncsa_string = '{' + ",".join(vncsa_fields) + '}'
args = ["varnishncsa", "-F", vncsa_string]
# In varnishncsa v4 the -m flag has been removed. -q can be used
# instead. For example:
# v3 -> -m RxRequest:^(?!PURGE$)
# v4 -> -q 'not ReqMethod eq PURGE'
if'varnish.arg.q' in self.config:
if version == 3:
raise Exception("varnish.arg.q not supported in v3")
else:
pass
# XXX args += [ "-q", self.config['varnish.arg.q'] ]
if 'varnish.arg.m' in self.config:
if version == 4:
raise Exception("varnish.arg.m not supported in v4")
else:
args += ["-m", self.config['varnish.arg.m']]
if 'varnish.arg.n' in self.config:
args += ["-n", self.config['varnish.arg.n']]
return args
def run(self):
# XXX
sequence = 0
p = Popen(self.build_vncsa_args(), stdout=PIPE, bufsize=-1)
try:
while True:
line = p.stdout.readline()
data = cjson.decode(line)
# data = json.loads(line)
data['sequence'] = sequence
data['hostname'] = self.hostname
sequence += 1
self.produce(data)
except KeyboardInterrupt:
pid, _ = os.waitpid(p.pid, 0)
print >>sys.stderr, "Exiting", pid, "items processed:", sequence
if __name__ == "__main__":
try:
vk = VarnishKafka(sys.argv[1])
except IndexError:
vk = VarnishKafka()
vk.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment