Skip to content

Instantly share code, notes, and snippets.

@purple4reina
Created November 18, 2022 23:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save purple4reina/525d228eda6ede52d9c75dbeff3069af to your computer and use it in GitHub Desktop.
Save purple4reina/525d228eda6ede52d9c75dbeff3069af to your computer and use it in GitHub Desktop.
Datadog agent in customer code
# This is the agent interfacer code which can be used to start/stop/flush a Datadog agent
import pkg_resources
import requests
import subprocess
import time
from ddtrace import tracer as ddtracer
class _singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(_singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class Agent(metaclass=_singleton):
binary_path = pkg_resources.resource_filename(
'datadog_agent', 'includes/datadog-agent')
agent_addr = 'http://localhost:8127'
agent_ready_path = f'{agent_addr}/ready'
flush_traces_path = f'{agent_addr}/trace/flush'
flush_metrics_path = f'{agent_addr}/metrics/flush'
ready = False
tracer = ddtracer
def init(self):
subprocess.Popen([self.binary_path, 'grep', 'hello'])
def flush(self):
flushed_traces = self.flush_traces()
flushed_metrics = self.flush_metrics()
return flushed_traces and flushed_metrics
def flush_traces(self):
return self._flush_agent(self.flush_traces_path)
def flush_metrics(self):
return self._flush_agent(self.flush_metrics_path)
def _flush_agent(self, path):
flush_attempts = 3
while flush_attempts:
try:
resp = requests.post(path)
if resp.ok:
return True
except Exception:
pass
flush_attempts -= 1
if flush_attempts:
time.sleep(1)
return False
def is_ready(self):
if self.ready:
return True
try:
resp = requests.get(self.agent_ready_path)
if not resp.ok:
return False
except Exception:
return False
self.ready = resp.json()["ready"]
return self.ready
agent = Agent()
def init():
agent.init()
# This is the customer application
#!/usr/bin/env python
# This script is run on cron once per day
#
# If there is a weather advisory for the given US state, then post the advisory
# in slack and send an email to the administrators.
#######################
# 1. INITIALIZE AGENT #
#######################
import ddtrace
ddtrace.patch_all()
import datadog_agent
datadog_agent.init()
import requests
import emails
import slack
STATE = 'OR'
ADMIN_EMAILS = 'rey.abolofia@datadoghq.com'
SLACK_CHANNEL = '#weather-advisories'
######################
# 2. WRAP ENTRYPOINT #
######################
@datadog_agent.wrap
def main(event):
state = 'OR'
advisories = get_weather_advisories()
if advisories:
advisories_txt = format_advisories(advisories)
email_admin(advisories_txt)
post_to_slack(advisories_txt)
_weather_advisories_url = f'https://api.weather.gov/alerts/active?area={STATE}'
def get_weather_advisories():
resp = requests.get(_weather_advisories_url)
resp.raise_for_status()
return resp.json()['features']
def format_advisories(advisories):
adv_txts = '\n'.join(
f' ADVISORY: {a["properties"]["event"]} in {a["properties"]["areaDesc"]}'
for a in advisories
)
return f'Weather advisories found for {STATE}:\n{adv_txts}'
_email_from = 'weather.admin@datadoghq.com'
_email_subject = 'Weather advisories'
def email_admin(advisories):
emails.send_email(ADMIN_EMAILS, _email_from, _email_subject, advisories)
def post_to_slack(advisories):
slack.post_to_channel(advisories, SLACK_CHANNEL)
if __name__ == '__main__':
main({})
// These are the endpoints that are added to the serverless-init binary, it lives at cmd/serverless-init/endpoints.go
package main
import (
"log"
"net/http"
"sync"
"time"
"github.com/DataDog/datadog-agent/pkg/serverless/metrics"
"github.com/DataDog/datadog-agent/pkg/serverless/trace"
)
const serverAddr = ":8127"
var ready = struct {
sync.Mutex
areYou bool
done chan struct{}
}{done: make(chan struct{}, 1)}
func waitTillReady(timeout time.Duration) bool {
ready.Lock()
defer ready.Unlock()
if ready.areYou {
return true
}
select {
case <-ready.done:
ready.areYou = true
case <-time.After(timeout):
}
return ready.areYou
}
func setupEndpoints(traceAgent *trace.ServerlessTraceAgent, metricAgent *metrics.ServerlessMetricAgent) {
mux := http.NewServeMux()
mux.Handle("/trace/flush", newFlushHandler(traceAgent))
mux.Handle("/metrics/flush", newFlushHandler(metricAgent))
mux.HandleFunc("/ready", readyHandler)
go log.Fatal(http.ListenAndServe(serverAddr, mux))
ready.done <- struct{}{}
}
type flusher interface {
Flush()
}
func newFlushHandler(agent flusher) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusNotFound)
return
}
agent.Flush()
w.WriteHeader(http.StatusOK)
})
}
func readyHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotFound)
return
}
switch waitTillReady(5 * time.Second) {
case true:
w.Write([]byte(`{"ready":"true"}`))
case false:
w.Write([]byte(`{"ready":"false"}`))
}
}
# This is the code the customer can use to start/flush the agent with each function call
import time
from .agent import agent
class wrap(object):
def __init__(self, fn=None):
self.fn = fn
def __enter__(self):
while True:
if agent.is_ready():
break
time.sleep(0.1)
self.span = agent.tracer.trace('hello world')
return agent
def __exit__(self, exc_type, exc_value, traceback):
self.span.finish()
while not agent.flush_traces():
pass
def __call__(self, *args, **kwargs):
with self:
return self.fn(*args, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment