Skip to content

Instantly share code, notes, and snippets.

@python273
Last active June 16, 2024 11:09
Show Gist options
  • Save python273/563177b3ad5b9f74c0f8f3299ec13850 to your computer and use it in GitHub Desktop.
Save python273/563177b3ad5b9f74c0f8f3299ec13850 to your computer and use it in GitHub Desktop.
Flask Streaming Langchain Example
import os
# WARNING: hard-coding credentials in source is unsafe — prefer exporting
# OPENAI_API_KEY in the shell environment. Must be set before the langchain
# imports below read it.
os.environ["OPENAI_API_KEY"] = ""
from flask import Flask, Response, request
import threading
import queue
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.schema import AIMessage, HumanMessage, SystemMessage
# Single Flask application serving the demo page and the streaming endpoint.
app = Flask(__name__)
@app.route('/')
def index():
    """Serve the demo page: a prompt form plus JS that fetches /chain and
    appends streamed chunks to the page as they arrive."""
    # just for the example, html is included directly, move to .html file
    return Response('''
<!DOCTYPE html>
<html>
<head><title>Flask Streaming Langchain Example</title></head>
<body>
<form id="form">
<input name="prompt" value="write a short koan story about seeing beyond"/>
<input type="submit"/>
</form>
<div id="output"></div>
<script>
const formEl = document.getElementById('form');
const outputEl = document.getElementById('output');
let aborter = new AbortController();
async function run() {
aborter.abort(); // cancel previous request
outputEl.innerText = '';
aborter = new AbortController();
const prompt = new FormData(formEl).get('prompt');
try {
const response = await fetch(
'/chain', {
signal: aborter.signal,
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
prompt
}),
}
);
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) { break; }
const decoded = decoder.decode(value, {stream: true});
outputEl.innerText += decoded;
}
} catch (err) {
console.error(err);
}
}
run(); // run on initial prompt
formEl.addEventListener('submit', function(event) {
event.preventDefault();
run();
});
</script>
</body>
</html>
''', mimetype='text/html')
class ThreadedGenerator:
    """Blocking iterator fed from another thread through a queue.

    The producer thread calls send() once per chunk and close() when done;
    the consumer (the Flask response) iterates, blocking until data arrives.
    """

    def __init__(self):
        # Unbounded FIFO shared between the producer and consumer threads.
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        # Blocks until the producer enqueues the next chunk.
        item = self.queue.get()
        if item is StopIteration:
            # The StopIteration class itself serves as the end-of-stream
            # sentinel; raising it terminates the for-loop cleanly.
            raise item
        return item

    def send(self, data):
        """Enqueue one chunk for the consumer."""
        self.queue.put(data)

    def close(self):
        """Signal end-of-stream to the consumer."""
        self.queue.put(StopIteration)
class ChainStreamHandler(StreamingStdOutCallbackHandler):
    """LangChain callback that relays each streamed token into a ThreadedGenerator."""

    def __init__(self, gen):
        super().__init__()
        # Generator whose consumer side is iterated by the HTTP response.
        self.gen = gen

    def on_llm_new_token(self, token: str, **kwargs):
        # Invoked by LangChain for every token produced by the model;
        # forward it straight to the streaming response.
        self.gen.send(token)
def llm_thread(g, prompt):
    """Run the chat completion on a worker thread, streaming tokens into *g*.

    The generator is closed in all cases — success or error — so the HTTP
    response always terminates instead of hanging the client.
    """
    try:
        handler = ChainStreamHandler(g)
        chat = ChatOpenAI(
            verbose=True,
            streaming=True,
            callbacks=[handler],
            temperature=0.7,
        )
        chat([HumanMessage(content=prompt)])
    finally:
        g.close()
def chain(prompt):
    """Start generation on a background thread; return an iterable of tokens."""
    g = ThreadedGenerator()
    worker = threading.Thread(target=llm_thread, args=(g, prompt))
    worker.start()
    return g
@app.route('/chain', methods=['POST'])
def _chain():
    """POST {"prompt": ...}: stream the model's completion as plain text."""
    prompt = request.json['prompt']
    return Response(chain(prompt), mimetype='text/plain')
if __name__ == '__main__':
    # threaded=True lets the dev server keep serving the streaming response
    # while handling other requests; debug=True is for local development only.
    app.run(threaded=True, debug=True)
@Houss3m
Copy link

Houss3m commented Mar 8, 2024

What if we are using tools — how can we get streaming output for each tool as it is invoked?

@YanSte
Copy link

YanSte commented Apr 19, 2024

Hi all !

I wanted to share with you a Custom Stream Response that I implemented in my FastAPI application recently.

I created this solution to manage streaming data.

You could use LangChain's stream/event APIs, but I do custom work in the handlers, which is why I need this.

Here examples:

Fast API

@router.get("/myExample")
async def mySpecialAPI(
    session_id: UUID,
    input="Hello",
) -> StreamResponse:
    # NOTE: deliberately NOT awaited — StreamResponse needs the coroutine
    # object itself so it can schedule it as a task while streaming.
    # (`..` is a placeholder in this example; fill in real arguments.)
    invoke = chain.ainvoke(..)
    callback = MyCallback(..)
    return StreamResponse(invoke, callback)

Custom Stream Response

from __future__ import annotations
import asyncio
import typing
from typing import Any, AsyncIterable, Coroutine
from fastapi.responses import StreamingResponse as FastApiStreamingResponse
from starlette.background import BackgroundTask

class StreamResponse(FastApiStreamingResponse):
    """Streaming response that runs an LLM coroutine concurrently and
    yields the tokens its callback handler collects.

    The un-awaited coroutine is scheduled as a task while the callback's
    async iterator is drained into the response body.
    """

    def __init__(
        self,
        invoke: Coroutine,
        callback: MyCustomAsyncIteratorCallbackHandler,
        status_code: int = 200,
        headers: typing.Mapping[str, str] | None = None,
        media_type: str | None = "text/event-stream",
        background: BackgroundTask | None = None,
    ) -> None:
        super().__init__(
            content=StreamResponse.send_message(callback, invoke),
            status_code=status_code,
            headers=headers,
            media_type=media_type,
            background=background,
        )

    @staticmethod
    async def send_message(
        callback: MyCustomAsyncIteratorCallbackHandler, invoke: Coroutine
    ) -> AsyncIterable[str]:
        # Bind the task to a local so it holds a strong reference for the
        # lifetime of this generator — asyncio only keeps weak references
        # to tasks, and a fire-and-forget create_task() can be
        # garbage-collected before it finishes.
        task = asyncio.create_task(invoke)

        # Yield tokens as the callback handler enqueues them.
        async for token in callback.aiter():
            yield token

My Custom Callbackhandler

from __future__ import annotations
import asyncio
from typing import Any, AsyncIterator, List

class MyCustomAsyncIteratorCallbackHandler(AsyncCallbackHandler):
    """Callback handler that exposes streamed LLM tokens as an async iterator.

    on_llm_new_token enqueues each token; on_llm_end enqueues None as the
    end-of-stream sentinel; aiter() drains the queue until it sees None.
    """

    # Queue of tokens; None marks end of stream. (Items could also be a
    # BaseModel instead of plain str.)
    queue: asyncio.Queue[str | None]

    # Pass your own params as needed.
    def __init__(self) -> None:
        self.queue = asyncio.Queue()

    async def on_llm_new_token(
        self,
        token: str,
        tags: List[str] | None = None,
        **kwargs: Any,
    ) -> None:
        self.queue.put_nowait(token)

    async def on_llm_end(
        self,
        response: LLMResult,
        tags: List[str] | None = None,
        **kwargs: Any,
    ) -> None:
        # None is the sentinel that terminates aiter().
        self.queue.put_nowait(None)

    # NOTE: implement on_llm_error similarly (e.g. enqueue the sentinel or
    # an error marker) so the consumer does not hang if the LLM call fails.

    async def aiter(self) -> AsyncIterator[str]:
        while True:
            token = await self.queue.get()
            # Balance every get() with task_done(), not only the sentinel,
            # so queue.join() would not block forever on consumed tokens.
            self.queue.task_done()

            if token is None:
                break

            yield token  # or a BaseModel.model_dump_json(), etc.

https://gist.github.com/YanSte/7be29bc93f21b010f64936fa334a185f

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment