Skip to content

Instantly share code, notes, and snippets.

@blueneogeo
Created August 8, 2023 20:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save blueneogeo/962e67fe8962852ab1b6a7c1bb5a4f69 to your computer and use it in GitHub Desktop.
Save blueneogeo/962e67fe8962852ab1b6a7c1bb5a4f69 to your computer and use it in GitHub Desktop.
Failing token streaming code
import { createClient as createSupabaseClient } from 'supabase-js'
import { Database } from '../_shared/supabase.ts'
import env from './env.ts'
/**
* Create a Supabase anon client that uses the Authorization header from the request
* to authenticate with Supabase.
*/
export function createAuthorizedClient(request: Request) {
return createSupabaseClient<Database>(
env.SUPABASE_URL,
env.SUPABASE_ANON_KEY,
{
global: {
headers: {
Authorization: request.headers.get('Authorization')!
}
}
}
)
}
/**
* Create a Supabase admin client that has full access. This should only be used
* for admin level operations, that bypass Supabase row-level security.
*/
export function createAdminClient() {
return createSupabaseClient<Database>(
env.SUPABASE_URL,
env.SUPABASE_SERVICE_KEY
)
}
import { Deferred, sleep } from "helpers"
import { createAuthorizedClient } from "../_shared/client.ts"
import { Status, dataResponse, errorResponse, serve } from "../_shared/serve.ts"
import { splitIntoWords } from "../_shared/tools.ts"
import { ChatRequestSchema, ChatResponse, ChatUpdate } from "./schema.ts"
serve(async function (request: Request) {
const client = createAuthorizedClient(request)
const { data: { user } } = await client.auth.getUser()
if (!user) return errorResponse(Status.NOT_FOUND, 'User not found')
const chatRequest = ChatRequestSchema.parse(await request.json())
const subscribed = new Deferred<void>()
let running = true
const channel = client.channel(chatRequest.channel)
.subscribe(status => {
switch (status) {
case 'SUBSCRIBED':
console.log('Now listening to channel', chatRequest.channel)
subscribed.resolve()
break
case 'CLOSED':
running = false
console.log('Channel closed')
break
case 'CHANNEL_ERROR':
running = false
subscribed.reject()
console.log('Channel error')
break
case "TIMED_OUT":
running = false
subscribed.reject()
console.log('Channel timed out')
break
}
})
await subscribed.promise
const tokens = splitIntoWords(chatRequest.conversation)
for await (const token of tokens) {
if (!running) break
const update: ChatUpdate = { newTokens: [token] }
await channel.send({ type: 'broadcast', event: 'new-tokens', payload: update })
console.log('sent token', token)
await sleep(100)
}
await sleep(500)
await channel.unsubscribe()
console.log('chat function finished', chatRequest.channel)
return dataResponse<ChatResponse>({ success: true, channel: chatRequest.channel })
})
import { serve as denoServe } from "server"
import { Status } from "./http-status.ts";
import { corsHeaders } from "./cors.ts";
export { Status } from "./http-status.ts";
/**
* Same as `serve` from `std/http/server.ts`, but catches errors and returns a 500 response with the error message
*/
export function serve(handler: (req: Request) => Promise<Response>) {
denoServe(async (req: Request) => {
try {
return await handler(req)
} catch (error) {
return errorResponse(500, error.message || error)
}
})
}
/**
* Generate a `Response` with the passed data as `JSON`.
*/
export function dataResponse<T>(data: T) {
return new Response(
JSON.stringify(data),
{ headers: { ...corsHeaders, "Content-Type": "application/json" } }
)
}
/**
* Generate an error `Response` with the passed http {@link Status} and error message
*/
export function errorResponse<T>(status: Status, message: string) {
console.log('sending status with error', { message })
return new Response(
JSON.stringify({ message }),
{ headers: { ...corsHeaders, "Content-Type": "application/json" }, status }
)
}
import { SupabaseClient } from "supabase-js"
import { assertEquals } from 'testing/asserts'
import { beforeAll } from "testing/bdd"
import { Database } from "../_shared/supabase.ts"
import { createTestAnonClient, getAndSignInTestUser } from '../_shared/test-client.ts'
import { ChatRequest, ChatResponseSchema, ChatUpdateSchema } from "./schema.ts"
let anonClient: SupabaseClient<Database>
beforeAll(async () => {
anonClient = createTestAnonClient()
await getAndSignInTestUser(anonClient)
})
async function testChat() {
const CHANNEL_NAME = 'test-channel'
const returnedTokens: string[] = []
const channel = anonClient.channel(CHANNEL_NAME)
.on('broadcast', { event: 'new-tokens' }, ({payload}) => {
console.log('payload', payload)
const response = ChatUpdateSchema.parse(payload)
returnedTokens.push(...response.newTokens)
})
.subscribe(status => {
switch (status) {
case 'SUBSCRIBED':
console.log('Now listening to channel', CHANNEL_NAME)
break
case 'CLOSED':
console.log('Channel closed')
break
case 'CHANNEL_ERROR':
console.log('Channel error')
break
case "TIMED_OUT":
console.log('Channel timed out')
break
}
})
const input: ChatRequest = {
channel: CHANNEL_NAME,
echo: true, // request that we just get the same conversation back that we send
model: 'gpt2',
conversation: 'hello how are you doing today?'
}
console.log('waiting for chat to end...')
const { data, error } = await anonClient.functions.invoke('chat', { body: input })
if (error) throw error.message
const chatResponse = ChatResponseSchema.parse(data)
console.log('end of chat response', chatResponse)
await channel.unsubscribe()
await anonClient.removeAllChannels()
const message = returnedTokens.join(' ')
assertEquals(message, input.conversation)
assertEquals(chatResponse.channel, CHANNEL_NAME)
}
Deno.test('Chat test', testChat)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment