Skip to content

Instantly share code, notes, and snippets.

@palkan
Last active April 16, 2024 04:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save palkan/270a192e79b05d5601fe497ad3ec83b5 to your computer and use it in GitHub Desktop.
Save palkan/270a192e79b05d5601fe497ad3ec83b5 to your computer and use it in GitHub Desktop.
ActionCable over SSE

Action Cable over SSE

This is a demo of leveraging the proposed Action Cable architecture to implement an SSE transport for Action Cable (without changing the user-space code, e.g., Connection and Channel classes).

Start the server by running the following command:

 ruby main.rb

Now, you can connect to Action Cable over SSE via cURL as follows:

 curl -N "http://localhost:3000/cable/events?user_id=123&identifier=%7B%22channel%22%3A%22ChatChannel%22%2C%22id%22%3A42%7D"

You can also connect to the same server via WebSocket, e.g., using ACLI:

 acli -u "ws://localhost:3000/cable?user_id=431" -c ChatChannel --channel-params id:42

Then, you can publish a message via cURL:

 curl -X POST -H Content-Type:application/json -d '{"message":"hello!"}' http://localhost:3000/rooms/42/messages

You should see it delivered to both clients!

require "bundler/inline"
# This is a demo of leveraging proposed Action Cable architecture to implement
# an SSE transport for Action Cable (without changing the user-space code, e.g., Connection and Channel classes).
#
# Start the server by running the following command:
#
# ruby main.rb
#
# Now, you can connect to Action Cable over SSE via cURL as follows:
#
# curl -N "http://localhost:3000/cable/events?user_id=123&identifier=%7B%22channel%22%3A%22ChatChannel%22%2C%22id%22%3A42%7D"
#
# You can also connect to the same server via WebSocket, e.g., using ACLI:
#
# acli -u "ws://localhost:3000/cable?user_id=431" -c ChatChannel --channel-params id:42
#
# Then, you can publish a message via cURL:
#
# curl -X POST -H Content-Type:application/json -d '{"message":"hello!"}' http://localhost:3000/rooms/42/messages
#
# You should see it delivered to both clients!
#
gemfile(true) do
gem "rails", github: "palkan/rails", branch: "refactor/action-cable-server-adapterization"
gem "puma"
end
require "action_controller/railtie"
require "action_cable/engine"
# config/application.rb
class App < Rails::Application
config.root = __dir__
config.eager_load = false
config.load_defaults 7.1
config.secret_key_base = "i_am_a_secret"
config.hosts = []
config.action_cable.disable_request_forgery_protection = true
config.logger = ActiveSupport::Logger.new($stdout)
routes.append do
resources :rooms, only: [] do
resources :messages, only: [:create]
end
namespace :cable, module: "action_cable/sse" do
get "events", to: "events#index"
end
end
end
ActiveSupport::Inflector.inflections(:en) do |inflect|
inflect.acronym "SSE"
end
# Configure Action Cable
ActionCable.server.config.cable = {
"adapter" => "async"
}
module ActionCable
module SSE
class Connection
attr_reader :logger, :request, :protocol, :server
private attr_reader :sse, :coder, :connection, :buffer, :connected
alias_method :connected?, :connected
delegate :env, to: :request
delegate :worker_pool, to: :server
def initialize(server, request, sse, coder: ActiveSupport::JSON, logger: Rails.logger)
@protocol = "actioncable-v1-json-sse"
@server = server
@logger = logger
@request = request
@coder = coder
@sse = sse
@connection = server.config.connection_class.call.new(server, self)
@buffer = Queue.new
@connected = true
end
def process
loop do
# Timeout must be greater then the heartbeat interval
data = buffer.pop(timeout: 5)
if data
perform_transmit(data)
else
raise "No heartbeat received from client for 5 seconds, closing connection."
end
end
end
def transmit(data)
return unless connected?
buffer << data
end
def perform_transmit(data)
sse.write(coder.encode(data))
end
def close
@connected = false
sse.close
end
def receive(message) # :nodoc:
payload = coder.decode(message)
connection.handle_incoming(payload)
end
def handle_open
connection.handle_open
server.add_connection(connection)
end
def handle_close
server.remove_connection(connection)
connection.handle_close
end
def perform_work(receiver, method_name, *)
worker_pool.async_invoke(receiver, method_name, *, connection: self)
end
end
class EventsController < ActionController::Base
include ActionController::Live
def index
response.headers["Content-Type"] = "text/event-stream"
server = ActionCable.server
sse = ActionController::Live::SSE.new(response.stream)
connection = Connection.new(
server,
request,
sse,
logger: server.new_tagged_logger(request)
)
connection.handle_open
server.setup_heartbeat_timer
if params[:identifier]
subscription_request = { command: "subscribe", identifier: params[:identifier] }.to_json
# TODO: We need to handle the case where the subscription is rejected
connection.receive(subscription_request)
end
connection.process
rescue ClientDisconnected
connection.logger.info "Client disconnected"
ensure
connection&.handle_close
sse.close
end
end
end
end
module ApplicationCable
class Connection < ActionCable::Connection::Base
identified_by :user_id
def connect
return reject_unauthorized_connection unless request.params[:user_id].present?
self.user_id = request.params[:user_id]
logger.debug "User connected via #{socket.protocol}: #{user_id}"
end
end
class Channel < ActionCable::Channel::Base
end
end
class ChatChannel < ApplicationCable::Channel
def subscribed
stream_from room_stream
end
def speak(data)
ActionCable.server.broadcast room_stream, {text: data["text"]}
end
private
def room_stream = "chat/#{params[:id]}"
end
class MessagesController < ActionController::Base
protect_from_forgery with: :null_session
def create
text = params.require(:message)
room_id = params[:room_id]
ActionCable.server.broadcast "chat/#{room_id}", {text:}
end
end
Rails.application.initialize!
require "rack/handler/puma"
Rack::Handler::Puma.run(Rails.application, Port: 3000, force_shutdown_after: 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment