@sgnn7
Created June 8, 2021 21:56
Datadog Agent check that exercises the multiprocessing library
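To try it out, this file is assumed to be dropped into the Agent's checks.d directory like any custom check, with a matching conf.d YAML that lists at least one (possibly empty) instance; exact paths and file names depend on the Agent installation.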
import logging
import os
import sys
import time
import multiprocessing as mp
from multiprocessing import Process, Queue

try:
    from datadog_checks.base import AgentCheck
except ImportError:
    from checks import AgentCheck

# Options are "fork", "spawn", and "forkserver"
# - Windows: "spawn" is default. "fork" is unavailable.
# - Darwin: "spawn" is default.
# - Linux: "fork" is default.
START_METHOD = 'spawn'
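# Flip START_METHOD above to compare behaviors; note that "forkserver" is only
# available on Unix-like platforms.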

try:
    from datadog_agent import log
except ImportError:
    pass
__version__ = "1.0.0"
LOG_PATH = '/opt/datadog-agent/logs/log.txt'
if sys.platform == 'win32':
    LOG_PATH = r'C:\ProgramData\Datadog\log.txt'


# We need a consistent way to log things
def create_logger():
    logger = mp.get_logger()
    logger.setLevel(logging.NOTSET)

    # Start each run with a fresh log file
    try:
        os.remove(LOG_PATH)
    except OSError:
        pass

    formatter = logging.Formatter(
        '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
    file_handler = logging.FileHandler(LOG_PATH)
    file_handler.setFormatter(formatter)

    class AgentHandler(logging.Handler):
        def emit(self, record):
            message = record.getMessage()
            # This will not be available for `spawn`ed processes. Comment out if needed.
            log(message, 0)

    agent_handler = AgentHandler()

    if not len(logger.handlers):
        # We dump the output to both the log file and the Agent-injected logger
        logger.addHandler(agent_handler)
        logger.addHandler(file_handler)

    return logger
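
# NOTE: mp.get_logger() is the logger used by the multiprocessing module itself, so the
# handlers attached in create_logger() are shared by the parent and by fork()ed children;
# how much of this setup survives under "spawn" is part of what this check is probing.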
mp_logger = create_logger()


# Code for the subprocesses/jobs that we will create
def runner(name, queue):
    pid = os.getpid()
    mp.get_logger().info("### Hello from {} (pid: {}) via module's logger".format(name, pid))

    logging_exception = None
    try:
        log("### Hello from {} (pid: {}) via Golang-injected 'log' method".format(name, pid), 0)
    except Exception as e:
        logging_exception = e
        mp.get_logger().info("Failed to use Golang-injected 'log' method in {} (pid: {})".format(name, pid))

    # The queue is the main way to pass info from the subprocesses back to the parent
    queue.put("Hello from {} via queue (pid: {}). Injected-Golang logging exception: {}".format(name, pid, logging_exception))


class HelloCheck(AgentCheck):
    def check(self, instance):
        mp.get_logger().info("*" * 50)

        try:
            log("Golang-injected logging from main thread works", 0)
        except Exception:
            mp.get_logger().info("Golang-injected logging from main thread DOES NOT WORK!")

        if sys.version_info >= (3, 0):
            mp.get_logger().info("Available start methods: {}".format(mp.get_all_start_methods()))
            mp.get_logger().info("Current start method: {}".format(mp.get_start_method()))

            # On at least 1 platform (Darwin), we have to force the start method explicitly
            # See here for more info: https://bugs.python.org/issue33725
            # XXX: "fork" cannot be used on Windows
            mp.set_start_method(START_METHOD, force=True)
            mp.get_logger().info("New start method: {}".format(mp.get_start_method()))

        queue = Queue()

        mp.get_logger().info("Sys executable: {}".format(sys.executable))
        mp.get_logger().info("Exec prefix: {}".format(sys.exec_prefix))
        mp.get_logger().info("Sys prefix: {}".format(sys.prefix))

        if sys.version_info >= (3, 0):
            from multiprocessing import spawn
            mp.get_logger().info("Spawn exe: {}".format(spawn.get_executable()))

        if hasattr(sys, 'argv'):
            mp.get_logger().info("Argv: %s", sys.argv)
        else:
            mp.get_logger().info("Argv is empty!")

        mp.get_logger().info("Starting check...")

        # Run 5 jobs
        jobs = []
        for index in range(5):
            p = Process(target=runner, args=('testprocess_{}'.format(index), queue,))
            jobs.append(p)

            mp.get_logger().info("Starting process #{}...".format(index))
            p.start()
mp.get_logger().info("Joining...")
# Coalesce the results
for index, job in enumerate(jobs):
mp.get_logger().info("Joining process #{}...".format(index))
job.join()
mp.get_logger().info("Joined all")

        # Print the results from the processes
        mp.get_logger().info("*" * 50)
        for index, _ in enumerate(jobs):
            results = queue.get()
            mp.get_logger().info(results)
        mp.get_logger().info("*" * 50)

        mp.get_logger().info("Done.")