Skip to content

Instantly share code, notes, and snippets.

@Taofiqq
Created March 16, 2025 18:51
Show Gist options
  • Save Taofiqq/f90095f27b898a109fb1949579c031d2 to your computer and use it in GitHub Desktop.
Save Taofiqq/f90095f27b898a109fb1949579c031d2 to your computer and use it in GitHub Desktop.
import os
import json
import re
from typing import Dict, List, Any
from crewai import Agent, Crew, Task, Process
from langchain_openai import OpenAI
from copilotkit import CopilotKitState
class FeedbackAnalyzerState(CopilotKitState):
feedback_text: str = ""
analysis_result: Dict[str, Any] = {}
class FeedbackAnalyzerCrew:
def __init__(self):
# Load OpenAI API key from environment variable
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
raise ValueError("OPENAI_API_KEY environment variable not set")
# Make temperature configurable via environment variable
temperature = float(os.getenv("LLM_TEMPERATURE", "0.2"))
self.llm = OpenAI(openai_api_key=openai_api_key, temperature=temperature)
def create_crew(self) -> Crew:
"""Create a CrewAI crew with specialized agents."""
# Create the specialized agents
categorizer = Agent(
role="Feedback Categorizer",
goal="Accurately categorize customer feedback",
backstory=(
"You are an expert at analyzing text and identifying categories. "
"You can determine what topics a piece of feedback relates to."
),
verbose=True,
llm=self.llm,
)
theme_identifier = Agent(
role="Theme Identifier",
goal="Extract key themes and sentiments from feedback",
backstory=(
"You excel at finding patterns in text and identifying emotional tones. "
"You can extract the main themes and determine if they're positive or negative."
),
verbose=True,
llm=self.llm,
)
report_generator = Agent(
role="Report Generator",
goal="Create insightful summaries from feedback analysis",
backstory=(
"You are skilled at synthesizing analyses into clear, actionable reports. "
"You can highlight key insights and recommend next steps."
),
verbose=True,
llm=self.llm,
)
# Create a crew with these agents
crew = Crew(
agents=[categorizer, theme_identifier, report_generator],
tasks=[],
verbose=True,
process=Process.sequential,
)
return crew
async def analyze_feedback(self, feedback_text: str) -> Dict[str, Any]:
"""Analyze customer feedback and return structured results."""
if not feedback_text:
return {"error": "Feedback text cannot be empty"}
try:
crew = self.create_crew()
# Define tasks with explicit JSON output instructions
categorization_task = Task(
description=(
f"Analyze this feedback and return a JSON object with a 'categories' key containing a list of 3-5 categories, each with 'name' (string) and 'confidence' (float 0-1): {feedback_text}\n"
'Example: {"categories": [{"name": "product satisfaction", "confidence": 0.9}, {"name": "customer service", "confidence": 0.7}]}'
),
agent=crew.agents[0],
output_json=True,
tools=[],
)
theme_task = Task(
description=(
f"Identify 2-4 key themes in this feedback, return a JSON object with a 'themes' key containing a list of themes, each with 'name' (string), 'keywords' (list of strings), and 'sentiment' (positive/negative/neutral): {feedback_text}\n"
'Example: {"themes": [{"name": "customer service", "keywords": ["wait time"], "sentiment": "negative"}, {"name": "product satisfaction", "keywords": ["great product"], "sentiment": "positive"}]}'
),
agent=crew.agents[1],
output_json=True,
tools=[],
)
summary_task = Task(
description=f"Create a concise, insightful summary of this feedback as a string: {feedback_text}",
agent=crew.agents[2],
)
# Add tasks to the crew
crew.tasks = [categorization_task, theme_task, summary_task]
# Run the crew
crew.kickoff() # Removed config for now as it's not supported in 0.105.0
# Process task outputs
categories = []
themes = []
summary = ""
# Debug raw outputs
if hasattr(categorization_task, "output") and categorization_task.output:
raw_output = getattr(
categorization_task.output, "raw", categorization_task.output
)
print(f"Raw categorization output: {raw_output}")
try:
# Try to parse as JSON
parsed = json.loads(raw_output)
categories = parsed.get("categories", [])
except json.JSONDecodeError:
# Fallback: Parse plain text into categories
print("Falling back to text parsing for categories")
# Example: "product satisfaction, customer service, user interface"
category_names = (
raw_output.lower().split("could be ")[-1].split(", ")
)
categories = [
{"name": name.strip(), "confidence": 0.8}
for name in category_names
if name.strip()
]
if hasattr(theme_task, "output") and theme_task.output:
raw_output = getattr(theme_task.output, "raw", theme_task.output)
print(f"Raw theme output: {raw_output}")
try:
# Try to parse as JSON
parsed = json.loads(raw_output)
themes = parsed.get("themes", [])
except json.JSONDecodeError:
# Fallback: Parse plain text into themes
print("Falling back to text parsing for themes")
# Extract themes, keywords, and sentiment
themes_section = raw_output.lower()
theme_names = (
themes_section.split("are ")[-1].split(". ")[0].split(", ")
)
keywords_match = re.search(
r"keywords are (.*?)\. the sentiment", themes_section
)
sentiment_match = re.search(r"sentiment is (.*?)\.", themes_section)
keywords = (
keywords_match.group(1).split(", ") if keywords_match else []
)
sentiment = (
sentiment_match.group(1) if sentiment_match else "neutral"
)
themes = [
{
"name": name.strip(),
"keywords": [
kw.strip().strip("'") for kw in keywords if name in kw
],
"sentiment": (
"mixed"
if "mixed" in sentiment
else (
"positive"
if "positive" in sentiment
else "negative"
)
),
}
for name in theme_names
if name.strip()
]
if hasattr(summary_task, "output") and summary_task.output:
raw_output = getattr(summary_task.output, "raw", summary_task.output)
print(f"Raw summary output: {raw_output}")
summary = str(raw_output)
return {"categories": categories, "themes": themes, "summary": summary}
except Exception as e:
print(f"Error in analyze_feedback: {str(e)}")
return {"error": f"Failed to analyze feedback: {str(e)}"}
async def state_handler(
self, state: Dict[str, Any], messages: List[Dict[str, Any]]
) -> str:
"""Handle CopilotKit state and messages for chat functionality."""
if not messages:
return (
"Hello! I'm your feedback analysis assistant. How can I help you today?"
)
# Validate message format
if (
not isinstance(messages, list)
or not messages
or "content" not in messages[-1]
):
return "Sorry, I couldn't understand your message. Please try again."
latest_message = messages[-1]["content"]
if "analyze" in latest_message.lower() and "feedback" in latest_message.lower():
# Extract feedback text (improved parsing)
feedback_text = latest_message
if "feedback:" in latest_message.lower():
feedback_text = latest_message.lower().split("feedback:", 1)[-1].strip()
if not feedback_text:
return "Please provide feedback text to analyze (e.g., 'Analyze feedback: The product is great but slow')."
# Analyze the feedback
result = await self.analyze_feedback(feedback_text)
if "error" in result:
return f"Error: {result['error']}"
# Format the response
response = "## Feedback Analysis Results\n\n"
response += "### Categories\n"
for category in result["categories"]:
confidence_percent = f"{category['confidence'] * 100:.1f}%"
response += f"- {category['name']}: {confidence_percent}\n"
response += "\n### Themes\n"
for theme in result["themes"]:
keywords = ", ".join(theme["keywords"])
response += f"- {theme['name']} ({theme['sentiment']}): {keywords}\n"
response += f"\n### Summary\n{result['summary']}"
return response
else:
# Handle normal conversation
crew = self.create_crew()
assistant_task = Task(
description=f"The user said: '{latest_message}'. Respond helpfully as a feedback analysis assistant.",
agent=crew.agents[2], # Report Generator
)
try:
response = await assistant_task.execute()
return response or "I couldn't generate a response. Please try again."
except Exception as e:
return f"Error during conversation: {str(e)}"
# Create an instance of the feedback analyzer
feedback_analyzer = FeedbackAnalyzerCrew()
from fastapi import FastAPI
from copilotkit.integrations.fastapi import add_fastapi_endpoint
from agent import FeedbackAnalyzerCrew
feedback_analyzer = FeedbackAnalyzerCrew()
def setup_copilot(app: FastAPI):
@app.get("/api/copilotkit")
async def handle_copilotkit():
return {"status": "CopilotKit is up and running"}
add_fastapi_endpoint(
app,
sdk=feedback_analyzer,
prefix="/copilotkit",
use_thread_pool=False
)
import os
from dotenv import load_dotenv
from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List
from crewai import Agent, Task, Crew
from copilot_integration import setup_copilot
from contextlib import asynccontextmanager
from langchain_community.llms import OpenAI
from agent import feedback_analyzer
# Load environment variables
load_dotenv()
@asynccontextmanager
async def lifespan(app: FastAPI):
setup_copilot(app)
yield
# Initialize FastAPI app
app = FastAPI(lifespan=lifespan)
# Add CORS middleware to allow requests from frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class FeedbackRequest(BaseModel):
text: str
class Category(BaseModel):
name: str
confidence: float
class Theme(BaseModel):
name: str
keywords: List[str]
sentiment: str
class AnalysisResponse(BaseModel):
categories: List[Category]
themes: List[Theme]
summary: str
def get_llm():
return OpenAI(openai_api_key=os.getenv("OPENAI_API_KEY"), temperature=0.2)
def create_agents():
# Categorizer Agent
categorizer = Agent(
role="Feedback Categorizer",
goal="Accurately categorize customer feedback into relevant categories",
backstory=(
"You are an expert at analyzing and categorizing customer feedback. "
"Your specialty is in identifying the main themes and assigning the "
"appropriate categories to each piece of feedback."
),
verbose=True,
llm=get_llm(),
)
# Theme Identifier Agent
theme_identifier = Agent(
role="Theme Identifier",
goal="Identify recurring themes and sentiments in customer feedback",
backstory=(
"You excel at spotting patterns and trends in large amounts of feedback. "
"You can identify both explicit and implicit themes, as well as the "
"sentiment associated with each theme."
),
verbose=True,
llm=get_llm(),
)
# Report Generator Agent
report_generator = Agent(
role="Report Generator",
goal="Create concise, insightful summaries of customer feedback analysis",
backstory=(
"You are skilled at synthesizing complex analyses into clear, actionable "
"reports. You focus on highlighting the most important insights and providing "
"a balanced view of the feedback."
),
verbose=True,
llm=get_llm(),
)
return categorizer, theme_identifier, report_generator
@app.post("/api/analyze", response_model=AnalysisResponse)
async def analyze_feedback(request: FeedbackRequest):
if not request.text:
raise HTTPException(status_code=400, detail="Feedback text cannot be empty")
try:
# Use the feedback_analyzer instance from agent.py
result = await feedback_analyzer.analyze_feedback(request.text)
# Format response
categories = [
Category(name=cat["name"], confidence=cat["confidence"])
for cat in result.get("categories", [])
]
themes = [
Theme(
name=theme["name"],
keywords=theme["keywords"],
sentiment=theme["sentiment"],
)
for theme in result.get("themes", [])
]
# Extract string from TaskOutput if necessary
summary = result.get("summary", "")
if hasattr(summary, "result"):
summary = summary.result
elif hasattr(summary, "description"):
summary = summary.description
elif not isinstance(summary, str):
summary = str(summary)
return AnalysisResponse(categories=categories, themes=themes, summary=summary)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
@app.get("/health")
def health_check():
llm = get_llm()
return {"status": "ok", "llm": str(llm)}
# Run the application
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment