
@alexrudall
Last active April 17, 2024 01:12
ChatGPT streaming with ruby-openai, Rails 7, Hotwire, Turbostream, Sidekiq and Tailwind!

How to add ChatGPT streaming to your Ruby on Rails 7 app!

This guide will walk you through adding a ChatGPT-like messaging stream to your Ruby on Rails 7 app using ruby-openai, Rails 7, Hotwire, Turbostream, Sidekiq and Tailwind. All code included below!

First, add the ruby-openai gem! It needs to be at least version 4. Add Sidekiq too.

# Gemfile
gem "ruby-openai", "~> 4.0.0"

# Simple, efficient background processing using Redis.
# https://github.com/sidekiq/sidekiq
gem "sidekiq", "~> 7.0.9"

Install Redis on your machine.

brew install redis

Add Redis and Sidekiq to your Procfile so they run when you run bin/dev.

# Procfile.dev
web: bin/rails server -p 3000
css: bin/rails tailwindcss:watch
sidekiq: bundle exec sidekiq -c 2
queue: redis-server

Add your secret OpenAI token to your .env file. You can generate one in your OpenAI account settings.

OPENAI_ACCESS_TOKEN=abc123
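
The token is picked up by the ruby-openai gem via its configuration block (see the gem README linked in the comments below). A minimal initializer sketch, assuming you load .env with something like dotenv-rails:

# config/initializers/openai.rb
OpenAI.configure do |config|
  config.access_token = ENV.fetch("OPENAI_ACCESS_TOKEN")
end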

Add the new routes:

# config/routes.rb
resources :chats, only: %i[create show] do
  resources :messages, only: %i[create]
end

Generate the migrations:

bin/rails generate migration CreateChats user:references
bin/rails generate migration CreateMessages chat:references role:integer content:string
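
Once you've filled in the migration files (full contents below), run them:

bin/rails db:migrate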

Add the rest of the code, full example files below!

# Controllers.
app/controllers/chats_controller.rb
app/controllers/messages_controller.rb

# Sidekiq job to stream the data from the OpenAI API.
app/jobs/get_ai_response.rb

# Migrations
db/migrate/20230427131800_create_chats.rb
db/migrate/20230427131900_create_messages.rb

# Models
app/models/chat.rb
app/models/message.rb

# Views
app/views/chats/show.html.erb
app/views/messages/_form.html.erb
app/views/messages/_message.html.erb
app/views/messages/create.turbo_stream.erb
# db/migrate/20230427131800_create_chats.rb
# bin/rails generate migration CreateChats user:references
class CreateChats < ActiveRecord::Migration[7.0]
  def change
    create_table :chats do |t|
      t.references :user, null: false, foreign_key: true
      t.timestamps
    end
  end
end
# db/migrate/20230427131900_create_messages.rb
# bin/rails generate migration CreateMessages chat:references role:integer content:string
class CreateMessages < ActiveRecord::Migration[7.0]
  def change
    create_table :messages do |t|
      t.references :chat, foreign_key: true
      t.integer :role, null: false, default: 0
      t.string :content, null: false
      t.integer :response_number, null: false, default: 0
      t.timestamps
    end
  end
end
# app/models/chat.rb
class Chat < ApplicationRecord
  belongs_to :user
  has_many :messages, dependent: :destroy
end
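
# Note: Chat belongs_to :user, so this guide assumes you already have a User
# model and authentication (e.g. Devise), as a commenter points out below.
# A minimal sketch of the other side of the association (illustrative only,
# not part of the original gist):
# app/models/user.rb
class User < ApplicationRecord
  has_many :chats, dependent: :destroy
end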
# app/controllers/chats_controller.rb
class ChatsController < ApplicationController
  before_action :authenticate_user!
  before_action :set_chat, only: %i[show]

  def show
    respond_with(@chat)
  end

  def create
    @chat = Chat.create(user: current_user)
    respond_with(@chat)
  end

  private

  def set_chat
    @chat = Chat.find(params[:id])
  end
end
# app/jobs/get_ai_response.rb
class GetAiResponse < SidekiqJob
  RESPONSES_PER_MESSAGE = 1

  def perform(chat_id)
    chat = Chat.find(chat_id)
    call_openai(chat: chat)
  end

  private

  def call_openai(chat:)
    OpenAI::Client.new.chat(
      parameters: {
        model: "gpt-3.5-turbo",
        messages: Message.for_openai(chat.messages),
        temperature: 0.8,
        stream: stream_proc(chat: chat),
        n: RESPONSES_PER_MESSAGE
      }
    )
  end

  def create_messages(chat:)
    messages = []
    RESPONSES_PER_MESSAGE.times do |i|
      message = chat.messages.create(role: "assistant", content: "", response_number: i)
      message.broadcast_created
      messages << message
    end
    messages
  end

  def stream_proc(chat:)
    messages = create_messages(chat: chat)

    proc do |chunk, _bytesize|
      new_content = chunk.dig("choices", 0, "delta", "content")
      message = messages.find { |m| m.response_number == chunk.dig("choices", 0, "index") }
      message.update(content: message.content + new_content) if new_content
    end
  end
end
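
# Note: GetAiResponse inherits from SidekiqJob, a base class that isn't shown
# in the gist (a commenter asks about it below). A minimal sketch, assuming it
# simply wraps Sidekiq::Job with default options:
# app/jobs/sidekiq_job.rb (assumed filename)
class SidekiqJob
  include Sidekiq::Job
end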
# app/models/message.rb
class Message < ApplicationRecord
  include ActionView::RecordIdentifier

  enum role: { system: 0, assistant: 10, user: 20 }

  belongs_to :chat

  after_create_commit -> { broadcast_created }
  after_update_commit -> { broadcast_updated }

  def broadcast_created
    broadcast_append_later_to(
      "#{dom_id(chat)}_messages",
      partial: "messages/message",
      locals: { message: self, scroll_to: true },
      target: "#{dom_id(chat)}_messages"
    )
  end

  def broadcast_updated
    broadcast_append_to(
      "#{dom_id(chat)}_messages",
      partial: "messages/message",
      locals: { message: self, scroll_to: true },
      target: "#{dom_id(chat)}_messages"
    )
  end

  def self.for_openai(messages)
    messages.map { |message| { role: message.role, content: message.content } }
  end
end
# app/controllers/messages_controller.rb
class MessagesController < ApplicationController
  include ActionView::RecordIdentifier

  before_action :authenticate_user!

  def create
    @message = Message.create(message_params.merge(chat_id: params[:chat_id], role: "user"))
    GetAiResponse.perform_async(@message.chat_id)

    respond_to do |format|
      format.turbo_stream
    end
  end

  private

  def message_params
    params.require(:message).permit(:content)
  end
end
# app/views/chats/show.html.erb
<div class="mx-auto w-full flex">
<div class="mx-auto">
<div class="bg-white py-8">
<div class="mx-auto max-w-lg px-6 ">
<ul role="list" class="overflow-y-auto max-h-[48vh] flex flex-col-reverse">
<%= turbo_stream_from "#{dom_id(@chat)}_messages" %>
<div id="<%= dom_id(@chat) %>_messages">
<%= render @chat.messages %>
</div>
</ul>
<%= render partial: "messages/form", locals: { chat: @chat } %>
</div>
</div>
</div>
</div>
# app/views/messages/create.turbo_stream.erb
<%= turbo_stream.append "#{dom_id(@message.chat)}_messages" do %>
  <%= render "message", message: @message, scroll_to: true %>
<% end %>

<%= turbo_stream.replace "#{dom_id(@message.chat)}_message_form" do %>
  <%= render "form", chat: @message.chat %>
<% end %>
# app/views/messages/_form.html.erb
<%= turbo_frame_tag "#{dom_id(chat)}_message_form" do %>
  <%= form_with(model: Message.new, url: [chat, chat.messages.new]) do |form| %>
    <div class="my-5">
      <%= form.text_area :content, rows: 4, class: "block shadow rounded-md border border-gray-200 outline-none px-3 py-2 mt-2 w-full", autofocus: true, "x-on:keydown.cmd.enter" => "$event.target.form.requestSubmit();" %>
    </div>
    <div class="grid justify-items-end">
      <%= form.button type: :submit, class: "rounded-lg py-3 px-5 bg-blue-600 text-white inline-block font-medium cursor-pointer" do %>
        <i class="fas fa-paper-plane"></i>
        <span class="pl-2">Send</span>
      <% end %>
    </div>
  <% end %>
<% end %>
# app/views/messages/_message.html.erb
# Thanks to github.com/fanahova for this template!
<div id="<%= dom_id(message) %>_messages">
<% if message.user? %>
<div class="bg-sky-400 rounded-lg m-8 text-white p-4">
<%= message.content %>
</div>
<% else %>
<div class="bg-gray-200 rounded-lg m-8 p-4">
<%= message.content %>
</div>
<% end %>
</div>
@beouk

beouk commented Jul 13, 2023

Possibly worth a note at the top that this assumes you have Devise or similar pre-installed?

@TaraJura

TaraJura commented Jul 14, 2023

Won't work as expected :D TechTools

@Tronerta

For some reason the stream_proc method wasn't working for me; I needed to put its content directly into the OpenAI::Client.new.chat call. Maybe that will help someone.

@Shaher-11

What is pulling OPENAI_ACCESS_TOKEN from the .env file?
I'm not able to see the value being used anywhere in the code.

@Tronerta

What is pulling OPENAI_ACCESS_TOKEN from the .env file? I'm not able to see the value being used anywhere in the code.

It is here in the gem docs: https://github.com/alexrudall/ruby-openai#with-config

@sschuez

sschuez commented Jul 24, 2023

Nice, thank you. I was wondering how you configured your SidekiqJob to make it reliably stream. Would be great to have a look at that.

@amolk

amolk commented Jul 30, 2023

Thank you for this gist!

For longer LLM responses, this results in a lot of broadcasted messages. Is there a way to throttle? Maybe clear out any queued messages before broadcasting a new one?

@alexrudall
Author

No probs @amolk, good question. Could you use broadcast_append_later_to for update and then debounce with something like sidekiq-debounce - basically just wait a few seconds before broadcasting an update, and only the "last" one would get sent?

@mindtonic

mindtonic commented Aug 29, 2023

I love this and am grateful for your efforts! Thank you.

Philosophically, however, it feels really wrong to include view code in the model. Why should models have "write" access to the view layer? I'm just curious about your reasoning here and if there might be a better way to decouple the concerns.

Edit: I am aware this is the documented recommendation, but I am still curious about the reasoning. It seems like an anti-pattern to have a model communicating with the View layer. If someone has knowledge of a blog post or other rationale from the turbo-rails team I would love to read it.

@roberthopman

Thank you, added all things in an example here: https://github.com/roberthopman/granny

@alexrudall
Author

@mindtonic to me it's more like a pub/sub pattern; the model broadcasts and the view is subscribed, but other things could theoretically also subscribe, so it's not exactly writing directly.

@alexrudall
Author

Thank you, added all things in an example here: https://github.com/roberthopman/granny

Nice! Good to have it cloneable like that

@chrismoy

chrismoy commented Oct 2, 2023

Hey @alexrudall, I wanted to say thank you for your work on this project.

I am integrating it into my own app and wanted to make a suggestion. Your streaming proc functions great, but it has the downside of doing a lot of updates, which in turn means a lot of broadcast jobs. From my testing, a chunk can frequently be as small as a single character, meaning that for even a moderately sized paragraph response I can see 100+ database updates (and therefore an equal number of broadcast jobs spun up). This is fine if you are the only user, but I imagine with multiple concurrent users or even a really large response this would become untenable fairly quickly.

To get around this, I built upon your proc and added a small PORO to the job, turning

def stream_proc(message:)
    proc do |chunk, _bytesize|
        new_content = chunk.dig("choices", 0, "delta", "content")
        message.update(content: message.content + new_content) if new_content
    end
end

into

def stream_proc(message:)
    buffer = StreamBuffer.new(message:)

    proc do |chunk, _bytesize|
          buffer.push(chunk:)
    end
end

class StreamBuffer
    INTERVAL = 300 # ms
    def initialize(message:)
        @message = message

        @buffer = ''
        @chunk_count = 0
        @stop = nil
    end

    def push(chunk:)
        @buffer_start ||= Time.zone.now

        @stop = chunk.dig('choices', 0, 'finish_reason')
        content = chunk.dig('choices', 0, 'delta', 'content')
        @buffer += content if content.present?

        return @message.content unless flush_required?

        flush
    end

    private

    def flush_required?
        return true if @stop

        time_elapsed = (Time.zone.now - @buffer_start).in_milliseconds
        time_elapsed > @chunk_count * INTERVAL
    end

    def flush
        @message.update(content: @message.content + @buffer)

        @buffer = ''
        @chunk_count += 1

        @message.content
    end
end

The trade-off is that the updates are not as scroll-y as before, but in return I noticed a 10x+ reduction in database updates/jobs at INTERVAL = 300ms (you can tune this to reach your desired scroll:update ratio). I figure the point of the stream is less the specific visual effect and more the UX of the user knowing that something is happening and that the request isn't hung anyway.

@mindtonic

mindtonic commented Oct 3, 2023

@alexrudall I also am grateful for your efforts. Thank you!

I just couldn't get past having the model speak directly to the view layer. I also agree with @chrismoy that the database writes were, frankly, egregious.

My solution is fairly simple. This module is included in a Job. I created an instance variable to store the text as it was streamed, and leveraged ActionCable to handle the View layer updates. Once there is no more text, I update the database. As a safety net in my use case, the RevisionChannel updates a textarea which the user can save, creating a very simple redundancy on the stream results. (I also have an autosave feature for a third safety net, but that is more of a quality-of-life feature.)

I'm totally open to feedback and critique of this pattern. I'd love to hear others' thoughts and perspectives!

module Rewriter
  attr_accessor :revisable, :rewrite

  def revise text, revisable
    @revisable = revisable
    @rewrite = ""

    client = OpenAI::Client.new
    response = client.chat(
      parameters: {
        model: "gpt-3.5-turbo-0613",
        messages: [
          {
            "role": "system",
            "content": "Read the following text and improve the writing for readability and clarity while maintaining the author's original voice."
          },
          {
            "role": "user",
            "content": text
          }
        ],
        stream: stream_proc
      }
    )
  end

  def stream_proc
    proc do |chunk, _bytesize|
      new_text = chunk.dig("choices", 0, "delta", "content")
      if new_text
        @rewrite += new_text
        ActionCable.server.broadcast "revision:#{@revisable.id}", {text: new_text}
      else
        @revisable.update(text: @rewrite)
      end
    end
  end
end

@swkeever

Thanks @alexrudall for your work on this library and example.

I created an alternate approach that uses server-sent events and Stimulus.js. Here's an example repo and a 5 minute YouTube tutorial if you're interested.

@alexrudall
Author

Thanks all for the comments and code! I like these solutions a lot, @chrismoy, @mindtonic, @swkeever. I've made a note to test and update this gist with one of these when I have a bit more time.

@alexrudall
Author

@chrismoy @mindtonic @swkeever @amolk @roberthopman What about using StringIO for this?

  BUFFER_SIZE = 20 # String characters.

  def stream_proc(message:)
    buffer = StringIO.new

    proc do |chunk, _bytesize|
      flush = buffer.size >= BUFFER_SIZE
      stop = chunk.dig('choices', 0, 'finish_reason')
      content = chunk.dig('choices', 0, 'delta', 'content')

      buffer.write(content)
      if flush || stop
        current_content = message.content || ""
        updated_content = current_content + buffer.string

        message.update(content: updated_content)
        buffer.reopen # Clear the buffer
      end
    end
  end

@DaAwesomeP

Has anyone tried this example and had issues with Turbo working properly? First I had to adjust the form to be:

form_with(model: Message.new, url: my_resource_path(format: :turbo_stream)) do |form|

to avoid an ActionController::UnknownFormat error (Turbo was not requesting a Turbo frame from Rails).

Now I have an issue where submitting the form causes Turbo to redirect to the turbo frame response as plain text because it is missing the proper text/vnd.turbo-stream.html MIME in the Accept header in the request.

@MiltonRen

Thanks @mindtonic! I find this pattern fairly elegant. Translating this to Turbo below:

def stream_proc(messages)
  proc do |chunk, _bytesize|
    new_content = chunk.dig("choices", 0, "delta", "content")
    finish_reason = chunk.dig("choices", 0, "finish_reason")

    message = messages.find { |m| m.response_number == chunk.dig("choices", 0, "index") }

    if new_content.present?
      # update the message in memory then broadcast
      message.content += new_content
      message.broadcast_updated
    end

    if finish_reason.present?
      message.save!
    end
  end
end

@nickkirt

I have some problems with the order in which the messages are shown. The messages lose their order, so the question ends up AFTER the response, instead of BEFORE.

Does anyone else have that problem? Any suggestions? I've spent hours on this.

One thing that 'slightly' helps is adjusting the broadcast calls: changing broadcast_append_to to broadcast_append_later_to (i.e. adding the "later").

@ShawnAukstak

@nickkirt I have seen the order get messed up a couple of times - maybe 2-3 times out of 100. I haven't looked into it much yet or tried to reproduce it.

I'm assuming it's some race condition where one of the later jobs is being processed faster.

@nickkirt

It's not that - sorry I wasn't very clear.

It always happens - per message (request from the user) / response. So instead of the response being after the message, it's the other way round. Initially, the order is correct, but when the streaming stops, the response ends up before the message (request).

I'm sure it's a mistake that I've introduced, but do you have an idea of what is causing the switch, which occurs upon completion of the streaming message?

@ShawnAukstak

It always happens - per message (request from the user) / response. So instead of the response being after the message, it's the other way round. Initially, the order is correct, but when the streaming stops, the response ends up before the message (request).

Hey, one bright side of it happening every time is that it makes it easier to debug and narrow down :) My solution is different from the exact code in the gist, but it's the same idea, and I haven't had this issue.

Is the order still messed up after you do a hard refresh of the Chat page?

@nickkirt

Once it's refreshed it's in the right order.

It's working OK-ish, but it jitters upon completion of the stream - that's the best I can get it working, and I can't pinpoint what is happening upon completion.

If you want, try it at: aymi.ai (it's a work in progress).

Just chat with a character (requires sign-up, but it's really quick).

No worries if you don't have time.

@DaAwesomeP

I believe I solved the order issue by adding a chat message order number field and using CSS to reorder them. Note that I had bad luck using the message ID because there seems to be a maximum order value that CSS will honor.

Something like:

<element style="order: <%= message.response_number %>">

Scrolling is a whole other thing: I just had it scroll to the message every time it received an update. If another message had been sent after the message that was updating, then I did not scroll (would be annoying to jump up or back and forth if two are updating). A scroll "lock" behavior that only scrolls when you are at the bottom would be preferred but not as trivial to implement.

@ShawnAukstak

Thanks, @DaAwesomeP! This works great for me, and I haven't seen the out-of-order message since adding it. I changed the container to a flex column and added the inline style to each message.

I'm querying the order number, but I figure I could persist it to save a query, too. I have an index on created_at in my messages table in my implementation.

In models/chatbot_message.rb

  def message_order
    @message_order ||= conversation.messages.where("created_at <= ?", created_at).count
  end

In my chatbot_messages/_chatbot_message.html.erb partial:

  <div id="<%= dom_id(chatbot_message) %>" style="order: <%= chatbot_message.message_order %>;">

@nickkirt

nickkirt commented Feb 5, 2024

Wow, I fixed it!!!

In the Messages controller, the gist has the following line:
GetAiResponse.perform_async(@message.chat_id)

In my app I had been using this instead:
GetAiResponseJob.perform_now(@message.id)

I replaced it with:
GetAiResponseJob.perform_later(@message.id)

Now it works fine!

I'm surprised that's the problem. But I'm happy that it's working.

@OseasSon

Did anyone manage to create a run and stream the response? I'm trying to implement streaming for assistants. I would appreciate any insights 🙏

@joshuachestang

Did anyone manage to create a run and stream the response? I'm trying to implement streaming for assistants. I would appreciate any insights 🙏

+1

@ShawnAukstak

Did anyone manage to create a run and stream the response? I'm trying to implement streaming for assistants. I would appreciate any insights 🙏

I used the basic structure mentioned here with a number of changes to create one, and it seems to be working well so far at limited scale. If you have any specific questions I'll try to follow up and help!

Some other resources that might be helpful:
Drifting Ruby - streaming llm response
How To Integrate Chatgpt With Rails 7: Step-by-step Tutorial
