Created
March 16, 2025 18:51
-
-
Save Taofiqq/f90095f27b898a109fb1949579c031d2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import re | |
from typing import Dict, List, Any | |
from crewai import Agent, Crew, Task, Process | |
from langchain_openai import OpenAI | |
from copilotkit import CopilotKitState | |
# State container for the feedback analyzer, extending CopilotKitState with
# the two fields this agent tracks. ('#' comments are used instead of a
# docstring so nothing about the class object itself changes.)
class FeedbackAnalyzerState(CopilotKitState):
    # Feedback text to analyze; starts out empty.
    feedback_text: str = ""
    # Result of the most recent analysis; starts out as an empty dict.
    analysis_result: Dict[str, Any] = {}
class FeedbackAnalyzerCrew:
    """Three-agent CrewAI pipeline that categorizes, themes and summarizes feedback.

    The public surface is unchanged: ``create_crew``, ``analyze_feedback`` and
    ``state_handler``. Parsing and formatting live in private helpers so they
    can be reasoned about (and exercised) without calling an LLM.
    """

    _GREETING = "Hello! I'm your feedback analysis assistant. How can I help you today?"
    _BAD_MESSAGE = "Sorry, I couldn't understand your message. Please try again."
    _NEED_FEEDBACK = (
        "Please provide feedback text to analyze "
        "(e.g., 'Analyze feedback: The product is great but slow')."
    )
    _NO_RESPONSE = "I couldn't generate a response. Please try again."
    # Confidence assigned to categories recovered by the plain-text fallback.
    _FALLBACK_CONFIDENCE = 0.8

    def __init__(self):
        """Build the shared LLM client from environment configuration.

        Raises:
            ValueError: if ``OPENAI_API_KEY`` is unset/empty or
                ``LLM_TEMPERATURE`` is not a number.
        """
        openai_api_key = os.getenv("OPENAI_API_KEY")
        if not openai_api_key:
            raise ValueError("OPENAI_API_KEY environment variable not set")
        # Temperature is configurable via the environment; default is 0.2.
        raw_temperature = os.getenv("LLM_TEMPERATURE", "0.2")
        try:
            temperature = float(raw_temperature)
        except ValueError as exc:
            raise ValueError(
                f"LLM_TEMPERATURE must be a number, got {raw_temperature!r}"
            ) from exc
        self.llm = OpenAI(openai_api_key=openai_api_key, temperature=temperature)

    def create_crew(self) -> "Crew":
        """Create a CrewAI crew with specialized agents.

        Returns:
            A sequential ``Crew`` whose ``agents`` list is, in order:
            categorizer, theme identifier, report generator. ``tasks`` starts
            empty; callers attach tasks before kicking the crew off.
        """
        categorizer = Agent(
            role="Feedback Categorizer",
            goal="Accurately categorize customer feedback",
            backstory=(
                "You are an expert at analyzing text and identifying categories. "
                "You can determine what topics a piece of feedback relates to."
            ),
            verbose=True,
            llm=self.llm,
        )
        theme_identifier = Agent(
            role="Theme Identifier",
            goal="Extract key themes and sentiments from feedback",
            backstory=(
                "You excel at finding patterns in text and identifying emotional tones. "
                "You can extract the main themes and determine if they're positive or negative."
            ),
            verbose=True,
            llm=self.llm,
        )
        report_generator = Agent(
            role="Report Generator",
            goal="Create insightful summaries from feedback analysis",
            backstory=(
                "You are skilled at synthesizing analyses into clear, actionable reports. "
                "You can highlight key insights and recommend next steps."
            ),
            verbose=True,
            llm=self.llm,
        )
        return Crew(
            agents=[categorizer, theme_identifier, report_generator],
            tasks=[],
            verbose=True,
            process=Process.sequential,
        )

    # ------------------------------------------------------------------
    # Output parsing helpers (pure functions, no LLM access)
    # ------------------------------------------------------------------

    @staticmethod
    def _raw_output(task) -> str:
        """Return a finished task's raw output as text, or "" if it has none."""
        output = getattr(task, "output", None)
        if not output:
            return ""
        raw = getattr(output, "raw", None)
        return str(output if raw is None else raw)

    @staticmethod
    def _extract_json_list(text: str, key: str):
        """Find a JSON payload in ``text`` and return the list stored under ``key``.

        Handles Markdown code fences and JSON embedded in surrounding prose.
        A bare top-level JSON list is returned as-is. Returns ``None`` when no
        usable JSON is found, so the caller can fall back to text parsing.
        """
        candidate = text.strip()
        fenced = re.search(
            r"```(?:json)?\s*(.*?)```", candidate, re.DOTALL | re.IGNORECASE
        )
        if fenced:
            candidate = fenced.group(1).strip()

        # Try the whole text first, then the outermost {...} span.
        attempts = [candidate]
        start, end = candidate.find("{"), candidate.rfind("}")
        if 0 <= start < end:
            attempts.append(candidate[start:end + 1])

        for attempt in attempts:
            try:
                parsed = json.loads(attempt)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                value = parsed.get(key, [])
                return value if isinstance(value, list) else []
            if isinstance(parsed, list):
                return parsed
        return None

    @staticmethod
    def _normalize_sentiment(text: str) -> str:
        """Map free-form sentiment text to mixed/positive/negative/neutral.

        Anything unrecognized (including an empty string) is "neutral"; the
        previous logic reported every such case as "negative".
        """
        lowered = text.lower()
        for label in ("mixed", "positive", "negative"):
            if label in lowered:
                return label
        return "neutral"

    @classmethod
    def _parse_categories(cls, raw_output: str) -> List[Dict[str, Any]]:
        """Turn the categorizer's raw output into ``{"name", "confidence"}`` dicts."""
        if not raw_output:
            return []
        items = cls._extract_json_list(raw_output, "categories")
        if items is None:
            # Fallback: treat the text as a comma-separated list of names,
            # e.g. "... could be product satisfaction, customer service".
            print("Falling back to text parsing for categories")
            names = raw_output.lower().split("could be ")[-1].split(", ")
            return [
                {"name": name.strip(), "confidence": cls._FALLBACK_CONFIDENCE}
                for name in names
                if name.strip()
            ]

        categories = []
        for item in items:
            if not isinstance(item, dict):
                continue
            name = str(item.get("name") or "").strip()
            if not name:
                continue
            try:
                confidence = float(item.get("confidence", 0.0))
            except (TypeError, ValueError):
                confidence = 0.0  # unknown confidence rather than a crash downstream
            categories.append({**item, "name": name, "confidence": confidence})
        return categories

    @classmethod
    def _parse_themes(cls, raw_output: str) -> List[Dict[str, Any]]:
        """Turn the theme task's raw output into name/keywords/sentiment dicts."""
        if not raw_output:
            return []
        items = cls._extract_json_list(raw_output, "themes")
        if items is not None:
            themes = []
            for item in items:
                if not isinstance(item, dict):
                    continue
                name = str(item.get("name") or "").strip()
                if not name:
                    continue
                keywords = item.get("keywords") or []
                if isinstance(keywords, str):
                    keywords = [keywords]
                elif not isinstance(keywords, (list, tuple)):
                    keywords = []
                themes.append(
                    {
                        **item,
                        "name": name,
                        "keywords": [str(kw) for kw in keywords],
                        "sentiment": str(item.get("sentiment") or "neutral"),
                    }
                )
            return themes

        # Fallback: best-effort extraction from a prose answer.
        print("Falling back to text parsing for themes")
        text = raw_output.lower()
        theme_names = text.split("are ")[-1].split(". ")[0].split(", ")
        keywords_match = re.search(r"keywords are (.*?)\. the sentiment", text)
        sentiment_match = re.search(r"sentiment is (.*?)\.", text)
        keywords = keywords_match.group(1).split(", ") if keywords_match else []
        sentiment = cls._normalize_sentiment(
            sentiment_match.group(1) if sentiment_match else ""
        )

        themes = []
        for name in theme_names:
            # Drop surrounding whitespace and a trailing sentence period.
            cleaned = name.strip().rstrip(".").strip()
            if not cleaned:
                continue
            themes.append(
                {
                    "name": cleaned,
                    "keywords": [
                        kw.strip().strip("'") for kw in keywords if cleaned in kw
                    ],
                    "sentiment": sentiment,
                }
            )
        return themes

    @staticmethod
    def _extract_feedback_text(message: str) -> str:
        """Return the text after the first "feedback:" marker, preserving case.

        When there is no marker the whole message is the feedback. The marker
        is matched case-insensitively, but the returned text is sliced from the
        original message so the user's capitalization reaches the analysis.
        """
        marker = re.search(r"feedback:", message, re.IGNORECASE)
        if marker is None:
            return message
        return message[marker.end():].strip()

    @staticmethod
    def _format_analysis(result: Dict[str, Any]) -> str:
        """Render an analysis result dict as the Markdown chat reply."""
        parts = ["## Feedback Analysis Results\n\n", "### Categories\n"]
        for category in result.get("categories", []):
            try:
                confidence = float(category.get("confidence", 0.0))
            except (TypeError, ValueError):
                confidence = 0.0
            parts.append(f"- {category.get('name', '')}: {confidence * 100:.1f}%\n")
        parts.append("\n### Themes\n")
        for theme in result.get("themes", []):
            keywords = ", ".join(str(kw) for kw in theme.get("keywords", []))
            parts.append(
                f"- {theme.get('name', '')} ({theme.get('sentiment', 'neutral')}): {keywords}\n"
            )
        parts.append(f"\n### Summary\n{result.get('summary', '')}")
        return "".join(parts)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def analyze_feedback(self, feedback_text: str) -> Dict[str, Any]:
        """Analyze customer feedback and return structured results.

        Args:
            feedback_text: The raw feedback to analyze.

        Returns:
            ``{"categories": [...], "themes": [...], "summary": str}`` on
            success, or ``{"error": str}`` when the text is empty/blank or the
            crew run fails. This method does not raise for analysis failures.
        """
        if not feedback_text or not feedback_text.strip():
            return {"error": "Feedback text cannot be empty"}

        import asyncio  # local import: not part of the module's import block

        try:
            crew = self.create_crew()
            categorizer, theme_identifier, report_generator = crew.agents

            # Tasks ask for JSON in the prompt; the replies are parsed here, so
            # output_json is not used (CrewAI expects a Pydantic model class
            # there, not a bool). expected_output is supplied because CrewAI
            # requires it on Task.
            categorization_task = Task(
                description=(
                    f"Analyze this feedback and return a JSON object with a 'categories' key containing a list of 3-5 categories, each with 'name' (string) and 'confidence' (float 0-1): {feedback_text}\n"
                    'Example: {"categories": [{"name": "product satisfaction", "confidence": 0.9}, {"name": "customer service", "confidence": 0.7}]}'
                ),
                expected_output=(
                    "A JSON object with a 'categories' list of objects, each "
                    "having 'name' and 'confidence'."
                ),
                agent=categorizer,
                tools=[],
            )
            theme_task = Task(
                description=(
                    f"Identify 2-4 key themes in this feedback, return a JSON object with a 'themes' key containing a list of themes, each with 'name' (string), 'keywords' (list of strings), and 'sentiment' (positive/negative/neutral): {feedback_text}\n"
                    'Example: {"themes": [{"name": "customer service", "keywords": ["wait time"], "sentiment": "negative"}, {"name": "product satisfaction", "keywords": ["great product"], "sentiment": "positive"}]}'
                ),
                expected_output=(
                    "A JSON object with a 'themes' list of objects, each having "
                    "'name', 'keywords' and 'sentiment'."
                ),
                agent=theme_identifier,
                tools=[],
            )
            summary_task = Task(
                description=f"Create a concise, insightful summary of this feedback as a string: {feedback_text}",
                expected_output="A concise plain-text summary of the feedback.",
                agent=report_generator,
            )
            crew.tasks = [categorization_task, theme_task, summary_task]

            # kickoff() blocks for the duration of the LLM calls; run it in a
            # worker thread so the event loop stays responsive.
            await asyncio.to_thread(crew.kickoff)

            raw_categories = self._raw_output(categorization_task)
            if raw_categories:
                print(f"Raw categorization output: {raw_categories}")
            raw_themes = self._raw_output(theme_task)
            if raw_themes:
                print(f"Raw theme output: {raw_themes}")
            summary = self._raw_output(summary_task)
            if summary:
                print(f"Raw summary output: {summary}")

            return {
                "categories": self._parse_categories(raw_categories),
                "themes": self._parse_themes(raw_themes),
                "summary": summary,
            }
        except Exception as e:
            # Boundary handler: callers rely on an {"error": ...} dict.
            print(f"Error in analyze_feedback: {str(e)}")
            return {"error": f"Failed to analyze feedback: {str(e)}"}

    async def state_handler(
        self, state: Dict[str, Any], messages: List[Dict[str, Any]]
    ) -> str:
        """Handle CopilotKit state and messages for chat functionality.

        Args:
            state: Current CopilotKit state (not read by this handler).
            messages: Chat history; only the last message's ``content`` is used.

        Returns:
            A greeting when there are no messages, a Markdown analysis report
            when the last message mentions both "analyze" and "feedback", and
            otherwise a conversational reply from the report-generator agent.
        """
        if not messages:
            return self._GREETING

        # Validate message format: need a list whose last item has text content.
        latest = messages[-1] if isinstance(messages, list) else None
        latest_message = latest.get("content") if isinstance(latest, dict) else None
        if not isinstance(latest_message, str):
            return self._BAD_MESSAGE

        lowered = latest_message.lower()
        if "analyze" in lowered and "feedback" in lowered:
            feedback_text = self._extract_feedback_text(latest_message)
            if not feedback_text:
                return self._NEED_FEEDBACK
            result = await self.analyze_feedback(feedback_text)
            if "error" in result:
                return f"Error: {result['error']}"
            return self._format_analysis(result)

        # Normal conversation: let the report generator answer directly.
        import asyncio  # local import: not part of the module's import block

        crew = self.create_crew()
        assistant_task = Task(
            description=f"The user said: '{latest_message}'. Respond helpfully as a feedback analysis assistant.",
            expected_output="A helpful conversational reply to the user.",
            agent=crew.agents[2],  # Report Generator
        )
        try:
            # execute_sync() is a blocking call that returns a task output
            # object; run it off the event loop and unwrap its raw text.
            output = await asyncio.to_thread(assistant_task.execute_sync)
        except Exception as e:
            return f"Error during conversation: {str(e)}"
        raw = getattr(output, "raw", output)
        reply = "" if raw is None else str(raw).strip()
        return reply or self._NO_RESPONSE
# Module-level analyzer instance, built once at import time. Constructing it
# runs FeedbackAnalyzerCrew.__init__, which raises ValueError when
# OPENAI_API_KEY is not set.
feedback_analyzer = FeedbackAnalyzerCrew()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import FastAPI | |
from copilotkit.integrations.fastapi import add_fastapi_endpoint | |
from agent import FeedbackAnalyzerCrew | |
# Analyzer instance handed to the CopilotKit endpoint registered below.
feedback_analyzer = FeedbackAnalyzerCrew()


def setup_copilot(app: FastAPI):
    """Register the CopilotKit status route and SDK endpoint on ``app``."""

    async def handle_copilotkit():
        return {"status": "CopilotKit is up and running"}

    # Same registration as decorating the handler with @app.get(...).
    app.get("/api/copilotkit")(handle_copilotkit)

    # NOTE(review): `sdk` presumably expects a CopilotKit SDK/remote-endpoint
    # object; confirm that a FeedbackAnalyzerCrew instance is accepted here.
    add_fastapi_endpoint(
        app,
        sdk=feedback_analyzer,
        prefix="/copilotkit",
        use_thread_pool=False,
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from dotenv import load_dotenv | |
from fastapi import FastAPI, Depends, HTTPException | |
from fastapi.middleware.cors import CORSMiddleware | |
from pydantic import BaseModel | |
from typing import List | |
from crewai import Agent, Task, Crew | |
from copilot_integration import setup_copilot | |
from contextlib import asynccontextmanager | |
from langchain_community.llms import OpenAI | |
from agent import feedback_analyzer | |
# Load environment variables via python-dotenv.
# NOTE(review): the `agent` and `copilot_integration` imports above appear to
# construct FeedbackAnalyzerCrew at import time, which reads OPENAI_API_KEY
# before this call runs; confirm the key is already in the process
# environment, or move this call ahead of those imports.
load_dotenv()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: wire up CopilotKit on startup.

    Nothing runs after the ``yield``, so there is no shutdown work.
    """
    setup_copilot(app)
    yield
# The FastAPI application; `lifespan` performs CopilotKit setup at startup.
app = FastAPI(lifespan=lifespan)

# CORS: only the local frontend origin is allowed, with credentials, and with
# any HTTP method or header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
# Request/response schemas for POST /api/analyze. They are documented with
# comments rather than docstrings so the generated OpenAPI schema is unchanged.


# Request body: the feedback text to analyze.
class FeedbackRequest(BaseModel):
    text: str


# One category assigned to the feedback, with its confidence score.
class Category(BaseModel):
    name: str
    confidence: float


# One theme found in the feedback, with its keywords and sentiment label.
class Theme(BaseModel):
    name: str
    keywords: List[str]
    sentiment: str


# Full analysis payload returned to the client.
class AnalysisResponse(BaseModel):
    categories: List[Category]
    themes: List[Theme]
    summary: str
def get_llm():
    """Return a new OpenAI LLM client with temperature 0.2.

    The API key is read from the ``OPENAI_API_KEY`` environment variable on
    every call.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    return OpenAI(openai_api_key=api_key, temperature=0.2)
def create_agents():
    """Build the categorizer, theme-identifier and report-generator agents.

    Each agent is verbose and gets its own LLM client from ``get_llm()``.

    Returns:
        A ``(categorizer, theme_identifier, report_generator)`` tuple.
    """
    # (role, goal, backstory) for each agent, in return order.
    agent_specs = (
        (
            "Feedback Categorizer",
            "Accurately categorize customer feedback into relevant categories",
            "You are an expert at analyzing and categorizing customer feedback. "
            "Your specialty is in identifying the main themes and assigning the "
            "appropriate categories to each piece of feedback.",
        ),
        (
            "Theme Identifier",
            "Identify recurring themes and sentiments in customer feedback",
            "You excel at spotting patterns and trends in large amounts of feedback. "
            "You can identify both explicit and implicit themes, as well as the "
            "sentiment associated with each theme.",
        ),
        (
            "Report Generator",
            "Create concise, insightful summaries of customer feedback analysis",
            "You are skilled at synthesizing complex analyses into clear, actionable "
            "reports. You focus on highlighting the most important insights and providing "
            "a balanced view of the feedback.",
        ),
    )
    categorizer, theme_identifier, report_generator = (
        Agent(
            role=role,
            goal=goal,
            backstory=backstory,
            verbose=True,
            llm=get_llm(),
        )
        for role, goal, backstory in agent_specs
    )
    return categorizer, theme_identifier, report_generator
# POST /api/analyze: run the shared feedback analyzer on the submitted text.
#
# Responses:
#   200: AnalysisResponse with categories, themes and summary.
#   400: the text is empty or whitespace-only.
#   500: the analyzer reported an error or its result could not be converted.
#
# Documented with comments rather than a docstring so the generated OpenAPI
# description of this route is unchanged.
@app.post("/api/analyze", response_model=AnalysisResponse)
async def analyze_feedback(request: FeedbackRequest):
    if not request.text or not request.text.strip():
        raise HTTPException(status_code=400, detail="Feedback text cannot be empty")

    try:
        # Use the feedback_analyzer instance from agent.py
        result = await feedback_analyzer.analyze_feedback(request.text)

        # The analyzer reports failures as {"error": ...} instead of raising.
        # Surface that as a 500 rather than an empty "successful" analysis.
        if "error" in result:
            raise HTTPException(status_code=500, detail=str(result["error"]))

        # Build response models defensively: LLM-produced items may omit keys.
        categories = [
            Category(
                name=str(cat.get("name", "")),
                confidence=float(cat.get("confidence", 0.0)),
            )
            for cat in result.get("categories", [])
            if isinstance(cat, dict)
        ]
        themes = [
            Theme(
                name=str(theme.get("name", "")),
                keywords=[str(kw) for kw in theme.get("keywords") or []],
                sentiment=str(theme.get("sentiment", "neutral")),
            )
            for theme in result.get("themes", [])
            if isinstance(theme, dict)
        ]

        # Extract a string from a task-output-like object if necessary.
        summary = result.get("summary", "")
        if hasattr(summary, "result"):
            summary = summary.result
        elif hasattr(summary, "description"):
            summary = summary.description
        if not isinstance(summary, str):
            summary = str(summary)

        return AnalysisResponse(categories=categories, themes=themes, summary=summary)
    except HTTPException:
        # Keep the status/detail chosen above instead of re-wrapping it.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Analysis failed: {str(e)}"
        ) from e
# GET /health: liveness probe. Builds a new LLM client on each call and
# returns its string form alongside an "ok" status.
@app.get("/health")
def health_check():
    client = get_llm()
    payload = {"status": "ok", "llm": str(client)}
    return payload
# Script entry point: serve `main:app` with uvicorn on all interfaces,
# port 8000, with auto-reload enabled.
if __name__ == "__main__":
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("main:app", **server_options)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment