Skip to content

Instantly share code, notes, and snippets.

@basilfx
Created June 18, 2021 18:34
Show Gist options
  • Save basilfx/d70026c34b6b2e40daec6316ef618f74 to your computer and use it in GitHub Desktop.
Save basilfx/d70026c34b6b2e40daec6316ef618f74 to your computer and use it in GitHub Desktop.
Fail2ban plugin for collectd
"""
Plugin for collectd to fetch Fail2ban statistics per jail.
Recreated, but based on the original source code by Antti Jaakkola
(https://github.com/annttu/collectd-plugins).
To load this plugin in collectd, add the following to the collectd configuration:
```
LoadPlugin python
<Plugin python>
ModulePath "/opt/collectd/plugins"
Import "fail2ban_plugin"
</Plugin>
```
"""
from fail2ban.client.csocket import CSocket
import collectd
# Global instance of a Fail2ban client. Starts out as None; init() replaces it
# with a Fail2banClient and read() uses it to query the Fail2ban server.
client = None
class Fail2banClient:
    """
    Simple client for interacting with the Fail2ban server. Based on the
    original source.

    Every command opens its own ``CSocket`` connection to the configured
    socket path and closes it again once the response has been read.
    """

    def __init__(self, socket: str) -> None:
        """
        Construct a new Fail2ban client.

        :param socket: Path handed to ``CSocket`` for every command.
        """
        self.socket = socket

    def _send_command(self, cmd: list[str], timeout: int = -1) -> list:
        """
        Send a command to the Fail2ban server, and wait for a response.

        :param cmd: Command words, for example ``["status", "sshd"]``.
        :param timeout: Timeout forwarded to ``CSocket``. With ``-1`` the
            explicit ``settimeout`` call is skipped.
        :return: The payload of the response (second element of the reply),
            or an empty list when the reply carries a non-zero status code.
        """
        connection = CSocket(self.socket, timeout=timeout)

        try:
            if timeout != -1:
                connection.settimeout(timeout)

            ret = connection.send(cmd)
        finally:
            # A new connection is opened per command; close it explicitly so
            # connections do not pile up until the garbage collector runs.
            connection.close()

        if ret[0] != 0:
            return []

        return ret[1]

    def get_jail_status(self, jail: str) -> dict:
        """
        Return the status of a specific jail.

        :param jail: Name of the jail to query.
        :return: The response converted to a dictionary, with the "Actions"
            and "Filter" sections (when present) converted to dictionaries
            as well. An empty dictionary when no response was retrieved.
        """
        response = self._send_command(["status", jail])

        if not response:
            return {}

        # Restructure the response: it arrives as key/value pairs.
        status = dict(response)

        for section in ("Actions", "Filter"):
            if section in status:
                status[section] = dict(status[section])

        return status

    def get_status(self) -> dict:
        """
        Return the status information.

        :return: The response converted to a dictionary, with "Jail list"
            (when present) split on commas into a list of jail names. An
            empty dictionary when no response was retrieved.
        """
        response = self._send_command(["status"])

        if not response:
            return {}

        # Restructure the response: it arrives as key/value pairs.
        status = dict(response)

        if "Jail list" in status:
            names = (name.strip() for name in status["Jail list"].split(","))

            # Splitting an empty jail list yields a single empty string; drop
            # blank entries so callers never query a jail named "".
            status["Jail list"] = [name for name in names if name]

        return status
def init() -> bool:
    """
    Set up the plugin by creating the module-wide Fail2ban client.

    :return: Always ``True``.
    """
    global client

    socket_path = "/var/run/fail2ban/fail2ban.sock"
    client = Fail2banClient(socket_path)

    return True
def read(data=None) -> None:
    """
    Read the status of fail2ban and dispatch the ban counters of every jail.

    For each jail in the "Jail list" of the global status, the "Currently
    banned" and "Total banned" counters of its "Actions" section are
    dispatched as ``gauge`` values of the ``fail2ban`` plugin, using the jail
    name as the plugin instance.

    A jail whose status cannot be retrieved, or that lacks a usable counter,
    is reported through ``collectd.error`` and skipped; the remaining jails
    are still processed.

    :param data: Unused by this function.
    """
    # (type instance to dispatch, key inside the "Actions" section)
    metrics = (
        ("currently_banned", "Currently banned"),
        ("total_banned", "Total banned"),
    )

    status = client.get_status()

    if not status:
        collectd.error("fail2ban_plugin: unable to retrieve status.")
        return

    if "Jail list" not in status:
        return

    for jail in status["Jail list"]:
        jail_status = client.get_jail_status(jail)

        if not jail_status:
            collectd.error("fail2ban_plugin: unable to retrieve jail status.")
            continue

        for type_instance, key in metrics:
            try:
                value = float(jail_status["Actions"][key])
            except (KeyError, TypeError, ValueError):
                # Report and move on, so one malformed jail status does not
                # abort the read for all jails that follow.
                collectd.error(
                    f"fail2ban_plugin: no usable '{key}' value for jail "
                    f"'{jail}'."
                )
                continue

            values = collectd.Values()
            values.plugin = "fail2ban"
            values.plugin_instance = jail
            values.type = "gauge"
            values.type_instance = type_instance
            values.values = [value]
            values.dispatch()
# Register the plugin callbacks with collectd: ``read`` as the read callback
# and ``init`` as the init callback. When they are invoked is decided by
# collectd, not by this module.
collectd.register_read(read)
collectd.register_init(init)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment