Skip to content

Instantly share code, notes, and snippets.

@Necmttn
Created August 30, 2023 11:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Necmttn/5224c7968a58ea406eca37ab1f1b2d29 to your computer and use it in GitHub Desktop.
Save Necmttn/5224c7968a58ea406eca37ab1f1b2d29 to your computer and use it in GitHub Desktop.
Stream the execution of an OpenAI chat call, including its JSON (function-call) responses.
import {
createParser,
ParsedEvent,
ReconnectInterval,
} from "eventsource-parser";
import { Procedure } from "./procedure";
import openai, { ChatCompletionRequestMessage } from "@acme/openai";
import _ from "lodash";
/**
 * A single chat message as kept in the conversation history.
 *
 * Built from `ChatCompletionRequestMessage` with its `role` field replaced by
 * the explicit union below.
 */
export type Message = Omit<ChatCompletionRequestMessage, "role"> & {
// Who produced the message.
role: "user" | "assistant" | "function" | "system";
// Optional name; this file sets it to the called function's name on "function" messages.
name?: string;
// Message text; may be null.
content: string | null;
// Function call requested by the assistant, when present.
function_call?: {
name: string;
// Argument payload as a string; `executeProcedure` in this file parses it with JSON.parse.
arguments: string;
};
};
/**
 * Model identifiers accepted by the `model` option of the functions in this
 * file. `getApiResponse` defaults to "gpt-3.5-turbo-0613" when none is given.
 */
type GPT_MODELS =
| "gpt-3.5-turbo-0613"
| "gpt-3.5-turbo-16k-0613"
| "gpt-4-0613";
/**
 * Sends the conversation to the chat-completion endpoint and returns the raw
 * response object produced by `openai.createChatCompletion`.
 *
 * @param history - Messages sent as the `messages` field of the request.
 * @param procedures - Procedures advertised to the model; each one is
 *   converted with its `toObject()` method. When the list is empty the
 *   `functions` field is sent as `undefined`.
 * @param model - Model identifier; defaults to "gpt-3.5-turbo-0613".
 * @param stream - Forwarded as the request's `stream` flag; defaults to true.
 * @returns The value resolved by `openai.createChatCompletion`.
 */
export async function getApiResponse({
  history,
  procedures,
  model = "gpt-3.5-turbo-0613",
  stream = true,
}: {
  model?: GPT_MODELS;
  history: Message[];
  procedures: Procedure<any, any, any>[];
  stream?: boolean;
}) {
  // Debug trace: a separator line followed by the newest message in the history.
  const separator = "@".repeat(60);
  console.log(separator);
  const newestMessage = _.last(history);
  console.log({ messages: newestMessage });

  // Only advertise functions when at least one procedure is registered.
  const functionSpecs =
    procedures.length === 0
      ? undefined
      : procedures.map((procedure) => procedure.toObject());

  return await openai.createChatCompletion({
    messages: history,
    model,
    stream,
    temperature: 0.4,
    functions: functionSpecs,
  });
}
/**
 * Finds the procedure the model asked for and runs it with the supplied
 * arguments.
 *
 * @param procedures - Registered procedures; matched on their `name`.
 * @param functionName - Name of the function the model requested.
 * @param args - JSON-encoded argument payload. An empty or whitespace-only
 *   payload is treated as an empty object so parameterless calls still work.
 * @param ctx - Opaque context passed through to the procedure's `operation`.
 * @returns Whatever the procedure's `operation` resolves to.
 * @throws Error when no procedure is named `functionName`.
 * @throws SyntaxError when `args` is not valid JSON; the message names the
 *   function so the failing call can be identified.
 */
async function executeProcedure(
  procedures: Procedure<any, any, any>[],
  functionName: string,
  args: string,
  ctx: any,
) {
  const procedure = procedures.find((p) => p.name === functionName);
  if (!procedure) {
    throw new Error(`Unknown function call: ${functionName}`);
  }

  // The payload is model-generated and is not guaranteed to be well-formed.
  const rawArguments = typeof args === "string" ? args.trim() : "";
  let parsedArguments: any = {};
  if (rawArguments !== "") {
    try {
      parsedArguments = JSON.parse(rawArguments);
    } catch (e) {
      const reason = e instanceof Error ? e.message : String(e);
      // Keep the SyntaxError type JSON.parse would have thrown, but add context.
      throw new SyntaxError(
        `Invalid JSON arguments for function call "${functionName}": ${reason}`,
      );
    }
  }

  return await procedure.operation({
    ctx,
    input: parsedArguments,
  });
}
/**
 * Folds a list of partial objects into one new object, left to right.
 *
 * Merge rules, applied per own enumerable key:
 * - string onto string: the strings are concatenated (this is how streamed
 *   text fragments are accumulated);
 * - plain object: merged recursively with the value collected so far, or
 *   with an empty object when that value is not itself a plain object;
 * - `undefined`: ignored;
 * - anything else (numbers, booleans, `null`, arrays): replaces the
 *   previous value.
 *
 * The inputs are never mutated. An own `__proto__` key is skipped so that
 * parsed JSON cannot swap the prototype of the result.
 *
 * @param objects - Objects to merge; later entries win on conflicts.
 * @returns A freshly created merged object (empty when `objects` is empty).
 */
function mergeObjects<T extends { [key: string]: any }>(objects: T[]): T {
  const isPlainObject = (value: unknown): value is Record<string, any> =>
    typeof value === "object" && value !== null && !Array.isArray(value);

  const merged: Record<string, any> = {};
  for (const current of objects) {
    for (const key in current) {
      // Call through Object.prototype: the payload may carry its own
      // "hasOwnProperty" key, which would shadow the method.
      if (!Object.prototype.hasOwnProperty.call(current, key)) {
        continue;
      }
      // Assigning to "__proto__" would replace the result's prototype.
      if (key === "__proto__") {
        continue;
      }

      const incoming: unknown = current[key];
      const existing: unknown = merged[key];

      if (typeof incoming === "string" && typeof existing === "string") {
        merged[key] = existing + incoming;
      } else if (isPlainObject(incoming)) {
        // Only merge into a previous plain object; a previous string or array
        // would otherwise leak its indices into the nested result.
        const base: Record<string, any> = isPlainObject(existing)
          ? existing
          : {};
        merged[key] = mergeObjects([base, incoming]);
      } else if (incoming !== undefined) {
        merged[key] = incoming;
      }
    }
  }
  return merged as T;
}
/**
 * Runs a streamed chat completion, executing any function calls the model
 * requests along the way, and exposes the assistant's text as a byte stream.
 *
 * Each round requests a completion via `getApiResponse`, feeds the response
 * body through a server-sent-events parser and accumulates the streamed
 * fragments with `mergeObjects`. Text fragments are UTF-8 encoded and
 * enqueued on the returned stream as they arrive. When the accumulated
 * message contains a function call, the call and its result are appended to
 * `history` and another round is started; otherwise the stream is closed.
 *
 * @param model - Optional model identifier forwarded to `getApiResponse`.
 * @param history - Conversation so far. It is mutated: assistant messages and
 *   function results are pushed onto it.
 * @param procedures - Procedures the model may call.
 * @param ctx - Opaque context handed to each executed procedure.
 * @param onResponse - Awaited once with the complete assistant text when the
 *   model finishes with reason "stop", before the stream is closed. It is not
 *   called for other finish reasons or for empty responses.
 * @returns A ReadableStream of encoded text chunks. The stream errors when a
 *   payload cannot be processed, the API reports an error, a procedure throws
 *   or `onResponse` rejects.
 */
export async function executeStreamProcedures({
  model,
  history,
  procedures,
  ctx,
  onResponse,
}: {
  model?: GPT_MODELS;
  history: Message[];
  procedures: Procedure<any, any, any>[];
  ctx: any;
  onResponse: (content: string) => Promise<void>;
}) {
  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    async start(controller) {
      let latestResponse: Partial<{
        index: number;
        delta: {
          role: string;
          function_call?: {
            name: string;
            arguments: string;
          };
          content: string | null;
        };
        finish_reason: "stop" | "function_call" | null;
      }> = {};
      // Set once the stream has been errored so no further work is attempted.
      let failed = false;

      // Kept synchronous: the parser does not await its callback, so closing
      // the stream and awaiting onResponse happen in the loop below instead.
      function onParse(event: ParsedEvent | ReconnectInterval) {
        if (failed || event.type !== "event") {
          return;
        }
        const data = event.data;
        if (data === "[DONE]") {
          // End-of-stream marker; completion is handled after the body is drained.
          return;
        }
        try {
          const json = JSON.parse(data);
          if (json?.error) {
            // The error may be an object; stringifying it directly would
            // produce "[object Object]".
            const detail =
              typeof json.error === "string"
                ? json.error
                : (json.error.message ?? JSON.stringify(json.error));
            throw new Error(detail);
          }
          const message = json.choices[0];
          latestResponse = mergeObjects([latestResponse, message]);
          const text = message?.delta.content;
          if (text) {
            controller.enqueue(encoder.encode(text));
          }
        } catch (e) {
          console.log(e);
          failed = true;
          controller.error(e);
        }
      }

      const parser = createParser(onParse);
      while (true) {
        const response = await getApiResponse({ history, procedures, model });
        // A fresh decoder per response, in streaming mode, so multi-byte
        // characters split across chunks are decoded correctly.
        const decoder = new TextDecoder();
        for await (const chunk of response.body as any) {
          parser.feed(decoder.decode(chunk, { stream: true }));
          if (failed) {
            return;
          }
        }
        const tail = decoder.decode();
        if (tail) {
          parser.feed(tail);
        }
        if (failed) {
          return;
        }

        // No choice was parsed at all. Index 0 is a valid value, so compare
        // against undefined instead of testing truthiness.
        if (latestResponse.index === undefined) {
          console.log("EMPTY RESPONSE");
          controller.close();
          return;
        }
        console.log("LATEST", latestResponse);

        const delta = latestResponse.delta;
        const functionCall = delta?.function_call;
        if (functionCall) {
          history.push({
            role: "assistant",
            // Normalise a missing value to null, as the Message type expects.
            content: delta?.content ?? null,
            function_call: functionCall,
          });
          const procedureResult = await executeProcedure(
            procedures,
            functionCall.name,
            functionCall.arguments,
            ctx,
          );
          history.push({
            role: "function",
            name: functionCall.name,
            content: JSON.stringify(procedureResult),
          });
          latestResponse = {};
          continue;
        }

        const content = delta?.content;
        if (content) {
          history.push({
            role: "assistant",
            content,
          });
          console.log({
            "GOT RESPONSEEE": content,
          });
        }
        if (latestResponse.finish_reason === "stop") {
          console.log("Its finished.");
          await onResponse(content ?? "");
        }
        // Any outcome without a function call ends the conversation round;
        // looping again would re-issue the same request indefinitely.
        controller.close();
        return;
      }
    },
  });
  return stream;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment