Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Databricks Prometheus Integration
#!/usr/bin/env bash
### Functions
# Configure Spark metrics sinks (Ganglia + PrometheusServlet) on a
# Databricks driver by writing metrics.properties into both the regular
# conf dir and the master-worker dbconf dir, then dump the resulting
# files and SPARK_* environment for debugging.
function setup_databricks_prometheus() {
  echo "Showing files in /databricks/spark/conf/*"
  ls -al /databricks/spark/conf/
  cat /databricks/spark/conf/spark.properties

  # `sudo tee` creates the target file itself, so no separate
  # `sudo touch` is needed.
  echo "Writing /databricks/spark/conf/metrics.properties"
  cat <<EOF | sudo tee /databricks/spark/conf/metrics.properties
# Enable Ganglia metrics
driver.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.port=8649
*.sink.ganglia.mode=unicast
# Enable Prometheus metrics
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus
EOF
  echo "Showing content of /databricks/spark/conf/metrics.properties"
  cat /databricks/spark/conf/metrics.properties

  echo "Writing /databricks/spark/dbconf/log4j/master-worker/metrics.properties"
  cat <<EOF | sudo tee /databricks/spark/dbconf/log4j/master-worker/metrics.properties
# Enable Prometheus metrics
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus
EOF
  echo "Showing content of /databricks/spark/dbconf/log4j/master-worker/metrics.properties"
  cat /databricks/spark/dbconf/log4j/master-worker/metrics.properties

  echo "Showing SPARK_ related envvars"
  env | grep "SPARK_"
  echo "Local spark ip is: ${SPARK_LOCAL_IP:-'NONE'}"
}
### Main
# NOTE(review): the original call ended with a stray '%' (a shell-prompt
# artifact from a terminal copy/paste), which turned the command into the
# nonexistent name `setup_databricks_prometheus%`.
setup_databricks_prometheus
import os
import threading
import urllib.request
import logging
from time import sleep
__author__ = "Florian Dambrine <android.florian@gmail.com>"
class DatabricksPushgatewayExporter:
    """Periodically scrape the local Spark PrometheusServlet and push the
    metrics to a Prometheus Pushgateway.

    Intended for use inside Databricks notebooks: a background thread polls
    ``<driver>/metrics/prometheus`` every ``frequency`` seconds and PUTs the
    payload to ``<pushgateway>/metrics/job/<job>/instance/<instance>``.

    NOTE(review): ``_get_local_prometheus`` relies on the notebook-global
    ``sc`` (SparkContext); constructing this class outside a Spark session
    raises ``NameError``.
    """

    __logger = logging.getLogger(__name__)

    def __init__(self, job="python-test", frequency=2, pushgateway_endpoint="http://pushgateway-nlp.internal.verity.k8s.sx.ggops.com"):
        """
        :param job: Pushgateway ``job`` grouping label.
        :param frequency: seconds to sleep between scrape/push cycles.
        :param pushgateway_endpoint: base URL of the Pushgateway service.
        """
        self._job = job
        # SPARK_LOCAL_IP identifies this driver node (set by Databricks).
        self._instance = os.getenv("SPARK_LOCAL_IP", "unknown")
        self._frequency = frequency
        # Pushgateway URL including the job and instance grouping labels.
        self._pushgateway = f"{pushgateway_endpoint}/metrics/job/{self._job}/instance/{self._instance}"
        self._prometheus = self._get_local_prometheus()
        # Background reporter thread. daemon=True so a forgotten shutdown()
        # cannot block interpreter exit (original thread was non-daemon).
        self._closed = False
        self._reporter = threading.Thread(target=self._send_to_pushgateway, daemon=True)
        self._reporter.start()

    def __repr__(self) -> str:
        # Fixed typo: "initialiazed" -> "initialized".
        return f"{self.__class__.__name__} initialized with prometheus endpoint {self._prometheus}"

    def _get_local_prometheus(self):
        """Build the base URL of this driver's Spark UI (metrics source)."""
        ui_ip = os.getenv("SPARK_LOCAL_IP")
        # `sc` is the notebook-global SparkContext — undefined outside Spark.
        ui_port = sc.getConf().get('spark.ui.port')
        return f"http://{ui_ip}:{ui_port}"

    def _send_to_pushgateway(self):
        """Thread body: scrape local metrics and push until shutdown().

        Scrape and push failures are logged and swallowed deliberately —
        this is a best-effort reporter and must keep looping.
        """
        while not self._closed:
            metrics = None
            try:
                with urllib.request.urlopen(f"{self._prometheus}/metrics/prometheus") as response:
                    metrics = response.read()
            except Exception:
                self.__logger.exception("Failed retrieving metrics from %s", self._prometheus)
            if metrics:
                self.__logger.debug(metrics)
                # PUT replaces all metrics for this job/instance grouping.
                req = urllib.request.Request(url=self._pushgateway, method='PUT', data=metrics)
                try:
                    with urllib.request.urlopen(req) as f:
                        self.__logger.info("Successfully reported %d bytes - %s - %s", len(metrics), f.status, f.reason)
                except Exception:
                    self.__logger.exception("Failed reporting metrics to %s", self._pushgateway)
            sleep(self._frequency)

    def shutdown(self) -> None:
        """Signal the reporter thread to stop and wait for it to finish.

        The loop checks ``_closed`` once per cycle, so this may block for
        up to ``frequency`` seconds plus any in-flight HTTP time.
        """
        self._closed = True
        try:
            self._reporter.join()
            self.__logger.info(f"{self.__class__.__name__} thread {self._reporter} successfully joined")
        except RuntimeError:
            # join() without a timeout never "times out" (the original
            # message claimed it did); RuntimeError only occurs if the
            # thread was never started or we join the current thread.
            self.__logger.warning(
                f"{self.__class__.__name__} thread {self._reporter} could not be joined"
            )
if __name__ == '__main__':
    # Root logging config: terse timestamps, errors only by default;
    # quiet py4j, but keep this module's own logger at INFO.
    logging.basicConfig(
        format="%(asctime)s,%(msecs)03d %(levelname)8s %(name)s %(filename)s:%(lineno)d - %(message)s",
        level=logging.ERROR,
    )
    logging.getLogger('py4j').setLevel(logging.ERROR)
    logging.getLogger(__name__).setLevel(logging.INFO)

    exporter = DatabricksPushgatewayExporter()
    print(exporter)
    sleep(10)  # let the reporter thread run a few scrape/push cycles
    exporter.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment