Skip to content

Instantly share code, notes, and snippets.

@jmarhee
Created February 15, 2021 01:19
Show Gist options
  • Save jmarhee/9da99d4e84b3a87db2b04e4a64ab9a37 to your computer and use it in GitHub Desktop.
Save jmarhee/9da99d4e84b3a87db2b04e4a64ab9a37 to your computer and use it in GitHub Desktop.
Terminates containers that have been running longer than a given number of hours (a float value).
import os
from datetime import datetime, timezone

import dateparser
import docker
# Reference "now" for all age calculations, captured once at start-up.
# The value is labelled "Z" (UTC), so it has to be read from the UTC clock;
# formatting the local wall-clock time with a "Z" suffix would skew every
# computed interval by the host's UTC offset. A single now() call also keeps
# the date and time halves consistent across midnight.
STARTING_TIME = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

# Maximum allowed container run time, in hours. Override with the LIMIT_HOURS
# environment variable (any value accepted by float()); defaults to 2.0.
LIMIT_HOURS = float(os.environ.get("LIMIT_HOURS", 2.0))
# High-level client, configured from the environment by docker.from_env().
client = docker.from_env()
# Low-level API client used to stop containers. Reuse the one owned by
# `client` so containers are stopped on the same daemon they were listed from,
# rather than always talking to a hard-coded local socket path.
api_client = client.api
def compare_time(run_time, container_started_at):
    """Return the number of seconds between two timestamp strings.

    Args:
        run_time: Timestamp string for the reference point ("now").
        container_started_at: Timestamp string for when the container started.

    Returns:
        ``run_time - container_started_at`` in seconds, as a float. The value
        is negative when the start time is later than ``run_time``.

    Raises:
        ValueError: If either string cannot be parsed by ``dateparser``.
    """
    started = dateparser.parse(container_started_at)
    if started is None:
        # dateparser signals failure by returning None; fail with a clear
        # message instead of an AttributeError on ``None.timestamp()``.
        raise ValueError(
            f"Unparseable container start time: {container_started_at!r}"
        )

    reference = dateparser.parse(run_time)
    if reference is None:
        raise ValueError(f"Unparseable reference time: {run_time!r}")

    return reference.timestamp() - started.timestamp()
def eval_interval(interval, *, limit_hours=None):
    """Report whether an elapsed interval exceeds the allowed run time.

    Args:
        interval: Elapsed time in seconds.
        limit_hours: Threshold in hours. When omitted, the module-level
            ``LIMIT_HOURS`` setting is used.

    Returns:
        True if the interval, converted to hours, is strictly greater than
        the threshold; False otherwise.
    """
    if limit_hours is None:
        limit_hours = LIMIT_HOURS
    # Seconds -> minutes -> hours.
    elapsed_hours = interval / 60 / 60
    return elapsed_hours > limit_hours
def check_containers():
    """Collect the containers whose run time exceeds the configured limit.

    Iterates over the containers returned by ``client.containers.list()``,
    prints one status line per container (name, id, image and whether it is
    over the limit), and gathers the ones that are.

    Returns:
        A list of ``{"id": ..., "name": ...}`` dicts, one per container that
        is over the limit.
    """
    expired = []
    for container in client.containers.list():
        started = container.attrs['State']['StartedAt']
        over_limit = eval_interval(compare_time(STARTING_TIME, started))
        status_fields = [
            container.name,
            "(" + container.id + ")",
            str(container.image),
            str(over_limit),
        ]
        print(" ".join(status_fields))
        if over_limit:
            expired.append({"id": container.id, "name": container.name})
    return expired
def delete_containers(to_delete):
    """Stop each listed container and report the ones that could not be stopped.

    Args:
        to_delete: Iterable of dicts with at least an ``"id"`` key, in the
            shape produced by ``check_containers``.

    Returns:
        The entries from ``to_delete`` for which the stop call raised.
    """
    failed_deletions = []
    for entry in to_delete:
        container_id = str(entry['id'])
        print("Deleting: " + container_id)
        try:
            print(api_client.stop(container=container_id))
        except Exception as exc:
            # Best effort: one container failing to stop must not abort the
            # rest of the run. Entries are dicts, so the id is read by key;
            # attribute access here would raise inside the handler.
            failed_deletions.append(entry)
            print(f"Failed to delete: {container_id} ({exc})")
    return failed_deletions
def main():
    """Find containers over the time limit and stop them.

    Returns:
        The list returned by ``delete_containers``: the entries that could
        not be stopped.
    """
    return delete_containers(check_containers())


# Run the cleanup only when executed as a script; the value returned by
# main() is not used here.
if __name__ == "__main__":
    main()
dateparser
docker
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment