Skip to content

Instantly share code, notes, and snippets.

@jondcoleman
Created May 9, 2024 19:14
Show Gist options
  • Save jondcoleman/f2d2f70965fcae34b850779d85cea83b to your computer and use it in GitHub Desktop.
Save jondcoleman/f2d2f70965fcae34b850779d85cea83b to your computer and use it in GitHub Desktop.
from enum import Enum
from typing import Any, Dict, Optional
from crewai import Crew
from crewai.process import Process
from crewai.utilities import I18N
class ProcessExtended(str, Enum):
"""
Class representing the different processes that can be used to tackle tasks
"""
sequential = "sequential"
hierarchical = "hierarchical"
custom = "custom"
# TODO: consensual = 'consensual'
class CrewExtended(Crew):
processExtended: Optional[ProcessExtended] = None
def kickoff(self, inputs: Optional[Dict[str, Any]] = {}) -> str:
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self)
self._interpolate_inputs(inputs or {}) # customized to avoid pylance error
self._set_tasks_callbacks()
i18n = I18N(language=self.language, language_file=self.language_file)
for agent in self.agents:
agent.i18n = i18n
agent.crew = self
if not agent.function_calling_llm:
agent.function_calling_llm = self.function_calling_llm
if not agent.step_callback:
agent.step_callback = self.step_callback
agent.create_agent_executor()
metrics = []
if self.processExtended == ProcessExtended.custom:
result = self._run_custom_process()
elif self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
result, manager_metrics = self._run_hierarchical_process()
metrics.append(manager_metrics)
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
metrics = metrics + [
agent._token_process.get_summary() for agent in self.agents
]
self.usage_metrics = {
key: sum([m[key] for m in metrics if m is not None]) for key in metrics[0]
}
return result
def _run_custom_process(self) -> str:
"""Executes tasks in a custom way and returns the final output."""
# task_output = ""
# for task in self.tasks:
# if task.agent and task.agent.allow_delegation:
# agents_for_delegation = [
# agent for agent in self.agents if agent != task.agent
# ]
# if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.tools:
# task.tools += AgentTools(agents=agents_for_delegation).tools()
# role = task.agent.role if task.agent is not None else "None"
# self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple")
# self._logger.log(
# "info", f"== Starting Task: {task.description}", color="bold_purple"
# )
# if self.output_log_file:
# self._file_handler.log(
# agent=role, task=task.description, status="started"
# )
# output = task.execute(context=task_output)
# if not task.async_execution:
# task_output = output
# role = task.agent.role if task.agent is not None else "None"
# self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n")
# if self.output_log_file:
# self._file_handler.log(agent=role, task=task_output, status="completed")
# self._finish_execution(task_output)
# return self._format_output(task_output)
return "custom process output"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment