Created
May 9, 2024 19:14
-
-
Save jondcoleman/f2d2f70965fcae34b850779d85cea83b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import Enum | |
from typing import Any, Dict, Optional | |
from crewai import Crew | |
from crewai.process import Process | |
from crewai.utilities import I18N | |
class ProcessExtended(str, Enum):
    """Execution strategies a crew can use to tackle its tasks.

    Members also subclass ``str``, so each one compares equal to its plain
    string value (for example ``ProcessExtended.custom == "custom"``).
    """

    sequential = "sequential"
    hierarchical = "hierarchical"
    custom = "custom"
    # TODO: consensual = 'consensual'
class CrewExtended(Crew):
    """Crew subclass that adds an opt-in ``custom`` execution process.

    ``kickoff`` is overridden so that, when ``processExtended`` is
    ``ProcessExtended.custom``, the run is dispatched to
    ``_run_custom_process`` instead of the sequential or hierarchical
    runners selected through the inherited ``process`` field.
    """

    # Takes precedence over ``self.process`` only when set to
    # ProcessExtended.custom; any other value (including None) falls through
    # to the regular ``process`` dispatch in kickoff().
    processExtended: Optional[ProcessExtended] = None

    def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> str:
        """Starts the crew to work on its assigned tasks.

        Args:
            inputs: Values handed to ``_interpolate_inputs``. ``None`` (the
                default) is treated as an empty dict.

        Returns:
            The result produced by the selected process runner.

        Raises:
            NotImplementedError: If ``processExtended`` is not ``custom`` and
                ``process`` is neither sequential nor hierarchical.
        """
        self._execution_span = self._telemetry.crew_execution_span(self)
        self._interpolate_inputs(inputs or {})  # customized to avoid pylance error
        self._set_tasks_callbacks()

        i18n = I18N(language=self.language, language_file=self.language_file)

        for agent in self.agents:
            agent.i18n = i18n
            agent.crew = self

            # Crew-level settings act as defaults for agents that have none.
            if not agent.function_calling_llm:
                agent.function_calling_llm = self.function_calling_llm
            if not agent.step_callback:
                agent.step_callback = self.step_callback

            agent.create_agent_executor()

        metrics = []

        # The custom process is checked first so it wins over ``self.process``.
        if self.processExtended == ProcessExtended.custom:
            result = self._run_custom_process()
        elif self.process == Process.sequential:
            result = self._run_sequential_process()
        elif self.process == Process.hierarchical:
            result, manager_metrics = self._run_hierarchical_process()
            metrics.append(manager_metrics)
        else:
            raise NotImplementedError(
                f"The process '{self.process}' is not implemented yet."
            )

        metrics = metrics + [
            agent._token_process.get_summary() for agent in self.agents
        ]

        # Drop missing summaries *before* choosing the key set: indexing
        # ``metrics[0]`` directly fails when the list is empty or when its
        # first entry is None.
        summaries = [m for m in metrics if m is not None]
        if summaries:
            self.usage_metrics = {
                key: sum(m[key] for m in summaries) for key in summaries[0]
            }
        else:
            self.usage_metrics = {}

        return result

    def _run_custom_process(self) -> str:
        """Executes tasks in a custom way and returns the final output.

        Currently a placeholder: it runs no tasks and returns a fixed string.
        The commented-out code below is kept as a starting template for a
        real implementation.
        """
        # task_output = ""
        # for task in self.tasks:
        #     if task.agent and task.agent.allow_delegation:
        #         agents_for_delegation = [
        #             agent for agent in self.agents if agent != task.agent
        #         ]
        #         if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.tools:
        #             task.tools += AgentTools(agents=agents_for_delegation).tools()
        #     role = task.agent.role if task.agent is not None else "None"
        #     self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple")
        #     self._logger.log(
        #         "info", f"== Starting Task: {task.description}", color="bold_purple"
        #     )
        #     if self.output_log_file:
        #         self._file_handler.log(
        #             agent=role, task=task.description, status="started"
        #         )
        #     output = task.execute(context=task_output)
        #     if not task.async_execution:
        #         task_output = output
        #     role = task.agent.role if task.agent is not None else "None"
        #     self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n")
        #     if self.output_log_file:
        #         self._file_handler.log(agent=role, task=task_output, status="completed")
        # self._finish_execution(task_output)
        # return self._format_output(task_output)
        return "custom process output"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment