Created
March 8, 2016 15:57
-
-
Save ema/180f5e6ff680ee143071 to your computer and use it in GitHub Desktop.
varnishkafka prototype in python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Usage: vkncsa [conffile] | |
"""varnishkafka prototype using varnishncsa""" | |
import re | |
import os | |
import sys | |
import shlex | |
import socket | |
# import json | |
import cjson | |
from subprocess import Popen, PIPE | |
from rd_kafka import Producer | |
def parse_conffile(filename):
    """Parse a varnishkafka configuration file into a dictionary.

    Each entry is expected to look like ``key = value``.  Only entries whose
    first character is a lowercase ASCII letter are kept; comments are
    handled by shlex's default ``#`` commenter.  The key is everything before
    the first "=", so values may themselves contain "=".

    Returns a dictionary like this:

        {
            'varnish.arg.m': 'RxRequest:^(?!PURGE$)',
            'varnish.arg.n': 'frontend',
            [...]
        }

    Raises ValueError if a kept entry contains no "=" separator.
    """
    # Read through a context manager so the file handle is always closed.
    with open(filename) as conffile:
        lexer = shlex.shlex(conffile.read())

    # Make spaces and the punctuation used in values part of a "word" and
    # break tokens on tabs/newlines only, so each config line is one token.
    lexer.wordchars += ' .%{}-@:?!=,/^()$'
    lexer.whitespace = '\t\r\n'

    config = {}
    for token in lexer:
        if not re.match('[a-z]', token):
            continue
        # Split on the first "=" only: the rest belongs to the value.
        key, separator, value = token.partition('=')
        if not separator:
            raise ValueError("malformed config entry: %r" % token)
        config[key.rstrip()] = value.lstrip()
    return config
def vncsa_arg(elem):
    """
    Transform a varnishkafka format parameter into a (key, value) tuple. Key is
    the JSON hash key, value the varnishncsa format string.

    vncsa_arg('%{Referer@referer}i') -> ('"referer"', '"%{Referer}i"')

    Note that if the argument includes "num", the second item won't be quoted:

    vncsa_arg('%{Varnish:time_firstbyte@time_firstbyte!num?0.0}x') ->
        ('"time_firstbyte"', '%{Varnish:time_firstbyte}x')

    Options ("!...") and defaults ("?...") following the name are never part
    of the key:

    vncsa_arg('%{X-Foo@foo?-}i') -> ('"foo"', '"%{X-Foo}i"')

    Raises ValueError if elem carries no "@name" part, since no JSON key can
    be derived from it.
    """
    parts = elem.split('@')
    if len(parts) < 2:
        raise ValueError("format element %r has no @name part" % elem)
    variable, spec = parts[0], parts[1]

    # The key is the text after "@" up to the closing brace, cut at the first
    # option ("!") or default ("?") marker, whichever comes first.
    field = re.split('[!?]', spec.split('}')[0], maxsplit=1)[0]

    # Last two characters of the element: closing brace plus formatter letter.
    formatter = spec[-2:]
    if variable == "%{":
        # No variable name at all ("%{@name}s"): emit the bare "%s" form.
        value = "%" + formatter.replace('}', '')
    else:
        value = variable + formatter

    # Numeric fields are emitted unquoted; everything else is quoted.
    if '!num' not in spec:
        value = '"%s"' % value
    return '"%s"' % field, value
def varnishncsa_version():
    """Return the major version of the installed varnishncsa as an int.

    Runs ``varnishncsa -V`` and parses the text it writes to stderr, which is
    expected to start with "varnishncsa (varnish-<major>".

    Raises RuntimeError if that text does not have the expected shape.
    """
    p = Popen(["varnishncsa", "-V"], stdout=PIPE, stderr=PIPE)
    # The version text is read from stderr; stdout is ignored.
    _, banner = p.communicate()
    if not isinstance(banner, str):
        # Pipes yield bytes on Python 3; the regex below works on text.
        banner = banner.decode("utf-8", "replace")
    match = re.match(r"varnishncsa \(varnish-([0-9]+)", banner)
    if match is None:
        raise RuntimeError(
            "cannot parse varnishncsa version from %r" % banner)
    return int(match.group(1))
class VarnishKafka(object):
    """Forward varnishncsa log lines to a Kafka topic as JSON messages.

    The varnishkafka configuration file provides the broker list, the topic
    name, the output format and optional varnishncsa arguments.
    """

    def __init__(self, conffile='/etc/varnishkafka/webrequest.conf'):
        """Load the configuration and open the Kafka producer and topic.

        conffile -- path of the varnishkafka configuration file.

        Raises KeyError if "kafka.metadata.broker.list" or "kafka.topic" is
        missing from the configuration.
        """
        self.hostname = socket.getfqdn()
        self.config = parse_conffile(conffile)

        buffering_max_messages = self.config.get(
            "kafka.queue.buffering.max.messages", "100000")
        kafka_config = {
            "metadata.broker.list": self.config["kafka.metadata.broker.list"],
            # This is a message-count limit, so it must go to the matching
            # "max.messages" property and not to "queue.buffering.max.ms".
            "queue.buffering.max.messages": buffering_max_messages,
        }
        # TODO: no topic-level settings are forwarded yet.
        topic_config = {}

        self.producer = Producer(kafka_config)
        self.producer_topic = self.producer.open_topic(
            self.config["kafka.topic"], topic_config)

    def produce(self, data):
        """JSON-encode data, hand it to the topic and poll the producer."""
        self.producer_topic.produce(cjson.encode(data))  # , key="whatever")
        self.producer.poll()

    def build_vncsa_args(self):
        """Return the varnishncsa command line as a list of arguments.

        The -F format string is a JSON object template built from the
        "format" configuration entry.  The "hostname" and "sequence" fields
        are left out because run() fills them in itself.

        Raises Exception if a configured filter option is not supported by
        the installed varnishncsa version.
        """
        version = varnishncsa_version()

        vncsa_fields = [
            "%s: %s" % vncsa_arg(elem)
            for elem in self.config['format'].split()
            if 'hostname' not in elem and 'sequence' not in elem
        ]
        vncsa_string = '{' + ",".join(vncsa_fields) + '}'
        args = ["varnishncsa", "-F", vncsa_string]

        # In varnishncsa v4 the -m flag has been removed. -q can be used
        # instead. For example:
        # v3 -> -m RxRequest:^(?!PURGE$)
        # v4 -> -q 'not ReqMethod eq PURGE'
        if 'varnish.arg.q' in self.config:
            if version < 4:
                raise Exception("varnish.arg.q not supported in v3")
            # TODO: forwarding of -q is not enabled yet, so the configured
            # query is currently ignored:
            # args += ["-q", self.config['varnish.arg.q']]

        if 'varnish.arg.m' in self.config:
            # -m is gone from v4 onwards, not just in v4 itself.
            if version >= 4:
                raise Exception("varnish.arg.m not supported in v4")
            args += ["-m", self.config['varnish.arg.m']]

        if 'varnish.arg.n' in self.config:
            args += ["-n", self.config['varnish.arg.n']]

        return args

    def run(self):
        """Run varnishncsa and produce one Kafka message per output line.

        Each decoded line gets a "sequence" counter and this host's
        "hostname" added before it is produced.  The loop ends when
        varnishncsa closes its output or on KeyboardInterrupt; the child is
        then reaped and a summary is written to stderr.
        """
        sequence = 0
        p = Popen(self.build_vncsa_args(), stdout=PIPE, bufsize=-1)
        try:
            while True:
                line = p.stdout.readline()
                if not line:
                    # EOF: varnishncsa exited. Stop instead of feeding an
                    # empty string to the JSON decoder.
                    break
                data = cjson.decode(line)
                data['sequence'] = sequence
                data['hostname'] = self.hostname
                sequence += 1
                self.produce(data)
        except KeyboardInterrupt:
            pass

        p.wait()
        sys.stderr.write(
            "Exiting %s items processed: %s\n" % (p.pid, sequence))
if __name__ == "__main__":
    # Use the configuration file named on the command line if there is one,
    # otherwise VarnishKafka's default path.  Slicing argv (rather than
    # catching IndexError around the constructor) keeps an IndexError raised
    # while loading the configuration from being mistaken for "no argument".
    vk = VarnishKafka(*sys.argv[1:2])
    vk.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment