// Trigger.dev Discourse clustering job
import { client } from "@/trigger"
import { eventTrigger, isTriggerError } from "@trigger.dev/sdk"
import { OpenAI } from "@trigger.dev/openai"
import { Supabase } from "@trigger.dev/supabase"
import { z } from "zod"
import _ from "lodash"
import { write } from "@/lib/neo4j"
// Validate the JSON structure using Zod
const conversationSchema = z.object({
Informative_Cues: z.string(),
Extracted_Conversation: z.array(
z.object({
message_id: z.number(),
content: z.string(),
timestamp: z.string(),
username: z.string(),
})
),
Orphan_Flag: z.boolean(),
})
const jsonResponseSchema = z.array(conversationSchema)
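// A minimal sketch (hypothetical data) of a payload that would satisfy the
// schemas above:
//
// jsonResponseSchema.parse([
//   {
//     Informative_Cues: "deployment failure on first build",
//     Extracted_Conversation: [
//       {
//         message_id: 1,
//         content: "My build fails with a missing env var.",
//         timestamp: "2023-10-01T12:00:00Z",
//         username: "alice",
//       },
//     ],
//     Orphan_Flag: false,
//   },
// ])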
async function fetchDiscourseQuestionEmbeddings(io, orgId) {
const { data: discourseMessages, error: fetchError } =
await io.supabase.runTask(
"get-discourse-question-embeddings",
async (db) => {
// Fetch the most recent question embeddings from Supabase (currently capped at 20)
return (
db
.from("discourse_question_embeddings")
.select("*")
.eq("org_id", orgId)
// .range(100, 1000)
.limit(20)
)
}
)
if (fetchError) {
await io.logger.error("Failed to fetch discourse message:", fetchError)
throw fetchError
}
return discourseMessages
}
async function groupExtractMessagesByTopics(discourseMessages) {
// Group the messages by topic_id using lodash
const groupedByTopic = _.groupBy(discourseMessages, "topic_id")
const extractedData = _.mapValues(groupedByTopic, (messages) => {
// Sort the messages by post_number in descending order
// (lodash sortBy ignores a "desc" argument; orderBy supports sort direction)
const sortedMessages = _.orderBy(messages, ["post_number"], ["desc"])
return {
topic_title: sortedMessages[0].topic_title, // Assuming all messages in a group have the same topic_title
messages: sortedMessages.map((message) => ({
discourse_post_id: message.discourse_post_id.toString(),
username: message.username,
content: message.cooked,
})),
}
})
return extractedData
}
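// Example of the returned shape (hypothetical values), keyed by topic_id:
//
// {
//   "42": {
//     topic_title: "Webhook retries",
//     messages: [
//       { discourse_post_id: "101", username: "alice", content: "<p>How do retries work?</p>" },
//     ],
//   },
// }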
async function openaiIntentMessage(io, topicName, messagesForTopicString) {
const openaiPrompt = `
Discourse Messages for Topic ${topicName}: ${messagesForTopicString}
Manual Topic Extraction with Iteration:
Familiarization & Data Preparation:
- Read Through: Read the entire dataset to understand the content.
- Organize: Ensure data is structured (e.g., chronologically or by author).
- Highlight Key Terms: Mark significant or frequently recurring terms.
Initial Topic Identification:
- List Themes: Make a list of recurring themes or subjects from the data.
- Group Terms: Cluster related terms under broader topics.
Iterative Refinement (3 Iterations):
- Iteration 1:
-- Review the initially identified topics and keywords.
-- Merge similar topics.
-- Split overly broad topics.
-- Revisit the text to check for missed keywords or topics.
- Iteration 2:
-- Further condense topics.
-- Eliminate redundant or overly specific keywords.
-- Align topics with the majority of associated messages or text passages.
- Iteration 3:
-- Finalize topic definitions.
-- Ensure each keyword is pertinent and not overly repetitive across topics.
-- Validate topics against the original text to ensure they capture main themes.
Validation:
- Peer Review: Have your topics reviewed by another person.
- Comparison: If possible, compare with topics from automated methods.
Documentation & Output Formation:
- Record Topics: Document your topics and related terms.
- Format Output: For each message or text passage, create an output in the specified format:
json
{
"title": "Title of the Message or Text",
"extracted_keywords": ["keyword1", "keyword2", "keyword3", ...],
"topic": "Identified Main Topic",
"discourse_post_id": "Unique Identifier for the Message or Text"
}
The iterative process ensures that topics and keywords are revisited and refined multiple times, leading to a more condensed and accurate representation. Remember, even with iterations, manual extraction remains subjective, so it's essential to maintain transparency and rigor throughout the process.
`
// io.logger.info("openaiPrompt:", openaiPrompt);
// Extract topics from the messages using OpenAI function calling
// @ts-ignore
const openaiInput = {
model: "gpt-3.5-turbo",
messages: [
{
role: "user",
content: openaiPrompt,
},
],
functions: [
{
name: "TopicExtractionMessages",
description: "Exctract Topic of discourse messages",
parameters: {
type: "object",
properties: {
topic_extractions: {
type: "array",
description: "An array of objects representing topic extractions",
items: {
type: "object",
properties: {
title: {
type: "string",
description:
"The title of the topic extracted from the message",
},
extracted_keywords: {
type: "array",
items: {
type: "string",
},
description:
"List of keywords representing the topic of the message",
},
topic: {
type: "string",
description:
"One line description of the topic of the message",
},
discourse_post_id: {
type: "string",
description:
"A unique identifier for the message within the conversation",
},
},
required: [
"title",
"extracted_keywords",
"topic",
"discourse_post_id",
],
},
},
},
required: ["topic_extractions"],
},
},
],
function_call: "auto",
}
const response = await io.openai.createChatCompletion(
`chat-completion-${topicName}`,
openaiInput
)
return response
}
async function openaiTopicHierarchicalRepresentation(io, topic_cluster) {
const updatedTreeTopicCluster = { ...topic_cluster } // Shallow copy; nested tree/topic objects are still shared and mutated in place
for (const tree_topic of updatedTreeTopicCluster.tree) {
const topic_list = tree_topic.Topics
const topic_name_list = topic_list.map((topic_id) =>
// Look up the topic name from the topic id
updatedTreeTopicCluster.topic.find((topic) => topic.Topic === Number(topic_id))?.Name
)
const topic_name_list_string = topic_name_list.join("\n")
// Simple name merge for now, but representative documents and keywords could be used in the future
const openaiPrompt = `
I have to merge the following topics:
Topics:
${topic_name_list_string}
Based on the information above, extract a short merge topic label in the following format:
{
topic: <topic label>
}
`
// @ts-ignore
const openaiInput = {
model: "gpt-3.5-turbo",
messages: [
{
role: "user",
content: openaiPrompt,
},
],
functions: [
{
name: "TopicExtractionMessages",
description: "Exctract Topic of Documents",
parameters: {
type: "object",
properties: {
topic: {
type: "string",
description:
"One line description of the topic of the documents",
},
},
required: ["topic"],
},
},
],
function_call: "auto",
}
try {
const response = await io.openai.createChatCompletion(
`chat-completion-${tree_topic.Parent_Name}`,
openaiInput
)
// Assuming the response contains the new topic name directly
const responseData = response?.choices?.[0]
if (responseData.finish_reason === "function_call") {
if (responseData?.message?.function_call?.arguments) {
const functionArgs = JSON.parse(
responseData?.message?.function_call?.arguments
)
console.log("functionArgs:", functionArgs)
const newTopicName = functionArgs.topic
// Update the topic name
tree_topic.Parent_Name = newTopicName
}
} else {
await io.logger.warn(`${tree_topic.Parent_Name} FUNCTION NOT BEING CALLED:`, responseData)
}
} catch (error) {
// Re-throw Trigger.dev control-flow errors so retries and resumability keep working
if (isTriggerError(error)) throw error;
await io.logger.error(`Error updating topic ${tree_topic.Parent_Name}:`, error)
console.error(`Error updating topic ${tree_topic.Parent_Name}:`, error)
}
}
return updatedTreeTopicCluster
}
async function openaiTopicRepresentation(io, topic_cluster) {
const updatedTopicCluster = { ...topic_cluster } // Shallow copy; nested topic objects are still shared and mutated in place
for (const topic of updatedTopicCluster.topic) {
const representativeDocuments = topic.Representative_Docs.join("\n ")
const representativeKeywords = topic.Representation.join(", ")
const openaiPrompt = `
I have a topic that contains the following documents:
${representativeDocuments}
The topic is described by the following keywords:
${representativeKeywords}
Based on the information above, extract a short topic label in the following format:
{
topic: <topic label>
}
`
// @ts-ignore
const openaiInput = {
model: "gpt-3.5-turbo",
messages: [
{
role: "user",
content: openaiPrompt,
},
],
functions: [
{
name: "TopicExtractionMessages",
description: "Exctract Topic of Documents",
parameters: {
type: "object",
properties: {
topic: {
type: "string",
description:
"One line description of the topic of the documents",
},
},
required: ["topic"],
},
},
],
function_call: "auto",
}
try {
const response = await io.openai.createChatCompletion(
`chat-completion-${topic.Name}`,
openaiInput
)
// Assuming the response contains the new topic name directly
const responseData = response?.choices?.[0]
if (responseData.finish_reason === "function_call") {
if (responseData?.message?.function_call?.arguments) {
const functionArgs = JSON.parse(
responseData?.message?.function_call?.arguments
)
console.log("functionArgs:", functionArgs)
const newTopicName = functionArgs.topic
// Update the topic name
topic.Name = newTopicName
}
} else {
await io.logger.warn(`${topic.Name} FUNCTION NOT BEING CALLED:`, responseData)
}
} catch (error) {
// Re-throw Trigger.dev control-flow errors so retries and resumability keep working
if (isTriggerError(error)) throw error;
await io.logger.error(`Error updating topic ${topic.Name}:`, error)
console.error(`Error updating topic ${topic.Name}:`, error)
}
}
return updatedTopicCluster
}
async function writeIntentToNeo4j(topic, orgId) {
// Add to cypher
const cypher = `
MERGE (m:Message_topic {
title: $title,
extracted_keywords: $extracted_keywords,
topic: $topic,
discourse_post_id: $discourse_post_id
})
`
await write(cypher, { ...topic, orgId })
}
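// Example usage (hypothetical values). Note that Cypher MERGE with a full
// property map only reuses a node when every listed property matches, and
// that orgId is passed as a parameter here but not referenced in the pattern.
//
// await writeIntentToNeo4j(
//   {
//     title: "Deploy fails on Vercel",
//     extracted_keywords: ["deploy", "vercel", "env vars"],
//     topic: "Deployment issues",
//     discourse_post_id: "12345",
//   },
//   "org_abc"
// )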
async function openaiEmbed(io, topic_titles) {
const embeddingResponse = await io.openai.createEmbedding(
`embedding-topic-titles`,
{
model: "text-embedding-ada-002",
input: topic_titles,
}
)
// console.log("embeddingResponse:", embeddingResponse)
return embeddingResponse.data
}
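// Example usage (hypothetical titles): one call embeds the whole batch, and
// each entry of the returned array carries a numeric `embedding` vector
// (1536 dimensions for text-embedding-ada-002).
//
// const vectors = await openaiEmbed(io, ["How do I deploy?", "Webhook retries"])
// const firstVector = vectors[0].embedding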
async function upsertEmbedding(io, discourse_messages, embeddingVector, orgId) {
// For each discourse post, upsert the embedding vector for its topic title
const upsertData = discourse_messages.map((post, index) => ({
org_id: orgId,
keyword: null,
embedding_vector: embeddingVector[index].embedding,
topic_title: post.topic_title,
topic: post.topic_title,
discourse_post_id: post.discourse_post_id,
base_discourse_url: post.base_discourse_url,
}))
const { error } = await io.supabase.runTask(
`upsert-topic-embedding`,
async (db) => {
return db.from("discourse_topic_embeddings").upsert(upsertData, {
onConflict: "discourse_post_id, base_discourse_url",
})
}
)
if (error) {
await io.logger.error("Failed to upsert topic extraction:", error)
}
}
async function topicClusteringFetch(io, questions, embeddingVector) {
const jsonResponse = await io.runTask(
`topic-clustering-endpoint`,
async () => {
const response = await fetch(
"https://my-fastapi-app-ueugow36ea-uc.a.run.app/topic_clustering",
{
method: "POST",
headers: {
"Content-Type": "application/json",
accept: "application/json",
},
body: JSON.stringify({
topic_messages: questions,
embeddings: embeddingVector,
}),
}
)
return response.json()
}
)
return jsonResponse
}
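// Judging by how the result is consumed below, the clustering endpoint is
// expected to return an object with `topic`, `document`, and `tree` arrays;
// these field names are inferred from usage, not from a published contract.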
async function topicClustering(io, questions, embeddingVector) {
// console.log("topic_titles:", questions.slice(0, 5))
// console.log("embeddingVector:", embeddingVector.slice(0, 5))
const response = await io.backgroundFetch(
"topic-clustering-endpoint",
"https://my-fastapi-app-ueugow36ea-uc.a.run.app/topic_clustering",
{
method: "POST",
headers: {
"Content-Type": "application/json",
// Authorization: redactString`Bearer ${auth.apiKey}`,
},
body: JSON.stringify({
topic_messages: questions,
embeddings: embeddingVector,
}),
},
{
"429": {
strategy: "backoff",
limit: 10,
minTimeoutInMs: 1000,
maxTimeoutInMs: 60000,
factor: 2,
randomize: true,
},
}
)
return response
}
async function writeDiscourseQuestionClusteringNeo4j(
topic_clustering_dict,
orgId
) {
const cypherCreateTopics = `
UNWIND $topics AS topic
WITH topic
CREATE (t:Topic {
id: topic.Topic,
count: topic.Count,
name: topic.Name,
representation: topic.Representation,
representative_docs: topic.Representative_Docs,
orgId: $orgId
})
`
await write(cypherCreateTopics, {
topics: topic_clustering_dict.topic,
orgId,
})
const cypherCreateDocuments = `
UNWIND $documents AS doc
MATCH (t:Topic {id: doc.Topic, orgId: $orgId})
CREATE (d:Document {
name: doc.Name,
topic_id: doc.Topic,
document: doc.Document,
representation: doc.Representation,
representative_docs: doc.Representative_Docs,
top_n_words: doc.Top_n_words,
probability: doc.Probability,
representative_document: doc.Representative_document,
relevance_score: doc.relevance_score,
orgId: $orgId
})
CREATE (t)-[:HAS_DOCUMENT]->(d)
`
await write(cypherCreateDocuments, {
documents: topic_clustering_dict.document,
orgId,
})
const cypherCreateHierarchicalTopics = `
UNWIND $tree AS item
MERGE (parent:Hierarchical_Topic {id: item.Parent_ID, name: item.Parent_Name, orgId: $orgId})
WITH item, parent
UNWIND item.Topics AS topicId
WITH item, parent, topicId
MATCH (topic:Topic {id: topicId, orgId: $orgId})
MERGE (parent)-[:INCLUDES_TOPIC]->(topic)
WITH item, parent
MERGE (leftChild:Hierarchical_Topic {id: item.Child_Left_ID, orgId: $orgId})
MERGE (rightChild:Hierarchical_Topic {id: item.Child_Right_ID, orgId: $orgId})
MERGE (parent)-[:HAS_CHILD {distance: item.Distance}]->(leftChild)
MERGE (parent)-[:HAS_CHILD {distance: item.Distance}]->(rightChild)
`
await write(cypherCreateHierarchicalTopics, {
tree: topic_clustering_dict.tree,
orgId,
})
}
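// A single hierarchical-tree row (hypothetical values) matching the Cypher above:
//
// {
//   Parent_ID: 12,
//   Parent_Name: "Deployment issues",
//   Topics: [0, 3],
//   Child_Left_ID: 0,
//   Child_Right_ID: 3,
//   Distance: 0.42,
// }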
async function rerankDocuments(io, topic_cluster) {
const documents = topic_cluster.document
const groupedByTopic = _.groupBy(documents, "Topic")
const updatedDocuments: any[] = []
for (const [topic, docs] of Object.entries(groupedByTopic)) {
const topic_name = topic_cluster.topic.find(
(t) => t.Topic === Number(topic)
)?.Name
if (!topic_name) {
await io.logger.warn(`Topic name not found for topic: ${topic}`)
await io.logger.warn(`Skipping topic: ${topic}`, topic_cluster)
continue
}
// const cleanedName = docs[0]["Name"].replace(/^\d+_/, "")
const query = `Does the question relate well with the topic: ${topic_name}`
const topic_documents = docs.map((doc) => doc.Document)
await io.logger.info(`Query-${topic_name}:`, query)
await io.logger.info(`topic_documents-${topic_name}:`, topic_documents)
try {
const jsonResponse = await io.runTask(
`rerank-cohere-${topic_name}`,
async () => {
const response = await fetch("https://api.cohere.ai/v1/rerank", {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${COHERE_API_KEY}`,
accept: "application/json",
},
body: JSON.stringify({
return_documents: false,
max_chunks_per_doc: 10,
model: "rerank-english-v2.0",
query: query,
documents: topic_documents,
}),
})
return response.json()
}
)
if (jsonResponse && jsonResponse.results) {
for (const result of jsonResponse.results) {
const { index, relevance_score } = result
if (docs[index]) {
docs[index].relevance_score = relevance_score
docs[index].Name = topic_name //update topic_name
}
}
}
} catch (error) {
await io.logger.error(
`Error during rerank operation: ${topic_name}`,
error
)
console.error("Error during rerank operation:", error)
}
updatedDocuments.push(...docs)
}
return updatedDocuments
}
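// The Cohere /v1/rerank response consumed above carries a `results` array of
// { index, relevance_score } entries, where `index` points back into the
// submitted `documents` array.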
const COHERE_API_KEY = process.env.COHERE_API_KEY
const supabase = new Supabase({
id: "supabase",
supabaseUrl: process.env.NEXT_PUBLIC_SUPABASE_URL!,
supabaseKey: process.env.SUPABASE_SERVICE_ROLE_KEY!,
})
const openai = new OpenAI({
id: "openai",
apiKey: process.env.OPENAI_API_KEY!,
})
// Define the Trigger.dev job
client.defineJob({
id: "discourse-question-clustering",
name: "Discourse Question clustering",
version: "0.0.1",
trigger: eventTrigger({
name: "discourse.question.clustering",
schema: z.object({
// message_id: z.string(),
org_id: z.string(),
}),
}),
integrations: {
supabase,
//@ts-ignore
openai,
},
run: async (payload, io) => {
try {
// Fetch discourse question embeddings from Supabase
const orgId = payload.org_id
//@ts-ignore
// const discourseMessages = await fetchDiscourseMessages(io, orgId)
const discourseMessages = await fetchDiscourseQuestionEmbeddings(
io,
orgId
)
// // Group the messages by channel_id using lodash
// const extractedData = await groupExtractMessagesByTopics(
// discourseMessages
// ) // Extracted logic
await io.logger.info("Extracted Data:", discourseMessages)
// console.log("extractedData:", JSON.stringify(discourseMessages))
// io.logger.info("extractedData:", extractedData);
// Now, extractedData is an object where each key is a channel_id, and the value contains channel_name and an array of extracted messages for that channel.
// const embedding = await openaiEmbed(
// io,
// discourseMessages.map((msg) => msg.topic_title)
// )
// await upsertEmbedding(io, discourseMessages, embedding, orgId)
// Fetch topic clustering
// const clustering_result = await topicClustering(
// io,
// discourseMessages.map((msg) => msg.question),
// discourseMessages.map((msg) => JSON.parse(msg.embedding_vector))
// )
const clustering_result = await topicClusteringFetch(
io,
discourseMessages.map((msg) => msg.question),
discourseMessages.map((msg) => JSON.parse(msg.embedding_vector))
)
await io.logger.info("Question Clustering:", clustering_result)
// Openai transform the topic names
const updatedTopicCluster = await openaiTopicRepresentation(
io,
clustering_result
)
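// Note: openaiTopicRepresentation spreads only the top level, so the renamed
// topic objects are shared with clustering_result, and the new names remain
// visible in the steps below even though updatedTopicCluster is not reused.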
// Topic clustering
// console.log("clustering_result:", updatedTopicCluster.topic)
const updatedDocuments = await rerankDocuments(io, clustering_result)
await io.logger.info("Updated Documents:", updatedDocuments)
console.log("updatedDocuments:", updatedDocuments)
//update the clustering result
clustering_result.document = updatedDocuments
// Openai transform the hierarchical topic names
const updatedHierarchicalTopicCluster =
await openaiTopicHierarchicalRepresentation(io, clustering_result)
await io.logger.info("Updated Hierarchical Topic Cluster:", updatedHierarchicalTopicCluster)
console.log("updatedHierarchicalTopicCluster:", updatedHierarchicalTopicCluster)
io.logger.info("Writing to Neo4j")
await writeDiscourseQuestionClusteringNeo4j(updatedHierarchicalTopicCluster, orgId)
io.logger.info("Finished writing to Neo4j")
return {
success: true as const,
}
} catch (error) {
return {
success: false as const,
error,
}
}
},
})