Skip to content

Instantly share code, notes, and snippets.

@sebinsua
Last active March 11, 2024 12:02
Show Gist options
  • Save sebinsua/76fc5eb6fc498636bc637b9f10b7e6bf to your computer and use it in GitHub Desktop.
Save sebinsua/76fc5eb6fc498636bc637b9f10b7e6bf to your computer and use it in GitHub Desktop.
Smooth a stream of LLM tokens into a stream of characters while reducing jitter by stabilising output timing. Explorations of different approaches.
/**
 * Pause for roughly `ms` milliseconds.
 *
 * @param ms Timeout handed to `setTimeout`, in milliseconds.
 * @returns A promise that settles when the timer fires.
 */
function sleep(ms: number) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * FIFO queue whose `dequeue` waits for an item when none is buffered.
 *
 * Items that are `Error` instances are delivered as failures: a waiting
 * consumer is rejected with them and a later `dequeue` throws them.
 */
class AsyncQueue<T> {
  // Note: FIFO `shift` on an array is `O(n)`. That is acceptable for now; a
  // linked list would make it `O(1)` at the cost of the contiguous memory
  // layout, and without benchmarking it is unclear which wins.
  //
  // A linked-list variant is implemented here:
  // https://gist.github.com/sebinsua/76fc5eb6fc498636bc637b9f10b7e6bf

  /** Items (or errors) enqueued while no consumer was waiting. */
  queuedItems: (T | Error)[];
  /** `[resolve, reject]` pairs of consumers waiting for an item. */
  queuedProcessors: [(item: T) => void, (error: Error) => void][];

  constructor() {
    this.queuedItems = [];
    this.queuedProcessors = [];
  }

  /**
   * Hand `item` to the oldest waiting consumer, or buffer it if nobody waits.
   *
   * @param item Value to deliver; an `Error` rejects the consumer instead.
   */
  enqueue(item: T | Error) {
    const waiting = this.queuedProcessors.shift();
    if (waiting === undefined) {
      this.queuedItems.push(item);
      return;
    }
    const [fulfil, fail] = waiting;
    if (item instanceof Error) {
      fail(item);
    } else {
      fulfil(item);
    }
  }

  /**
   * Take the oldest buffered item, or wait for the next `enqueue`.
   *
   * @returns The next item.
   * @throws The queued `Error` when the next entry is an error.
   */
  async dequeue(): Promise<T> {
    // The length check (rather than testing the shifted value) keeps
    // `undefined` usable as a legitimate item.
    if (this.queuedItems.length === 0) {
      return new Promise((resolve, reject) => {
        this.queuedProcessors.push([resolve, reject]);
      });
    }
    const next = this.queuedItems.shift()!;
    if (next instanceof Error) {
      throw next;
    }
    return next;
  }

  /** Number of buffered items that have not been dequeued yet. */
  size() {
    return this.queuedItems.length;
  }
}
/** Tuning knobs for `calculateDelay`. */
interface CalculateDelayOptions {
  /** Delay in milliseconds used when the queue is empty (default 32). */
  initialDelay?: number;
  /** Queue size at which the delay reaches zero (default 64). */
  zeroDelayQueueSize?: number;
}

/**
 * Compute the pause between two emits: `initialDelay` for an empty queue,
 * shrinking linearly to zero once `zeroDelayQueueSize` items are waiting.
 *
 * @param queueSize Number of items currently waiting in the queue.
 * @param options Overrides for the default 32ms / 64 items.
 * @returns A whole, non-negative number of milliseconds.
 */
function calculateDelay(
  queueSize: number,
  { initialDelay = 32, zeroDelayQueueSize = 64 }: CalculateDelayOptions = {}
): number {
  // A non-positive (or NaN) threshold is already reached by any queue size.
  // Without this guard a threshold of 0 with an empty queue evaluates
  // `Infinity * 0`, i.e. NaN, which would then be passed on to the timer.
  if (!(zeroDelayQueueSize > 0)) {
    return 0;
  }
  const reductionPerItem = initialDelay / zeroDelayQueueSize;
  return Math.max(0, Math.floor(initialDelay - reductionPerItem * queueSize));
}
/**
 * A tokenizer: splits `text` into `[token, index]` pairs.
 *
 * `smooth` enqueues every returned token and then drops the first `index`
 * characters of its buffer, using the `index` of the last returned pair.
 *
 * NOTE(review): `smooth` calls `tokenize(buffer, true)` when flushing, so the
 * flush flag arrives in the second position (named `inclusive` here), which is
 * where the built-in `words`/`clauses` read their `eof` flag. Confirm which
 * position custom tokenizers are expected to read.
 */
export type TokenizeFn = (
  text: string,
  inclusive?: boolean,
  eof?: boolean
) => (readonly [token: string, index: number])[];
/**
 * Names of the built-in tokenizers. `"clauses"` is listed because the
 * `tokenizers` lookup table registers it alongside the other three.
 */
export type TokenizeType = "preserve" | "chars" | "words" | "clauses";
/**
 * Options accepted by `smooth`: the delay tuning fields of
 * `CalculateDelayOptions` plus the tokenizer selection.
 */
export type SmoothOptions = CalculateDelayOptions & {
// Either the name of a built-in tokenizer or a custom tokenizer function.
tokenize?: TokenizeType | TokenizeFn;
};
/**
 * Tokenizer that passes the buffer through untouched as a single token.
 *
 * @param buffer Text accumulated so far.
 * @returns One `[buffer, buffer.length]` pair.
 */
function preserve(buffer: string) {
  const whole = [buffer, buffer.length] as const;
  return [whole];
}
/**
 * Tokenizer that emits one token per character.
 *
 * Iterates by Unicode code point rather than UTF-16 code unit, so a character
 * outside the Basic Multilingual Plane (e.g. an emoji) is emitted as one token
 * instead of two lone surrogates, which cannot be encoded to UTF-8 on their own.
 *
 * @param buffer Text accumulated so far.
 * @returns `[character, index]` pairs, where `index` is the UTF-16 offset in
 *          `buffer` just past that character.
 */
function chars(buffer: string) {
  const tokens: (readonly [string, number])[] = [];
  let end = 0;
  // String iteration yields whole code points; `length` is 1 or 2 code units.
  for (const char of buffer) {
    end += char.length;
    tokens.push([char, end] as const);
  }
  return tokens;
}
/**
 * Split `buffer` into chunks at every character matched by `regex`.
 *
 * @param buffer Text accumulated so far.
 * @param regex Pattern tested against one character at a time.
 * @param inclusive When true the matched character ends the current chunk;
 *                  when false it starts the next one.
 * @param eof When true the text after the last match is emitted as a final
 *            chunk; otherwise it is left for a later call.
 * @returns `[chunk, index]` pairs, where `index` is the offset in `buffer`
 *          just past the chunk. Empty chunks are never returned.
 */
function chunks(buffer: string, regex: RegExp, inclusive = false, eof = false) {
  const ws: (readonly [string, number])[] = [];
  let lastIndex = 0;
  for (let currentIndex = 0; currentIndex < buffer.length; currentIndex++) {
    // Keep a global/sticky pattern from carrying state between characters.
    regex.lastIndex = 0;
    if (!regex.test(buffer[currentIndex]!)) {
      continue;
    }
    const end = currentIndex + (inclusive ? 1 : 0);
    // Skip zero-length chunks, e.g. when the buffer starts with a delimiter.
    if (end > lastIndex) {
      ws.push([buffer.slice(lastIndex, end), end] as const);
    }
    // Resume exactly where this chunk stopped so that, in inclusive mode,
    // the delimiter is not repeated at the start of the next chunk.
    lastIndex = end;
  }
  if (eof && lastIndex < buffer.length) {
    ws.push([buffer.slice(lastIndex), buffer.length] as const);
  }
  return ws;
}
/**
 * Tokenizer that cuts the buffer at whitespace characters.
 *
 * Delegates to `chunks` in non-inclusive mode, so the whitespace character
 * belongs to the chunk that follows it.
 *
 * @param buffer Text accumulated so far.
 * @param eof Forwarded to `chunks` to emit the trailing remainder.
 */
function words(buffer: string, eof = false) {
  const whitespace = /\s/;
  return chunks(buffer, whitespace, false, eof);
}
/**
 * Tokenizer that cuts the buffer at clause punctuation (`. , ! ? ;`).
 *
 * Delegates to `chunks` in inclusive mode, so the punctuation mark is part of
 * the chunk it terminates.
 *
 * @param buffer Text accumulated so far.
 * @param eof Forwarded to `chunks` to emit the trailing remainder.
 */
function clauses(buffer: string, eof = false) {
  const punctuation = /[.,!?;]/;
  return chunks(buffer, punctuation, true, eof);
}
/** Built-in tokenizers, looked up by name when `smooth` is given a string. */
const tokenizers = {
  chars: chars,
  words: words,
  clauses: clauses,
  preserve: preserve,
} as const;
/**
 * Smooth a stream of LLM tokens into a stream of characters or semantic chunks
 * while reducing jitter by stabilising output timing.
 *
 * A background task drains `streamingData`, re-tokenizes it and fills a queue;
 * the generator itself drains that queue, pausing between emits for a time
 * that shrinks as the queue grows.
 *
 * @param streamingData A stream of LLM tokens.
 * @param options Options for the smoothing algorithm.
 * @returns An async generator of re-tokenized strings.
 * @throws Whatever `streamingData` or the tokenizer throws; values that are
 *         not `Error` instances are wrapped in an `Error`.
 */
export async function* smooth(
  streamingData: AsyncGenerator<string | undefined>,
  { tokenize: _tokenize = chars, ...options }: SmoothOptions = {}
) {
  const tokenize =
    typeof _tokenize === "function" ? _tokenize : tokenizers[_tokenize];
  const queue = new AsyncQueue<string | undefined>();
  // Producer: runs concurrently with the consumer loop below.
  void (async () => {
    let buffer = "";
    let lastIndex: number | undefined;
    try {
      for await (const oldToken of streamingData) {
        buffer += oldToken ?? "";
        for (const [newToken, index] of tokenize(buffer)) {
          queue.enqueue(newToken);
          lastIndex = index;
        }
        // Keep only the part of the buffer the tokenizer has not consumed.
        if (typeof lastIndex === "number") {
          buffer = buffer.slice(lastIndex);
          lastIndex = undefined;
        }
      }
      // Flush the buffer.
      for (const [newToken] of tokenize(buffer, true)) {
        queue.enqueue(newToken);
      }
    } catch (error) {
      // The queue only treats `Error` instances as failures. Anything else
      // (e.g. a thrown string) would be yielded to the consumer as if it were
      // a token, so wrap it first.
      queue.enqueue(error instanceof Error ? error : new Error(String(error)));
    } finally {
      // `undefined` is the end-of-stream sentinel for the loop below.
      queue.enqueue(undefined);
    }
  })();
  while (true) {
    const newToken = await queue.dequeue();
    if (newToken === undefined) {
      break;
    }
    yield newToken;
    const delay = calculateDelay(queue.size(), options);
    if (delay === 0) {
      continue;
    }
    await sleep(delay);
  }
}
/**
 * Wait for about `ms` milliseconds.
 *
 * @param ms Timeout passed to `setTimeout`, in milliseconds.
 * @returns A promise that settles after the timer has fired.
 */
function sleep(ms: number) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
/** One cell of a singly linked list. */
class ListNode<T> {
  /** Payload stored in this cell. */
  public value: T;
  /** Following cell, or `null` at the end of the list. */
  public next: ListNode<T> | null = null;

  constructor(value: T) {
    this.value = value;
  }
}

/**
 * Singly linked FIFO list: `push` appends at the tail and `shift` removes
 * from the head, both without walking the list.
 */
class LinkedList<T> {
  private head: ListNode<T> | null = null;
  private tail: ListNode<T> | null = null;
  private _length = 0;

  /** Whether the list currently holds no values. */
  public isEmpty(): boolean {
    return this._length === 0;
  }

  /** Number of stored values (same as `length`). */
  public size(): number {
    return this._length;
  }

  /** Number of stored values, exposed array-style. */
  public get length(): number {
    return this._length;
  }

  /** Append `value` at the end of the list. */
  public push(value: T): void {
    const node = new ListNode(value);
    if (this.tail === null) {
      // First element: it is both head and tail.
      this.head = node;
    } else {
      this.tail.next = node;
    }
    this.tail = node;
    this._length += 1;
  }

  /**
   * Remove and return the first value.
   *
   * @returns The removed value, or `null` when the list is empty.
   */
  public shift(): T | null {
    const first = this.head;
    if (first === null) {
      return null;
    }
    this.head = first.next;
    if (this.head === null) {
      // The list just became empty; drop the stale tail reference too.
      this.tail = null;
    }
    this._length -= 1;
    return first.value;
  }
}
/**
 * FIFO queue whose `dequeue` waits for an item when none is buffered.
 *
 * Items that are `Error` instances are delivered as failures: a waiting
 * consumer is rejected with them and a later `dequeue` throws them.
 */
class AsyncQueue<T> {
  // Note: FIFO `shift` is `O(n)` on arrays, hence the linked lists (`O(1)`).
  // Without benchmarking it is unclear whether the contiguous memory layout
  // of an array would nonetheless be the better trade-off.

  /** Items (or errors) enqueued while no consumer was waiting. */
  queuedItems: LinkedList<T | Error>;
  /** `[resolve, reject]` pairs of consumers waiting for an item. */
  queuedProcessors: LinkedList<[(item: T) => void, (error: Error) => void]>;

  constructor() {
    this.queuedItems = new LinkedList();
    this.queuedProcessors = new LinkedList();
  }

  /**
   * Hand `item` to the oldest waiting consumer, or buffer it if nobody waits.
   *
   * @param item Value to deliver; an `Error` rejects the consumer instead.
   */
  enqueue(item: T | Error) {
    if (this.queuedProcessors.length === 0) {
      this.queuedItems.push(item);
      return;
    }
    const [fulfil, fail] = this.queuedProcessors.shift()!;
    if (item instanceof Error) {
      fail(item);
    } else {
      fulfil(item);
    }
  }

  /**
   * Take the oldest buffered item, or wait for the next `enqueue`.
   *
   * @returns The next item.
   * @throws The queued `Error` when the next entry is an error.
   */
  async dequeue(): Promise<T> {
    if (this.queuedItems.length === 0) {
      return new Promise((resolve, reject) => {
        this.queuedProcessors.push([resolve, reject]);
      });
    }
    const next = this.queuedItems.shift()!;
    if (next instanceof Error) {
      throw next;
    }
    return next;
  }

  /** Number of buffered items that have not been dequeued yet. */
  size() {
    return this.queuedItems.length;
  }
}
/** Tuning knobs for `calculateDelay`. */
interface CalculateDelayOptions {
  /** Delay in milliseconds used when the queue is empty (default 32). */
  initialDelay?: number;
  /** Queue size at which the delay reaches zero (default 64). */
  zeroDelayQueueSize?: number;
}

/**
 * Compute the pause between two emits: `initialDelay` for an empty queue,
 * shrinking linearly to zero once `zeroDelayQueueSize` items are waiting.
 *
 * @param queueSize Number of items currently waiting in the queue.
 * @param options Overrides for the default 32ms / 64 items.
 * @returns A whole, non-negative number of milliseconds.
 */
function calculateDelay(
  queueSize: number,
  { initialDelay = 32, zeroDelayQueueSize = 64 }: CalculateDelayOptions = {}
): number {
  // A non-positive (or NaN) threshold is already reached by any queue size.
  // Without this guard a threshold of 0 with an empty queue evaluates
  // `Infinity * 0`, i.e. NaN, which would then be passed on to the timer.
  if (!(zeroDelayQueueSize > 0)) {
    return 0;
  }
  const reductionPerItem = initialDelay / zeroDelayQueueSize;
  return Math.max(0, Math.floor(initialDelay - reductionPerItem * queueSize));
}
/**
 * A tokenizer: splits `text` into `[token, index]` pairs.
 *
 * `smooth` enqueues every returned token and then drops the first `index`
 * characters of its buffer, using the `index` of the last returned pair.
 *
 * NOTE(review): `smooth` calls `tokenize(buffer, true)` when flushing, so the
 * flush flag arrives in the second position (named `inclusive` here), which is
 * where the built-in `words`/`clauses` read their `eof` flag. Confirm which
 * position custom tokenizers are expected to read.
 */
export type TokenizeFn = (
  text: string,
  inclusive?: boolean,
  eof?: boolean
) => (readonly [token: string, index: number])[];
/**
 * Names of the built-in tokenizers. `"clauses"` is listed because the
 * `tokenizers` lookup table registers it alongside the other three.
 */
export type TokenizeType = "preserve" | "chars" | "words" | "clauses";
/**
 * Options accepted by `smooth`: the delay tuning fields of
 * `CalculateDelayOptions` plus the tokenizer selection.
 */
export type SmoothOptions = CalculateDelayOptions & {
// Either the name of a built-in tokenizer or a custom tokenizer function.
tokenize?: TokenizeType | TokenizeFn;
};
/**
 * Tokenizer that passes the buffer through untouched as a single token.
 *
 * @param buffer Text accumulated so far.
 * @returns One `[buffer, buffer.length]` pair.
 */
function preserve(buffer: string) {
  const whole = [buffer, buffer.length] as const;
  return [whole];
}
/**
 * Tokenizer that emits one token per character.
 *
 * Iterates by Unicode code point rather than UTF-16 code unit, so a character
 * outside the Basic Multilingual Plane (e.g. an emoji) is emitted as one token
 * instead of two lone surrogates, which cannot be encoded to UTF-8 on their own.
 *
 * @param buffer Text accumulated so far.
 * @returns `[character, index]` pairs, where `index` is the UTF-16 offset in
 *          `buffer` just past that character.
 */
function chars(buffer: string) {
  const tokens: (readonly [string, number])[] = [];
  let end = 0;
  // String iteration yields whole code points; `length` is 1 or 2 code units.
  for (const char of buffer) {
    end += char.length;
    tokens.push([char, end] as const);
  }
  return tokens;
}
/**
 * Split `buffer` into chunks at every character matched by `regex`.
 *
 * @param buffer Text accumulated so far.
 * @param regex Pattern tested against one character at a time.
 * @param inclusive When true the matched character ends the current chunk;
 *                  when false it starts the next one.
 * @param eof When true the text after the last match is emitted as a final
 *            chunk; otherwise it is left for a later call.
 * @returns `[chunk, index]` pairs, where `index` is the offset in `buffer`
 *          just past the chunk. Empty chunks are never returned.
 */
function chunks(buffer: string, regex: RegExp, inclusive = false, eof = false) {
  const ws: (readonly [string, number])[] = [];
  let lastIndex = 0;
  for (let currentIndex = 0; currentIndex < buffer.length; currentIndex++) {
    // Keep a global/sticky pattern from carrying state between characters.
    regex.lastIndex = 0;
    if (!regex.test(buffer[currentIndex]!)) {
      continue;
    }
    const end = currentIndex + (inclusive ? 1 : 0);
    // Skip zero-length chunks, e.g. when the buffer starts with a delimiter.
    if (end > lastIndex) {
      ws.push([buffer.slice(lastIndex, end), end] as const);
    }
    // Resume exactly where this chunk stopped so that, in inclusive mode,
    // the delimiter is not repeated at the start of the next chunk.
    lastIndex = end;
  }
  if (eof && lastIndex < buffer.length) {
    ws.push([buffer.slice(lastIndex), buffer.length] as const);
  }
  return ws;
}
/**
 * Tokenizer that cuts the buffer at whitespace characters.
 *
 * Delegates to `chunks` in non-inclusive mode, so the whitespace character
 * belongs to the chunk that follows it.
 *
 * @param buffer Text accumulated so far.
 * @param eof Forwarded to `chunks` to emit the trailing remainder.
 */
function words(buffer: string, eof = false) {
  const whitespace = /\s/;
  return chunks(buffer, whitespace, false, eof);
}
/**
 * Tokenizer that cuts the buffer at clause punctuation (`. , ! ? ;`).
 *
 * Delegates to `chunks` in inclusive mode, so the punctuation mark is part of
 * the chunk it terminates.
 *
 * @param buffer Text accumulated so far.
 * @param eof Forwarded to `chunks` to emit the trailing remainder.
 */
function clauses(buffer: string, eof = false) {
  const punctuation = /[.,!?;]/;
  return chunks(buffer, punctuation, true, eof);
}
/** Built-in tokenizers, looked up by name when `smooth` is given a string. */
const tokenizers = {
  chars: chars,
  words: words,
  clauses: clauses,
  preserve: preserve,
} as const;
/**
 * Smooth a stream of LLM tokens into a stream of characters or semantic chunks
 * while reducing jitter by stabilising output timing.
 *
 * A background task drains `streamingData`, re-tokenizes it and fills a queue;
 * the generator itself drains that queue, pausing between emits for a time
 * that shrinks as the queue grows.
 *
 * @param streamingData A stream of LLM tokens.
 * @param options Options for the smoothing algorithm.
 * @returns An async generator of re-tokenized strings.
 * @throws Whatever `streamingData` or the tokenizer throws; values that are
 *         not `Error` instances are wrapped in an `Error`.
 */
export async function* smooth(
  streamingData: AsyncGenerator<string | undefined>,
  { tokenize: _tokenize = chars, ...options }: SmoothOptions = {}
) {
  const tokenize =
    typeof _tokenize === "function" ? _tokenize : tokenizers[_tokenize];
  const queue = new AsyncQueue<string | undefined>();
  // Producer: runs concurrently with the consumer loop below.
  void (async () => {
    let buffer = "";
    let lastIndex: number | undefined;
    try {
      for await (const oldToken of streamingData) {
        buffer += oldToken ?? "";
        for (const [newToken, index] of tokenize(buffer)) {
          queue.enqueue(newToken);
          lastIndex = index;
        }
        // Keep only the part of the buffer the tokenizer has not consumed.
        if (typeof lastIndex === "number") {
          buffer = buffer.slice(lastIndex);
          lastIndex = undefined;
        }
      }
      // Flush the buffer.
      for (const [newToken] of tokenize(buffer, true)) {
        queue.enqueue(newToken);
      }
    } catch (error) {
      // The queue only treats `Error` instances as failures. Anything else
      // (e.g. a thrown string) would be yielded to the consumer as if it were
      // a token, so wrap it first.
      queue.enqueue(error instanceof Error ? error : new Error(String(error)));
    } finally {
      // `undefined` is the end-of-stream sentinel for the loop below.
      queue.enqueue(undefined);
    }
  })();
  while (true) {
    const newToken = await queue.dequeue();
    if (newToken === undefined) {
      break;
    }
    yield newToken;
    const delay = calculateDelay(queue.size(), options);
    if (delay === 0) {
      continue;
    }
    await sleep(delay);
  }
}
@sebinsua
Copy link
Author

sebinsua commented Nov 12, 2023

The algorithm I wrote adjusts the delay per emit based on the queue size (e.g. calculateDelay waits 32ms per emit by default, but adjusts this delay downwards as the queue size increases, eventually removing the delay entirely if there are 64 items in the queue). Given that the default emit style is chars, that would mean when there are 64 characters in the queue the queue is emitting as fast as possible, and as the tokens might have been roughly 3.5 chars each this could occur as early as 20 tokens in. (These defaults were chosen based on experimentation and if you changed from emitting chars to words or clauses, you'd need to experiment again to choose new values.)

This approach slows the stream down a little and generally reduces jitter (such as pauses and bursts in the token stream). It was designed from educated guesswork about what might work, and evaluated by eyeballing the output rather than in a systematic manner. It is not perfect: sometimes the queue is exhausted while we are still waiting on the next token. Another approach might be to extend the function that calculates the delay length by looking at the rate of increase/decrease of the queue size, as this would allow the algorithm to consider 'the future' in its decisions.

@sebinsua
Copy link
Author

sebinsua commented Nov 12, 2023

In the approach below, we change from using a calculateDelay function to choose the delay between emits to a SmoothDelayCalculator class that can be injected.

The new class extends the original linear queue size approach with some further adjustments to the delay length based on the rate of increase/decrease of the queue size (this rate is smoothed using the previous rate of increase/decrease). The idea behind this is that if the queue is large and emitting is happening fast, but the queue suddenly stops growing due to an issue with the LLM output or HTTP connection, we want to slow the emits down further to avoid rapidly exhausting the queue before we are able to recover.

Without wrapping with smooth:

Screen.Recording.2023-11-15.at.14.16.58.mov

After wrapping with smooth:

Screen.Recording.2023-11-15.at.14.18.19.mov

Yet again, the logic and its configuration are eyeballed. Realistically, to improve upon this I need to record some real-world stream outputs and then graph how different approaches perform against them...

/**
 * Suspend the caller for about `ms` milliseconds.
 *
 * @param ms Timeout passed to `setTimeout`, in milliseconds.
 * @returns A promise that settles once the timer has fired.
 */
function sleep(ms: number) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}

/** One cell of a singly linked list. */
class ListNode<T> {
  /** Payload stored in this cell. */
  public value: T;
  /** Following cell, or `null` at the end of the list. */
  public next: ListNode<T> | null = null;

  constructor(value: T) {
    this.value = value;
  }
}

/**
 * Singly linked FIFO list: `push` appends at the tail and `shift` removes
 * from the head, both without walking the list.
 */
class LinkedList<T> {
  private head: ListNode<T> | null = null;
  private tail: ListNode<T> | null = null;
  private _length = 0;

  /** Whether the list currently holds no values. */
  public isEmpty(): boolean {
    return this._length === 0;
  }

  /** Number of stored values (same as `length`). */
  public size(): number {
    return this._length;
  }

  /** Number of stored values, exposed array-style. */
  public get length(): number {
    return this._length;
  }

  /** Append `value` at the end of the list. */
  public push(value: T): void {
    const node = new ListNode(value);
    if (this.tail === null) {
      // First element: it is both head and tail.
      this.head = node;
    } else {
      this.tail.next = node;
    }
    this.tail = node;
    this._length += 1;
  }

  /**
   * Remove and return the first value.
   *
   * @returns The removed value, or `null` when the list is empty.
   */
  public shift(): T | null {
    const first = this.head;
    if (first === null) {
      return null;
    }
    this.head = first.next;
    if (this.head === null) {
      // The list just became empty; drop the stale tail reference too.
      this.tail = null;
    }
    this._length -= 1;
    return first.value;
  }
}

/**
 * FIFO queue whose `dequeue` waits for an item when none is buffered.
 *
 * Items that are `Error` instances are delivered as failures: a waiting
 * consumer is rejected with them and a later `dequeue` throws them.
 */
class AsyncQueue<T> {
  // Note: FIFO `shift` is `O(n)` on arrays, hence the linked lists (`O(1)`).
  //       Without benchmarking it is unclear whether the contiguous memory
  //       layout of an array would nonetheless be the better trade-off.

  /** Items (or errors) enqueued while no consumer was waiting. */
  queuedItems: LinkedList<T | Error>;
  /** `[resolve, reject]` pairs of consumers waiting for an item. */
  queuedProcessors: LinkedList<[(item: T) => void, (error: Error) => void]>;

  constructor() {
    this.queuedItems = new LinkedList();
    this.queuedProcessors = new LinkedList();
  }

  /**
   * Hand `item` to the oldest waiting consumer, or buffer it if nobody waits.
   *
   * @param item Value to deliver; an `Error` rejects the consumer instead.
   */
  enqueue(item: T | Error) {
    if (this.queuedProcessors.length === 0) {
      this.queuedItems.push(item);
      return;
    }

    const [fulfil, fail] = this.queuedProcessors.shift()!;
    if (item instanceof Error) {
      fail(item);
    } else {
      fulfil(item);
    }
  }

  /**
   * Take the oldest buffered item, or wait for the next `enqueue`.
   *
   * @returns The next item.
   * @throws The queued `Error` when the next entry is an error.
   */
  async dequeue(): Promise<T> {
    if (this.queuedItems.length === 0) {
      return new Promise((resolve, reject) => {
        this.queuedProcessors.push([resolve, reject]);
      });
    }

    const next = this.queuedItems.shift()!;
    if (next instanceof Error) {
      throw next;
    }
    return next;
  }

  /** Number of buffered items that have not been dequeued yet. */
  size() {
    return this.queuedItems.length;
  }
}

/** Tuning knobs for `createCalculateLinearDelay`. */
export interface CreateCalculateLinearDelayOptions {
  /** Delay in milliseconds used when the queue is empty (default 32). */
  initialDelay?: number;
  /** Queue size at which the delay reaches zero (default 64). */
  zeroDelayQueueSize?: number;
}

/**
 * Build a delay function that starts at `initialDelay` for an empty queue and
 * shrinks linearly to zero once `zeroDelayQueueSize` items are waiting.
 *
 * @param options Overrides for the default 32ms / 64 items.
 * @returns A function mapping a queue size to a whole, non-negative number of
 *          milliseconds.
 */
export function createCalculateLinearDelay({
  initialDelay = 32,
  zeroDelayQueueSize = 64,
}: CreateCalculateLinearDelayOptions = {}) {
  return function calculateLinearDelay(queueSize: number): number {
    // A non-positive (or NaN) threshold is already reached by any queue size.
    // Without this guard a threshold of 0 with an empty queue evaluates
    // `Infinity * 0`, i.e. NaN, which would then be passed on to the timer.
    if (!(zeroDelayQueueSize > 0)) {
      return 0;
    }
    const reductionPerItem = initialDelay / zeroDelayQueueSize;
    return Math.max(0, Math.floor(initialDelay - reductionPerItem * queueSize));
  };
}

/** Contract for stateful delay strategies that can be passed to `smooth`. */
export abstract class CalculateDelayClass {
  /**
   * @param queueSize Number of items currently waiting in the queue.
   * @returns The delay, in milliseconds, to wait before the next emit.
   */
  abstract calculateDelay(queueSize: number): number;
}

/** Construction options for `SmoothDelayCalculator`. */
export interface SmoothDelayCalculatorOptions {
  /** Delay in milliseconds used when the queue is empty (default 32). */
  initialDelay?: number;
  /** Queue size at which the linear delay reaches zero (default 64). */
  zeroDelayQueueSize?: number;
  /** Weight of the newest rate of change, clamped to [0, 1] (default 0.5). */
  smoothingFactor?: number;
  /** Strength of the slow-down while the queue shrinks (default 100). */
  decreaseResponsivenessFactor?: number;
  /** Strength of the speed-up while the queue grows (default 5). */
  increaseResponsivenessFactor?: number;
}

/**
 * Delay strategy that starts from the linear queue-size delay and then adjusts
 * it by how quickly the queue is growing or shrinking between calls.
 *
 * The calculator is stateful: each call to `calculateDelay` records the queue
 * size and rate of change it saw for use by the next call.
 */
export class SmoothDelayCalculator implements CalculateDelayClass {
  initialDelay: number;
  zeroDelayQueueSize: number;
  smoothingFactor: number;
  decreaseResponsivenessFactor: number;
  increaseResponsivenessFactor: number;

  #previousQueueSize: number;
  #previousRateOfChange: number;

  constructor({
    initialDelay = 32,
    zeroDelayQueueSize = 64,
    smoothingFactor = 0.5,
    increaseResponsivenessFactor = 5.0,
    decreaseResponsivenessFactor = 100.0,
  }: SmoothDelayCalculatorOptions = {}) {
    this.initialDelay = initialDelay;
    this.zeroDelayQueueSize = zeroDelayQueueSize;

    // Clamp the smoothing factor to the range [0, 1].
    this.smoothingFactor = Math.max(0, Math.min(1, smoothingFactor));
    // Responsiveness factors may not be negative.
    this.decreaseResponsivenessFactor = Math.max(
      0,
      decreaseResponsivenessFactor
    );
    this.increaseResponsivenessFactor = Math.max(
      0,
      increaseResponsivenessFactor
    );

    this.#previousQueueSize = 0;
    this.#previousRateOfChange = 0;
  }

  /**
   * Compute the delay before the next emit and record `queueSize` for the
   * following call.
   *
   * @param queueSize Number of items currently waiting in the queue.
   * @returns A non-negative delay in milliseconds.
   */
  calculateDelay(queueSize: number): number {
    const initialDelay = this.initialDelay;
    const zeroDelayQueueSize = this.zeroDelayQueueSize;
    const smoothingFactor = this.smoothingFactor;
    const decreaseResponsivenessFactor = this.decreaseResponsivenessFactor;
    const increaseResponsivenessFactor = this.increaseResponsivenessFactor;

    const previousQueueSize = this.#previousQueueSize;
    const previousRateOfChange = this.#previousRateOfChange;

    // How full the queue is relative to the zero-delay threshold. A
    // non-positive (or NaN) threshold counts as already reached; dividing by
    // it would otherwise turn the delay into NaN or Infinity.
    const fillRatio =
      zeroDelayQueueSize > 0 ? queueSize / zeroDelayQueueSize : 1;

    // Calculate the linear delay using the same calculation as `createCalculateLinearDelay`.
    const linearDelay = Math.max(
      0,
      Math.floor(initialDelay - initialDelay * fillRatio)
    );

    // Calculate the current rate of change.
    const currentRateOfChange = queueSize - previousQueueSize;

    // Blend the current rate of change with the previous one.
    const smoothedRateOfChange =
      currentRateOfChange * smoothingFactor +
      previousRateOfChange * (1 - smoothingFactor);

    // When the queue size is rapidly decreasing, we want to increase our delay,
    // while when the queue size is rapidly increasing, we want to decrease our delay.
    //
    // We provide responsiveness factors to control how much we want to adjust the delay,
    // but also weight this value more heavily when the queue size is lower.
    const weightedSmoothedRateOfChange = Math.round(
      smoothedRateOfChange *
        (smoothedRateOfChange > 0
          ? fillRatio * increaseResponsivenessFactor
          : (1 / Math.max(queueSize, 1)) * decreaseResponsivenessFactor)
    );

    // Adjust the delay based on the queue size and the weighted smoothed rate of change.
    const adjustedDelay = Math.max(
      0,
      linearDelay - weightedSmoothedRateOfChange
    );

    // Update previous state.
    // NOTE(review): the raw rate is stored, not the smoothed one, so each call
    // blends only the two most recent raw rates rather than forming a running
    // exponential average. Confirm this is the intended behaviour.
    this.#previousQueueSize = queueSize;
    this.#previousRateOfChange = currentRateOfChange;

    return adjustedDelay;
  }
}

/**
 * Tokenizer that passes the buffer through untouched as a single token.
 *
 * @param buffer Text accumulated so far.
 * @returns One `[buffer, buffer.length]` pair.
 */
export function preserve(buffer: string) {
  const whole = [buffer, buffer.length] as const;
  return [whole];
}

/**
 * Tokenizer that emits one token per character.
 *
 * Iterates by Unicode code point rather than UTF-16 code unit, so a character
 * outside the Basic Multilingual Plane (e.g. an emoji) is emitted as one token
 * instead of two lone surrogates, which cannot be encoded to UTF-8 on their own.
 *
 * @param buffer Text accumulated so far.
 * @returns `[character, index]` pairs, where `index` is the UTF-16 offset in
 *          `buffer` just past that character.
 */
export function chars(buffer: string) {
  const tokens: (readonly [string, number])[] = [];
  let end = 0;
  // String iteration yields whole code points; `length` is 1 or 2 code units.
  for (const char of buffer) {
    end += char.length;
    tokens.push([char, end] as const);
  }
  return tokens;
}

/**
 * Split `buffer` into chunks at every character matched by `regex`.
 *
 * @param buffer Text accumulated so far.
 * @param regex Pattern tested against one character at a time.
 * @param inclusive When true the matched character ends the current chunk;
 *                  when false it starts the next one.
 * @param eof When true the text after the last match is emitted as a final
 *            chunk; otherwise it is left for a later call.
 * @returns `[chunk, index]` pairs, where `index` is the offset in `buffer`
 *          just past the chunk. Empty chunks are never returned.
 */
function chunks(buffer: string, regex: RegExp, inclusive = false, eof = false) {
  const ws: (readonly [string, number])[] = [];

  let lastIndex = 0;
  for (let currentIndex = 0; currentIndex < buffer.length; currentIndex++) {
    // Keep a global/sticky pattern from carrying state between characters.
    regex.lastIndex = 0;
    if (!regex.test(buffer[currentIndex]!)) {
      continue;
    }

    const end = currentIndex + (inclusive ? 1 : 0);
    // Skip zero-length chunks, e.g. when the buffer starts with a delimiter.
    if (end > lastIndex) {
      ws.push([buffer.slice(lastIndex, end), end] as const);
    }
    // Resume exactly where this chunk stopped so that, in inclusive mode,
    // the delimiter is not repeated at the start of the next chunk.
    lastIndex = end;
  }

  if (eof && lastIndex < buffer.length) {
    ws.push([buffer.slice(lastIndex), buffer.length] as const);
  }

  return ws;
}

/**
 * Tokenizer that cuts the buffer at whitespace characters.
 *
 * Delegates to `chunks` in non-inclusive mode, so the whitespace character
 * belongs to the chunk that follows it.
 *
 * @param buffer Text accumulated so far.
 * @param eof Forwarded to `chunks` to emit the trailing remainder.
 */
export function words(buffer: string, eof = false) {
  const whitespace = /\s/;
  return chunks(buffer, whitespace, false, eof);
}

/**
 * Tokenizer that cuts the buffer at clause punctuation (`. , ! ? ;`).
 *
 * Delegates to `chunks` in inclusive mode, so the punctuation mark is part of
 * the chunk it terminates.
 *
 * @param buffer Text accumulated so far.
 * @param eof Forwarded to `chunks` to emit the trailing remainder.
 */
export function clauses(buffer: string, eof = false) {
  const punctuation = /[.,!?;]/;
  return chunks(buffer, punctuation, true, eof);
}

/** Built-in tokenizers, looked up by name when `smooth` is given a string. */
const tokenizers = {
  chars: chars,
  words: words,
  clauses: clauses,
  preserve: preserve,
} as const;

/**
 * Factories for the built-in delay strategies, keyed by name. Each call
 * produces a fresh delay function, so the stateful `smooth` strategy starts
 * from a new `SmoothDelayCalculator` every time.
 */
const delayCalculators = {
  linear: () => {
    return createCalculateLinearDelay();
  },
  smooth: () => {
    const calculator = new SmoothDelayCalculator();
    return (queueSize: number): number => calculator.calculateDelay(queueSize);
  },
} as const;

/**
 * A tokenizer: splits `text` into `[token, index]` pairs.
 *
 * `smooth` enqueues every returned token and then drops the first `index`
 * characters of its buffer, using the `index` of the last returned pair.
 *
 * NOTE(review): `smooth` calls `tokenize(buffer, true)` when flushing, so the
 * flush flag arrives in the second position (named `inclusive` here), which is
 * where the built-in `words`/`clauses` read their `eof` flag. Confirm which
 * position custom tokenizers are expected to read.
 */
export type TokenizeFn = (
  text: string,
  inclusive?: boolean,
  eof?: boolean
) => (readonly [token: string, index: number])[];
/**
 * Names of the built-in tokenizers. `"clauses"` is listed because the
 * `tokenizers` lookup table registers it alongside the other three.
 */
export type TokenizeType = "preserve" | "chars" | "words" | "clauses";
/** A delay strategy: maps the current queue size to a delay in milliseconds. */
export type CalculateDelayFn = (queueSize: number) => number;
/** Names of the built-in delay strategies. */
export type CalculateDelayType = "linear" | "smooth";

/** Options accepted by `smooth`. */
export interface SmoothOptions {
  /**
   * Built-in tokenizer name or a custom tokenizer function.
   * `smooth` falls back to `"chars"` when this is omitted.
   */
  tokenize?: TokenizeType | TokenizeFn;
  /**
   * Delay strategy: a plain function, an object exposing a `calculateDelay`
   * method, or the name of a built-in strategy.
   * `smooth` falls back to `"smooth"` when this is omitted.
   */
  calculateDelay?: CalculateDelayFn | CalculateDelayClass | CalculateDelayType;
}

/**
 * Smooth a stream of LLM tokens into a stream of characters or semantic chunks
 * while reducing jitter by stabilising output timing.
 *
 * A background task drains `streamingData`, re-tokenizes it and fills a queue;
 * the generator itself drains that queue, pausing between emits for a time
 * chosen by the delay strategy. Once the source has ended, the last delay is
 * reused for the remaining items instead of being recalculated.
 *
 * @param streamingData A stream of LLM tokens.
 * @param options Options for the smoothing algorithm.
 * @returns An async generator of re-tokenized strings.
 * @throws Whatever `streamingData` or the tokenizer throws; values that are
 *         not `Error` instances are wrapped in an `Error`.
 */
export async function* smooth(
  streamingData: AsyncGenerator<string | undefined>,
  {
    tokenize: _tokenize = "chars",
    calculateDelay: _calculateDelay = "smooth",
  }: SmoothOptions = {}
) {
  const tokenize =
    typeof _tokenize === "function" ? _tokenize : tokenizers[_tokenize];

  // Normalise the three accepted forms into a plain function.
  let calculateDelay: CalculateDelayFn;
  if (typeof _calculateDelay === "function") {
    calculateDelay = _calculateDelay;
  } else if (typeof _calculateDelay === "string") {
    calculateDelay = delayCalculators[_calculateDelay]();
  } else {
    calculateDelay = _calculateDelay.calculateDelay.bind(_calculateDelay);
  }

  let isStreamingDataConsumed = false;

  const queue = new AsyncQueue<string | undefined>();
  // Producer: runs concurrently with the consumer loop below.
  void (async () => {
    let buffer = "";
    let lastIndex: number | undefined;
    try {
      for await (const oldToken of streamingData) {
        buffer += oldToken ?? "";
        for (const [newToken, index] of tokenize(buffer)) {
          queue.enqueue(newToken);
          lastIndex = index;
        }
        // Keep only the part of the buffer the tokenizer has not consumed.
        if (typeof lastIndex === "number") {
          buffer = buffer.slice(lastIndex);
          lastIndex = undefined;
        }
      }

      // Flush the buffer.
      for (const [newToken] of tokenize(buffer, true)) {
        queue.enqueue(newToken);
      }
    } catch (error) {
      // The queue only treats `Error` instances as failures. Anything else
      // (e.g. a thrown string) would be yielded to the consumer as if it were
      // a token, so wrap it first.
      queue.enqueue(error instanceof Error ? error : new Error(String(error)));
    } finally {
      // `undefined` is the end-of-stream sentinel for the loop below.
      queue.enqueue(undefined);

      isStreamingDataConsumed = true;
    }
  })();

  let lastDelay: number | undefined;
  while (true) {
    const newToken = await queue.dequeue();
    if (newToken === undefined) {
      break;
    }

    yield newToken;

    // After the source has ended the queue can only shrink, which the delay
    // strategies read as a reason to slow down. Nothing more is coming, so
    // keep the last delay chosen while data was still arriving. Previously
    // `lastDelay` was overwritten on every pass, which made the
    // `isStreamingDataConsumed` check a no-op.
    if (!isStreamingDataConsumed || lastDelay === undefined) {
      lastDelay = calculateDelay(queue.size());
    }

    if (lastDelay === 0) {
      continue;
    }

    await sleep(lastDelay);
  }
}

@sebinsua
Copy link
Author

sebinsua commented Nov 13, 2023

I experimented trying to put this on the back-end using ai in a Next.js app and a client-side React library on the UI. In order to do so, I had to transform to and from AsyncIterator and ReadableStream (as the latter doesn't support async iterables in V8/Chrome yet).

This actually doesn't work right now as the stream output does not arrive smoothly and instead a large chunk is outputted at once. I've not yet confirmed whether this is due to (1) the complex JSON response structure that is being outputted, (2) some kind of low-level HTTP/TCP chunking/segmentation while streaming responses, (3) whatever the browser does when receiving responses, (4) the way React has been configured to render text that arrives, or something else. I'll need to strip down the logic so it's smaller to look into each layer to find out what is happening.

In practice, it might always be better for this logic to exist on the UI side, since that way we can smooth out the jitter and pauses that are caused by network conditions, too.

Either way, here are some helpful utilities for integrating with Node.js streams:

/**
 * Wrap an async iterable in a `ReadableStream`, standing in for
 * `ReadableStream.from(asyncIterable)`, which isn't documented in MDN and
 * isn't implemented in node.
 * https://github.com/whatwg/streams/commit/8d7a0bf26eb2cc23e884ddbaac7c1da4b91cf2bc
 *
 * Inlined from: https://github.com/vercel/ai/blob/8429dce6e6a650cb837a4aafb42367a618fa03e4/packages/core/streams/ai-stream.ts#L249C1-L266C2
 *
 * @param iterable Source of values; its iterator is created immediately.
 * @returns A stream that pulls one value from the iterator per `pull` and
 *          forwards cancellation to the iterator's `return`, if it has one.
 */
export function readableFromAsyncIterable<T>(iterable: AsyncIterable<T>) {
  const iterator = iterable[Symbol.asyncIterator]();
  return new ReadableStream<T>({
    async pull(controller) {
      const step = await iterator.next();
      if (step.done) {
        controller.close();
        return;
      }
      controller.enqueue(step.value);
    },

    async cancel(reason) {
      await iterator.return?.(reason);
    },
  });
}

/**
 * Expose a `ReadableStream` as an async iterable.
 *
 * The stream is locked for the duration of the iteration. If the consumer
 * stops early (`break`, `return`, or an exception in the loop body) the
 * stream is cancelled so that its source can stop producing; in every case
 * the lock is released afterwards.
 *
 * @param readableStream Stream to read from.
 * @returns The stream's chunks, in order.
 */
export async function* readableToAsyncIterable<T>(
  readableStream: ReadableStream<T>,
): AsyncIterable<T> {
  const reader = readableStream.getReader();
  let exhausted = false;
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        exhausted = true;
        break;
      }
      yield value;
    }
  } finally {
    if (!exhausted) {
      // Early exit: releasing the lock alone would leave the source running
      // with nobody reading it. Cancelling an already-errored stream rejects,
      // so ignore that failure and let the original error propagate.
      try {
        await reader.cancel();
      } catch {
        // Nothing further to clean up.
      }
    }
    reader.releaseLock();
  }
}


/**
 * Decode a byte array to a string with a fresh default `TextDecoder`.
 *
 * @param uint8Array Bytes to decode.
 * @returns The decoded text.
 */
function uint8ArrayToString(uint8Array: Uint8Array) {
  return new TextDecoder().decode(uint8Array);
}

/**
 * Encode a string to bytes with a fresh `TextEncoder`.
 *
 * @param str Text to encode.
 * @returns The encoded bytes.
 */
function stringToUint8Array(str: string) {
  return new TextEncoder().encode(str);
}

/**
 * Decode a byte stream into text chunks.
 *
 * A single streaming decoder is kept for the whole stream, so a multi-byte
 * character split across two chunks is reassembled instead of being decoded
 * as replacement characters. Chunks that decode to nothing are skipped.
 *
 * @param stream Byte stream to decode.
 * @returns The decoded, non-empty text chunks in order.
 */
async function* tokenAsyncIterable(stream: ReadableStream<Uint8Array>) {
  const decoder = new TextDecoder();
  for await (const chunk of readableToAsyncIterable(stream)) {
    // `stream: true` keeps an incomplete trailing sequence buffered in the
    // decoder until the next chunk completes it.
    const text = decoder.decode(chunk, { stream: true });
    if (text.length > 0) {
      yield text;
    }
  }
  // Flush whatever the decoder is still holding at end of input.
  const tail = decoder.decode();
  if (tail.length > 0) {
    yield tail;
  }
}

/**
 * Encode each string of an async iterable into bytes.
 *
 * @param asyncIterable Source of text chunks.
 * @returns One byte array per input chunk, produced by `stringToUint8Array`.
 */
async function* uint8ArrayAsyncIterable(
  asyncIterable: AsyncIterable<string>,
): AsyncIterable<Uint8Array> {
  for await (const text of asyncIterable) {
    const bytes = stringToUint8Array(text);
    yield bytes;
  }
}

Then use like so:

return new StreamingTextResponse(
  readableFromAsyncIterable(
    uint8ArrayAsyncIterable(smooth(tokenAsyncIterable(stream))),
  ),
);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment