Skip to content

Instantly share code, notes, and snippets.

@khusseini
Created May 10, 2024 06:15
Show Gist options
  • Save khusseini/90031a531e3c940697806efe63b1acee to your computer and use it in GitHub Desktop.
Save khusseini/90031a531e3c940697806efe63b1acee to your computer and use it in GitHub Desktop.
from crewai import Agent
from langchain_community.llms import Ollama
from .tools.kubernetes_events import retrieve_kubernetes_events
from .tools.search import search_tool
def create_kubernetes_cluster_state_agent(llm=None):
    """Build the agent that reports on Kubernetes cluster events.

    Args:
        llm: Language model the agent should use. When omitted (or falsy),
            a local Ollama "llama2" model is created.

    Returns:
        A configured ``Agent`` that does not delegate work and has access to
        both the Kubernetes events tool and the search tool.
    """
    llm = llm or Ollama(model="llama2")
    return Agent(
        role="Kubernetes Cluster State Agent",
        goal="Retrieve Kubernetes events.",
        backstory="You are responsible for connecting to the Kubernetes API server and retrieving the current events of the cluster and make them ready for human processing.",
        verbose=True,
        allow_delegation=False,
        llm=llm,
        # The agent's goal is to retrieve cluster events, so it needs the
        # events tool itself; previously only the search tool was attached
        # and the imported `retrieve_kubernetes_events` was never used.
        tools=[retrieve_kubernetes_events, search_tool],
    )
from crewai import Crew, Process
from .kubernetes.agents import (
create_kubernetes_cluster_state_agent,
)
from .kubernetes.tasks import (
create_kubernetes_events_task,
)
from langchain_community.llms import Ollama
def create_devops_crew(llm=None):
    """Assemble the crew that answers queries about the Kubernetes cluster.

    Args:
        llm: Language model shared with the crew's agent. When omitted (or
            falsy), a local Ollama "llama2" model is created.

    Returns:
        A ``Crew`` containing the cluster state agent and the Kubernetes
        events task assigned to that agent.
    """
    if not llm:
        # The Ollama wrapper selects its model through the `model` field
        # (as the agent factory does); `model_name` is not a valid keyword.
        llm = Ollama(model="llama2")

    kubernetes_cluster_state_agent = create_kubernetes_cluster_state_agent(llm)

    return Crew(
        name="Kubernetes Query Crew",
        description="A team of agents to assist with answering user queries about the Kubernetes cluster state.",
        agents=[
            kubernetes_cluster_state_agent,
        ],
        tasks=[
            create_kubernetes_events_task(kubernetes_cluster_state_agent),
        ],
    )
from typing import Optional
from kubernetes import client, config, watch
from langchain.tools import tool
@tool("retrieve kubernetes events")
def retrieve_kubernetes_events():
    """
    Useful to list kubernetes events in all namespaces of the kubernetes cluster.
    It does not need any arguments and just lists all events in a human readable format.
    """
    # NOTE: LangChain's `tool` decorator uses the docstring above as the tool
    # description shown to the model, so its wording is kept unchanged.

    def _render(evt):
        # One event becomes two lines: what happened, then which object it
        # concerns, followed by a trailing newline.
        headline = f"{evt.type}: {evt.reason} - {evt.message}"
        target = evt.involved_object
        subject = f"Object: {target.kind}/{target.name} (Namespace: {target.namespace})"
        return f"{headline}\n{subject}\n"

    # Load credentials from the local kubeconfig before creating the client.
    config.load_kube_config()
    core_api = client.CoreV1Api()
    event_list = core_api.list_event_for_all_namespaces(watch=False)

    return "\n".join(_render(evt) for evt in event_list.items)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment