Created
February 2, 2017 19:03
-
-
Save degemer/0ff74cc1c14f45a8b2c2f5a33be23b75 to your computer and use it in GitHub Desktop.
Forwarder 5.10.1 ipv4 only
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/opt/datadog-agent/embedded/bin/python | |
''' | |
Datadog | |
www.datadoghq.com | |
---- | |
Make sense of your IT Data | |
Licensed under Simplified BSD License (see LICENSE) | |
(C) Boxed Ice 2010 all rights reserved | |
(C) Datadog, Inc. 2010-2016 all rights reserved | |
''' | |
# set up logging before importing any other components | |
from config import initialize_logging # noqa | |
initialize_logging('forwarder') | |
# stdlib | |
import copy | |
from datetime import timedelta | |
import logging | |
import os | |
from Queue import Full, Queue | |
from socket import error as socket_error, gaierror | |
import sys | |
import threading | |
import zlib | |
# For pickle & PID files, see issue 293 | |
os.umask(022) | |
# 3p | |
import simplejson as json | |
try: | |
import pycurl | |
except ImportError: | |
# For the source install, pycurl might not be installed | |
pycurl = None | |
from tornado.escape import json_decode | |
import tornado.httpclient | |
import tornado.httpserver | |
import tornado.ioloop | |
from tornado.options import define, options, parse_command_line | |
import tornado.web | |
# project | |
from checks.check_status import ForwarderStatus | |
from config import ( | |
get_config, | |
get_logging_config, | |
get_url_endpoint, | |
get_version | |
) | |
import modules | |
from transaction import Transaction, TransactionManager | |
from util import ( | |
get_uuid, | |
Watchdog, | |
) | |
from utils.hostname import get_hostname | |
from utils.logger import RedactedLogRecord | |
logging.LogRecord = RedactedLogRecord | |
log = logging.getLogger('forwarder') | |
log.setLevel(get_logging_config()['log_level'] or logging.INFO) | |
# Config key under which the main Datadog intake URL is stored.
DD_ENDPOINT = "dd_url"

# Transactions
TRANSACTION_FLUSH_INTERVAL = 5000  # Every 5 seconds (milliseconds)

# Watchdog settings
WATCHDOG_INTERVAL_MULTIPLIER = 10  # 10x flush interval
WATCHDOG_HIGH_ACTIVITY_THRESHOLD = 1000  # Threshold to detect pathological activity

# Misc
# Headers set by the emitting agent that no longer apply once the
# forwarder re-sends the payload (tornado/curl recompute them).
HEADERS_TO_REMOVE = [
    'Host',
    'Content-Length',
]

# Maximum delay before replaying a transaction
MAX_WAIT_FOR_REPLAY = timedelta(seconds=90)

# Maximum queue size in bytes (when this is reached, old messages are dropped)
MAX_QUEUE_SIZE = 30 * 1024 * 1024  # 30MB

# Some responses should be rejected, rather than replayed. This list will be rejected.
RESPONSES_TO_REJECT = [413, 400]

# 500000 microseconds between transaction flushes, i.e. at most 2 msg/second
THROTTLING_DELAY = timedelta(microseconds=1000000 / 2)  # 2 msg/second
class EmitterThread(threading.Thread):
    """Daemon thread that runs one custom emitter.

    Payloads are handed over through a bounded queue (``enqueue``); the
    thread loops forever, popping ``(data, headers)`` tuples and feeding
    the data to the emitter callable. When the queue is full, new packets
    are dropped rather than blocking the forwarder.

    Fix vs. original: use ``Logger.warning`` instead of the deprecated
    ``Logger.warn`` alias.
    """

    def __init__(self, *args, **kwargs):
        # 'name' is left in kwargs on purpose: threading.Thread also uses it.
        self.__name = kwargs['name']
        # 'emitter' is a factory; instantiate the emitter callable once.
        self.__emitter = kwargs.pop('emitter')()
        self.__logger = kwargs.pop('logger')
        self.__config = kwargs.pop('config')
        self.__max_queue_size = kwargs.pop('max_queue_size', 100)
        self.__queue = Queue(self.__max_queue_size)
        threading.Thread.__init__(self, *args, **kwargs)
        # Daemonize so a stuck emitter never prevents forwarder shutdown.
        self.daemon = True

    def run(self):
        """Consume packets forever; one emitter failure never kills the thread."""
        while True:
            (data, headers) = self.__queue.get()
            try:
                self.__logger.debug('Emitter %r handling a packet', self.__name)
                self.__emitter(data, self.__logger, self.__config)
            except Exception:
                self.__logger.error('Failure during operation of emitter %r', self.__name, exc_info=True)

    def enqueue(self, data, headers):
        """Queue a packet for the emitter; drop it (with a warning) on backlog."""
        try:
            self.__queue.put((data, headers), block=False)
        except Full:
            # warning() instead of the deprecated warn() alias
            self.__logger.warning('Dropping packet for %r due to backlog', self.__name)
class EmitterManager(object):
    """Track custom emitters"""

    def __init__(self, config):
        self.agentConfig = config
        self.emitterThreads = []
        # One EmitterThread per non-empty, comma-separated spec in the config.
        raw_specs = self.agentConfig.get('custom_emitters', '').split(',')
        for emitter_spec in (spec.strip() for spec in raw_specs):
            if not emitter_spec:
                continue
            logging.info('Setting up custom emitter %r', emitter_spec)
            try:
                thread = EmitterThread(
                    name=emitter_spec,
                    emitter=modules.load(emitter_spec, 'emitter'),
                    logger=logging,
                    config=config,
                )
                thread.start()
                self.emitterThreads.append(thread)
            except Exception:
                logging.error('Unable to start thread for emitter: %r', emitter_spec, exc_info=True)
        logging.info('Done with custom emitters')

    def send(self, data, headers=None):
        """Fan a payload out to every emitter thread (decoded to a dict first)."""
        if not self.emitterThreads:
            return  # bypass decompression/decoding

        # Payloads arrive zlib-compressed JSON; emitters expect plain objects.
        if headers and headers.get('Content-Encoding') == 'deflate':
            data = zlib.decompress(data)
        decoded = json_decode(data)

        for emitterThread in self.emitterThreads:
            logging.info('Queueing for emitter %r', emitterThread.name)
            emitterThread.enqueue(decoded, headers)
class AgentTransaction(Transaction):
    """One payload to deliver to one (endpoint, api_key) destination.

    Class-level state is shared by every transaction and must be configured
    through the ``set_*`` classmethods before any instance is created.
    Instantiating a transaction fans it out: one copy is appended to the
    TransactionManager per (endpoint, api_key) pair, then a flush is kicked.
    """

    # Shared, process-wide state -- configured via the classmethods below.
    _application = None       # the forwarder Application (source of config)
    _trManager = None         # TransactionManager handling queueing/replay
    _endpoints = {}           # endpoint name -> list of api keys
    _emitter_manager = None   # EmitterManager, built in set_application
    _type = None              # human-readable payload type, used in logs
    _request_timeout = 20     # HTTP request timeout, seconds

    @classmethod
    def set_application(cls, app):
        # Also builds the custom-emitter manager from the app's config.
        cls._application = app
        cls._emitter_manager = EmitterManager(cls._application._agentConfig)

    @classmethod
    def set_tr_manager(cls, manager):
        cls._trManager = manager

    @classmethod
    def set_endpoints(cls, endpoints):
        cls._endpoints = endpoints

    @classmethod
    def set_request_timeout(cls, request_timeout):
        cls._request_timeout = request_timeout

    @classmethod
    def get_tr_manager(cls):
        return cls._trManager

    def __init__(self, data, headers, msg_type=""):
        """Store the payload and immediately enqueue one copy per destination.

        data: raw (possibly deflate-compressed) payload bytes.
        headers: HTTP headers from the emitting agent; tagged with the
            forwarder version before re-sending.
        msg_type: path component of the intake URL (e.g. "metrics").
        """
        self._data = data
        self._headers = headers
        self._headers['DD-Forwarder-Version'] = get_version()
        self._msg_type = msg_type

        # Call after data has been set (size is computed in Transaction's init)
        Transaction.__init__(self)

        # Emitters operate outside the regular transaction framework
        if self._emitter_manager is not None:
            self._emitter_manager.send(data, headers)

        # Insert the transaction(s) in the Manager: a shallow copy per
        # (endpoint, api_key) pair so each destination retries independently.
        for endpoint in self._endpoints:
            for api_key in self._endpoints[endpoint]:
                transaction = copy.copy(self)
                transaction._endpoint = endpoint
                transaction._api_key = api_key
                self._trManager.append(transaction)
                log.debug("Created transaction %d" % transaction.get_id())
        self._trManager.flush()

    def __sizeof__(self):
        # Queue-size accounting is based on the payload alone.
        return sys.getsizeof(self._data)

    def get_url(self, endpoint, api_key):
        """Build the intake URL for this transaction's destination."""
        endpoint_base_url = get_url_endpoint(endpoint)
        return "{0}/intake/{1}?api_key={2}".format(endpoint_base_url, self._msg_type, api_key)

    def flush(self):
        """POST the payload asynchronously; on_response handles the outcome."""
        # Getting proxy settings
        proxy_settings = self._application._agentConfig.get('proxy_settings', None)

        tornado_client_params = {
            'method': 'POST',
            'body': self._data,
            'headers': self._headers,
            'validate_cert': not self._application.skip_ssl_validation,
            'allow_ipv6': False,
            'request_timeout': self._request_timeout,
        }

        # Remove headers that were passed by the emitter. Those don't apply anymore
        # This is pretty hacky though as it should be done in pycurl or curl or tornado
        for h in HEADERS_TO_REMOVE:
            if h in tornado_client_params['headers']:
                del tornado_client_params['headers'][h]
                log.debug("Removing {0} header.".format(h))

        force_use_curl = False

        # Proxying requires the curl-based client; tornado's SimpleHTTPClient
        # has no proxy support.
        if proxy_settings is not None:
            force_use_curl = True
            if pycurl is not None:
                log.debug("Configuring tornado to use proxy settings: %s:****@%s:%s" % (proxy_settings['user'],
                          proxy_settings['host'], proxy_settings['port']))
                tornado_client_params['proxy_host'] = proxy_settings['host']
                tornado_client_params['proxy_port'] = proxy_settings['port']
                tornado_client_params['proxy_username'] = proxy_settings['user']
                tornado_client_params['proxy_password'] = proxy_settings['password']

                if self._application._agentConfig.get('proxy_forbid_method_switch'):
                    # See http://stackoverflow.com/questions/8156073/curl-violate-rfc-2616-10-3-2-and-switch-from-post-to-get
                    tornado_client_params['prepare_curl_callback'] = lambda curl: curl.setopt(pycurl.POSTREDIR, pycurl.REDIR_POST_ALL)

        if (not self._application.use_simple_http_client or force_use_curl) and pycurl is not None:
            ssl_certificate = self._application._agentConfig.get('ssl_certificate', None)
            tornado_client_params['ca_certs'] = ssl_certificate

        # NOTE: 'and' binds tighter than 'or' here -- curl is used when forced
        # (proxy) OR when configured and the simple client was not requested.
        use_curl = force_use_curl or self._application._agentConfig.get("use_curl_http_client") and not self._application.use_simple_http_client

        if use_curl:
            if pycurl is None:
                log.error("dd-agent is configured to use the Curl HTTP Client, but pycurl is not available on this system.")
            else:
                log.debug("Using CurlAsyncHTTPClient")
                tornado.httpclient.AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
        else:
            log.debug("Using SimpleHTTPClient")
        http = tornado.httpclient.AsyncHTTPClient()
        url = self.get_url(self._endpoint, self._api_key)

        log.debug(
            u"Sending %s to endpoint %s at %s",
            self._type, self._endpoint, url
        )
        req = tornado.httpclient.HTTPRequest(url=url, **tornado_client_params)
        http.fetch(req, callback=self.on_response)

    def on_response(self, response):
        """Mark the transaction rejected/errored/successful, then flush the next one."""
        if response.error:
            log.error("Response: %s" % response)
            if response.code in RESPONSES_TO_REJECT:
                # Payload too large / malformed: replaying would never succeed.
                self._trManager.tr_error_reject_request(self)
            else:
                self._trManager.tr_error(self)
        else:
            self._trManager.tr_success(self)

        self._trManager.flush_next()
class MetricTransaction(AgentTransaction):
    # Payload type label used in debug logs ("Sending %s to endpoint ...").
    _type = "metrics"
class APIMetricTransaction(MetricTransaction):
    """Metric payload submitted through the public /api/v1/series endpoint."""

    def get_url(self, endpoint, api_key):
        """Return the series API URL for this destination."""
        base = get_url_endpoint(endpoint)
        return "{0}/api/v1/series/?api_key={1}".format(base, api_key)

    def get_data(self):
        """Expose the raw payload."""
        return self._data
class APIServiceCheckTransaction(AgentTransaction):
    """Service-check payload submitted through /api/v1/check_run."""

    _type = "service checks"

    def get_url(self, endpoint, api_key):
        """Return the check-run API URL for this destination."""
        base = get_url_endpoint(endpoint)
        return "{0}/api/v1/check_run/?api_key={1}".format(base, api_key)
class StatusHandler(tornado.web.RequestHandler):
    """Render pending transactions as an HTML table on GET /status.

    With ?threshold=N, respond 503 when more than N transactions are queued
    (useful as a cheap health check).
    """

    def get(self):
        threshold = int(self.get_argument('threshold', -1))

        manager = MetricTransaction.get_tr_manager()

        self.write("<table><tr><td>Id</td><td>Size</td><td>Error count</td><td>Next flush</td></tr>")
        transactions = manager.get_transactions()
        for tr in transactions:
            row = (tr.get_id(), tr.get_size(), tr.get_error_count(), tr.get_next_flush())
            self.write("<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>" % row)
        self.write("</table>")

        # A negative threshold (the default) disables the backlog check.
        if threshold >= 0 and len(transactions) > threshold:
            self.set_status(503)
class AgentInputHandler(tornado.web.RequestHandler):
    """Accept agent payloads on /intake and wrap them in MetricTransactions."""

    # Subclasses override this to tag the intake URL (e.g. "metrics").
    _MSG_TYPE = ""

    def post(self):
        """Read the message and forward it to the intake"""
        payload = self.request.body
        if payload is None:
            raise tornado.web.HTTPError(500)

        # Setup a transaction for this message; it enqueues itself.
        tr = MetricTransaction(payload, self.request.headers, self._MSG_TYPE)
        self.write("Transaction: %s" % tr.get_id())
class MetricsAgentInputHandler(AgentInputHandler):
    # Payloads posted to /intake/metrics get the "metrics" intake URL suffix.
    _MSG_TYPE = "metrics"
class MetadataAgentInputHandler(AgentInputHandler):
    # Payloads posted to /intake/metadata get the "metadata" intake URL suffix.
    _MSG_TYPE = "metadata"
class ApiInputHandler(tornado.web.RequestHandler):
    """Accept series payloads on /api/v1/series."""

    def post(self):
        """Read the message and forward it to the intake"""
        payload = self.request.body
        if payload is None:
            raise tornado.web.HTTPError(500)

        # Setup a transaction for this message; it enqueues itself.
        # (Unlike /intake, this endpoint does not echo the transaction id.)
        APIMetricTransaction(payload, self.request.headers)
class ApiCheckRunHandler(tornado.web.RequestHandler):
    """
    Handler to submit Service Checks
    """

    def post(self):
        payload = self.request.body
        if payload is None:
            raise tornado.web.HTTPError(500)

        # Setup a transaction for this message; it enqueues itself.
        tr = APIServiceCheckTransaction(payload, self.request.headers)
        self.write("Transaction: %s" % tr.get_id())
class Application(tornado.web.Application):
    """The forwarder HTTP server.

    Wires the intake/API handlers, the TransactionManager, the periodic
    flush callback, an optional Graphite listener, and a Watchdog, then
    runs the tornado IOLoop until stop() is called.
    """

    # Parallel flush counts: one in flight by default, several when
    # payloads go to multiple endpoints.
    NO_PARALLELISM = 1
    DEFAULT_PARALLELISM = 5

    def __init__(self, port, agentConfig, watchdog=True,
                 skip_ssl_validation=False, use_simple_http_client=False):
        self._port = int(port)
        self._agentConfig = agentConfig
        self._metrics = {}  # internal forwarder metrics, flushed by _postMetrics
        # Share this application (and its config) with all transactions.
        AgentTransaction.set_application(self)
        AgentTransaction.set_endpoints(agentConfig['endpoints'])
        if agentConfig['endpoints'] == {}:
            log.warning(u"No valid endpoint found. Forwarder will drop all incoming payloads.")
        AgentTransaction.set_request_timeout(agentConfig['forwarder_timeout'])

        max_parallelism = self.NO_PARALLELISM
        # Multiple endpoints => enable parallelism
        if len(agentConfig['endpoints']) > 1:
            max_parallelism = self.DEFAULT_PARALLELISM

        self._tr_manager = TransactionManager(MAX_WAIT_FOR_REPLAY,
                                              MAX_QUEUE_SIZE, THROTTLING_DELAY,
                                              max_parallelism=max_parallelism)
        AgentTransaction.set_tr_manager(self._tr_manager)

        self._watchdog = None
        self.skip_ssl_validation = skip_ssl_validation or agentConfig.get('skip_ssl_validation', False)
        self.use_simple_http_client = use_simple_http_client
        if self.skip_ssl_validation:
            log.info("Skipping SSL hostname validation, useful when using a transparent proxy")

        # Monitor activity
        if watchdog:
            # Flush interval is in ms; watchdog timeout is in seconds.
            watchdog_timeout = TRANSACTION_FLUSH_INTERVAL * WATCHDOG_INTERVAL_MULTIPLIER / 1000
            self._watchdog = Watchdog(
                watchdog_timeout,
                max_mem_mb=agentConfig.get('limit_memory_consumption', None),
                max_resets=WATCHDOG_HIGH_ACTIVITY_THRESHOLD
            )

    def log_request(self, handler):
        """ Override the tornado logging method.
        If everything goes well, log level is DEBUG.
        Otherwise it's WARNING or ERROR depending on the response code. """
        if handler.get_status() < 400:
            log_method = log.debug
        elif handler.get_status() < 500:
            log_method = log.warning
        else:
            log_method = log.error
        request_time = 1000.0 * handler.request.request_time()
        log_method(
            u"%d %s %.2fms",
            handler.get_status(),
            handler._request_summary(), request_time
        )

    def appendMetric(self, prefix, name, host, device, ts, value):
        """Accumulate one internal metric point under self._metrics[prefix][name]."""
        if prefix in self._metrics:
            metrics = self._metrics[prefix]
        else:
            metrics = {}
            self._metrics[prefix] = metrics

        if name in metrics:
            metrics[name].append([host, device, ts, value])
        else:
            metrics[name] = [[host, device, ts, value]]

    def _postMetrics(self):
        """Ship accumulated internal metrics as a MetricTransaction, then reset."""
        if len(self._metrics) > 0:
            self._metrics['uuid'] = get_uuid()
            self._metrics['internalHostname'] = get_hostname(self._agentConfig)
            self._metrics['apiKey'] = self._agentConfig['api_key']
            MetricTransaction(json.dumps(self._metrics),
                              headers={'Content-Type': 'application/json'})
            self._metrics = {}

    def run(self):
        """Bind the HTTP server, register periodic callbacks, and run the IOLoop."""
        handlers = [
            (r"/intake/?", AgentInputHandler),
            (r"/intake/metrics?", MetricsAgentInputHandler),
            (r"/intake/metadata?", MetadataAgentInputHandler),
            (r"/api/v1/series/?", ApiInputHandler),
            (r"/api/v1/check_run/?", ApiCheckRunHandler),
            (r"/status/?", StatusHandler),
        ]

        settings = dict(
            cookie_secret="12oETzKXQAGaYdkL5gEmGeJJFuYh7EQnp2XdTP1o/Vo=",
            xsrf_cookies=False,
            debug=False,
            log_function=self.log_request
        )

        non_local_traffic = self._agentConfig.get("non_local_traffic", False)

        tornado.web.Application.__init__(self, handlers, **settings)
        http_server = tornado.httpserver.HTTPServer(self)

        try:
            # non_local_traffic must be == True to match, not just some non-false value
            if non_local_traffic is True:
                http_server.listen(self._port)
            else:
                # localhost in lieu of 127.0.0.1 to support IPv6
                try:
                    http_server.listen(self._port, address=self._agentConfig['bind_host'])
                except gaierror:
                    log.warning("localhost seems undefined in your host file, using 127.0.0.1 instead")
                    http_server.listen(self._port, address="127.0.0.1")
                except socket_error as e:
                    # Errno 99 => cannot assign requested address (no IPv6 support)
                    if "Errno 99" in str(e):
                        log.warning("IPv6 doesn't seem to be fully supported. Falling back to IPv4")
                        http_server.listen(self._port, address="127.0.0.1")
                    else:
                        raise
        except socket_error as e:
            # Typically "address already in use" -- nothing we can recover from.
            log.exception("Socket error %s. Is another application listening on the same port ? Exiting", e)
            sys.exit(1)
        except Exception as e:
            log.exception("Uncaught exception. Forwarder is exiting.")
            sys.exit(1)

        log.info("Listening on port %d" % self._port)

        # Register callbacks
        self.mloop = tornado.ioloop.IOLoop.current()

        logging.getLogger().setLevel(get_logging_config()['log_level'] or logging.INFO)

        def flush_trs():
            # Periodic tick: feed the watchdog, post internal metrics,
            # and flush pending transactions.
            if self._watchdog:
                self._watchdog.reset()
            self._postMetrics()
            self._tr_manager.flush()

        tr_sched = tornado.ioloop.PeriodicCallback(flush_trs, TRANSACTION_FLUSH_INTERVAL,
                                                   io_loop=self.mloop)

        # Register optional Graphite listener
        gport = self._agentConfig.get("graphite_listen_port", None)
        if gport is not None:
            log.info("Starting graphite listener on port %s" % gport)
            from graphite import GraphiteServer
            gs = GraphiteServer(self, get_hostname(self._agentConfig), io_loop=self.mloop)
            if non_local_traffic is True:
                gs.listen(gport)
            else:
                gs.listen(gport, address="localhost")

        # Start everything
        if self._watchdog:
            self._watchdog.reset()
        tr_sched.start()

        # Blocks until stop() is called.
        self.mloop.start()
        log.info("Stopped")

    def stop(self):
        self.mloop.stop()
def init(skip_ssl_validation=False, use_simple_http_client=False):
    """Build the forwarder Application from the agent config and install
    SIGTERM/SIGINT handlers that stop its IOLoop."""
    import signal

    conf = get_config(parse_args=False)

    # A config value of None still means the default port.
    listen_port = conf.get('listen_port', 17123)
    port = 17123 if listen_port is None else int(listen_port)

    app = Application(port, conf, skip_ssl_validation=skip_ssl_validation, use_simple_http_client=use_simple_http_client)

    def sigterm_handler(signum, frame):
        log.info("caught sigterm. stopping")
        app.stop()

    signal.signal(signal.SIGTERM, sigterm_handler)
    signal.signal(signal.SIGINT, sigterm_handler)

    return app
def main(): | |
# Deprecation notice | |
from utils.deprecations import deprecate_old_command_line_tools | |
deprecate_old_command_line_tools() | |
define("sslcheck", default=1, help="Verify SSL hostname, on by default") | |
define("use_simple_http_client", default=0, help="Use Tornado SimpleHTTPClient instead of CurlAsyncHTTPClient") | |
args = parse_command_line() | |
skip_ssl_validation = False | |
use_simple_http_client = False | |
if unicode(options.sslcheck) == u"0": | |
skip_ssl_validation = True | |
if unicode(options.use_simple_http_client) == u"1": | |
use_simple_http_client = True | |
# If we don't have any arguments, run the server. | |
if not args: | |
app = init(skip_ssl_validation, use_simple_http_client=use_simple_http_client) | |
try: | |
app.run() | |
except Exception: | |
log.exception("Uncaught exception in the forwarder") | |
finally: | |
ForwarderStatus.remove_latest_status() | |
else: | |
usage = "%s [help|info]. Run with no commands to start the server" % (sys.argv[0]) | |
command = args[0] | |
if command == 'info': | |
logging.getLogger().setLevel(logging.ERROR) | |
return ForwarderStatus.print_latest_status() | |
elif command == 'help': | |
print usage | |
else: | |
print "Unknown command: %s" % command | |
print usage | |
return -1 | |
return 0 | |
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment