Skip to content

Instantly share code, notes, and snippets.

@theonlydoo
Created December 8, 2016 14:02
Show Gist options
  • Save theonlydoo/e876759409d8358f195ace40dedf2899 to your computer and use it in GitHub Desktop.
Save theonlydoo/e876759409d8358f195ace40dedf2899 to your computer and use it in GitHub Desktop.
flume log finder and config generator
#!/usr/bin/env python
import os, time, re, socket, getopt, sys
from jinja2 import Template
# Matches any path that ends in ".log", ignoring case.
logfile = re.compile(r'.*\.log$', re.IGNORECASE)


def finder(path, brokers, max_age=86400):
    """Collect recently modified ``*.log`` files found below *path*.

    The directory tree rooted at *path* is walked with ``os.walk``. A file is
    kept when its joined path matches ``logfile`` and its modification time
    is at most *max_age* seconds in the past.

    Args:
        path: Directory to scan recursively.
        brokers: Returned untouched alongside the result.
        max_age: Maximum age of a file, in seconds (default: one day).

    Returns:
        A ``(paths, brokers)`` tuple. ``paths`` maps a sanitized name
        (``<short hostname>-<path with '/' and '.' turned into '-'>``) to the
        path of the log file.
    """
    # Short host name (everything before the first dot); same for every file.
    host = socket.gethostname().split('.')[0]
    now = int(time.time())
    paths = {}
    for dpath, _dnames, fnames in os.walk(path):
        for name in fnames:
            fname = os.path.join(dpath, name)
            if not logfile.search(fname):
                continue
            try:
                mtime = int(os.path.getmtime(fname))
            except OSError:
                # Dangling symlink, or the file vanished while walking:
                # skip it instead of aborting the whole scan.
                continue
            if now - mtime > max_age:
                continue
            # Strip leading/trailing '.' and '/' characters, then flatten the
            # remaining separators and dots into dashes.
            topic = host + '/' + fname.strip('./')
            topic = topic.replace('/', '-').replace('.', '-')
            # NOTE(review): this rewrites every "./" in the path to "/", so a
            # relative "./var/log/x.log" becomes "/var/log/x.log" -- presumably
            # the script is meant to be run from "/"; confirm before changing.
            paths[topic] = fname.replace('./', "/")
    return paths, brokers
def agent(paths, brokers):
    """Print a Flume agent configuration for the given log files.

    For every entry of *paths* one channel / exec source / Kafka sink triple
    is rendered, numbered from 1 in the dict's iteration order. A header
    naming all sources, sinks and channels is printed first, followed by the
    rendered sections.

    Args:
        paths: Mapping of sanitized name (used as the Kafka topic suffix) to
            the log file path handed to ``tail -F``.
        brokers: Value written to each sink's ``brokerList`` setting.

    Returns:
        None. The configuration is written to standard output.
    """
    section = Template('''## channel configuration
a1.channels.c{{ reader }}.type = memory
a1.channels.c{{ reader }}.capacity = 100
a1.channels.c{{ reader }}.transactionCapacity = 100
# source
a1.sources.r{{ reader }}.type = exec
a1.sources.r{{ reader }}.command = tail -F {{ log }}
a1.sources.r{{ reader }}.channels = c{{ reader }}
# sink
a1.sinks.k{{ reader }}.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k{{ reader }}.topic = logs-{{ log_sanitized }}
a1.sinks.k{{ reader }}.brokerList = {{ flume_brokers }}
a1.sinks.k{{ reader }}.requiredAcks = 1
a1.sinks.k{{ reader }}.batchSize = 20
a1.sources.r{{ reader }}.channels = c{{ reader }}
a1.sinks.k{{ reader }}.channel = c{{ reader }}
''')
    header = Template('''# Name the components on this agent
a1.sources = {{ sources }}
a1.sinks = {{ sinks }}
a1.channels = {{ channels }}
''')

    tails = []
    sources = ''
    sinks = ''
    channels = ''
    # .items() works on both Python 2 and 3 (iteritems() is Python 2 only).
    for reader, (topic, path) in enumerate(paths.items(), 1):
        tails.append(section.render(reader=reader, log=path,
                                    log_sanitized=topic,
                                    flume_brokers=brokers))
        # Each name is prefixed with a space, so the lists keep the same
        # leading space the header has always been rendered with.
        sources += ' r%i' % reader    # source names for the header
        sinks += ' k%i' % reader      # sink names for the header
        channels += ' c%i' % reader   # channel names for the header

    print(header.render(sources=sources, sinks=sinks, channels=channels))
    for tail in tails:
        print(tail)
def usage():
    """Print the command-line help text to standard output."""
    # A single parenthesised argument prints identically on Python 2 and 3.
    print("""
PURPOSE :
Finds logs and configure flume to tail them and ship them to kafka brokers.
USAGE :
logfinder -b brokers -p logpath
OPTIONS :
-short, --long <value> description
-b, --brokers= <brokers> comma separated list of kafka brokers addresses and ports, e.g. : 127.0.0.1:9092,127.0.0.2:9092
-p, --path= <logs directory> where to find the logs that you which to tail
-h, --help prints help
""")
def main(argv=None):
    """Parse the command line, then find logs and print the Flume config.

    Args:
        argv: Argument list without the program name; defaults to
            ``sys.argv[1:]``.

    Exits with status 2 after printing the usage text when the options are
    invalid, when help is requested, or when either the brokers or the path
    option is missing.
    """
    if argv is None:
        argv = sys.argv[1:]
    try:
        opts, _args = getopt.getopt(argv, 'b:p:h', ['brokers=', 'path=', 'help'])
    except getopt.GetoptError:
        usage()
        sys.exit(2)

    brokers = None
    path = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit(2)
        elif opt in ('-b', '--brokers'):
            brokers = arg
        elif opt in ('-p', '--path'):
            path = arg

    # Both options are mandatory. Checking the values (rather than counting
    # the parsed options) also catches a repeated option such as "-b x -b y",
    # which would otherwise leave one of the names unbound.
    if brokers is None or path is None:
        usage()
        sys.exit(2)

    paths, brokers = finder(path, brokers)
    agent(paths, brokers)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment