Skip to content

Instantly share code, notes, and snippets.

@rmosolgo
Last active March 27, 2023 08:38
Show Gist options
  • Star 23 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save rmosolgo/ba31acf93f07f8007d99ba365a662d8f to your computer and use it in GitHub Desktop.
Save rmosolgo/ba31acf93f07f8007d99ba365a662d8f to your computer and use it in GitHub Desktop.
GraphQL Ruby Subscriptions

Example end-to-end web app with subscriptions, demonstrating APIs.

See rmosolgo/graphql-ruby#613.

To run:

bundle install
bundle exec rackup
open http://localhost:9292

subscription-demo

require "securerandom"

require "graphql"
require "sinatra"
require "thin"
# Here's the application logic:
# - A set of stateful counters, identified by `#id`. They can be incremented.
# - An in-memory subscription database
# - A transport for sending payloads over open sockets
# - A queuing system for isolating subscription execution & delivery
module App
# A named integer tally. Instances are kept in a process-wide registry
# (see `.find`), so the same id always yields the same object.
Counter = Struct.new(:id, :value) do
  # Add one to the tally.
  # @return [Integer] the new value
  def increment
    self.value = value + 1
  end

  # Look up the counter registered under `id`; the first lookup of an id
  # creates a counter starting at 0 and remembers it.
  # @return [Counter]
  def self.find(id)
    @counters ||= Hash.new do |registry, missing_id|
      registry[missing_id] = new(missing_id, 0)
    end
    @counters[id]
  end
end
# In-memory subscription backend.
#
# Keeps three hashes: which queries listen to which event key, which query
# belongs to which channel, and which open stream (if any) belongs to which
# channel. Nothing is persisted; everything is lost when the process exits.
class Subscriptions < GraphQL::Subscriptions::Implementation
  def initialize(schema:)
    super
    # Here's the "database":
    #
    # For each event key, who is subscribed to it?
    # @return Hash<String => Array<GraphQL::Query>>
    @subscriptions = Hash.new { |h, event_id| h[event_id] = [] }
    #
    # Given a Channel, who does it belong to?
    # @return Hash<String => GraphQL::Query>
    @queries = {}
    #
    # Given a Channel, return the IO to write to
    # @return Hash<String => IO>
    @streams = {}
  end

  # Part of the subscription API: put these subscriptions in the database.
  # The channel id is read from `query.context[:channel]`; the query is
  # registered under every event's `key`.
  def subscribed(query, events)
    puts "Registering #{query.context[:channel]}"
    @queries[query.context[:channel]] = query
    events.each do |ev|
      @subscriptions[ev.key] << query
    end
  end

  # Part of the subscription API: load the query data for this channel.
  # Returns a hash with the stored query string, variables and operation name.
  # The original context is not restored; an empty hash is returned instead.
  def get_subscription(channel)
    query = @queries[channel]
    {
      query_string: query.query_string,
      variables: query.provided_variables,
      context: {},
      operation_name: query.operation_name,
    }
  end

  # Part of the subscription API: fetch subscriptions from the DB and yield
  # their channel ids one-by-one.
  def each_channel(event_key)
    # `fetch` bypasses the default proc, so triggering an event nobody has
    # subscribed to does not leave an empty array behind in the hash.
    #
    # `dup` takes a snapshot: #delete runs `reject!` on these same arrays and
    # can be invoked while this loop is still running (from the stream-close
    # callback, or from #deliver inside a thread started by #enqueue --
    # assuming the inherited `execute` ends up calling #deliver). Mutating an
    # array during `each` can make it skip elements.
    @subscriptions.fetch(event_key, []).dup.each do |query|
      yield(query.context[:channel])
    end
  end

  # Not used by GraphQL, but the Application needs some way to unsubscribe
  # `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)`
  #
  # Forgets the channel's query, removes it from every event key and drops
  # its stream.
  def delete(channel)
    @queries.delete(channel)
    @subscriptions.each do |event_key, queries|
      queries.reject! { |q| q.context[:channel] == channel }
    end
    close(channel)
  end

  # Optional subscription API -- could use ActiveJob etc here.
  # Runs `execute` (inherited) for one channel on a new thread.
  def enqueue(channel, event_key, object)
    Thread.new {
      execute(channel, event_key, object)
    }
  end

  # Part of the subscription API: send `result` over `channel`.
  # Writes one server-sent event named "update" whose data is the JSON dump
  # of `result`. If no stream is registered for the channel, the channel is
  # unsubscribed instead. `ctx` is accepted but unused here.
  def deliver(channel, result, ctx)
    puts "Delivering to #{channel}: #{result}"
    stream = @streams[channel]
    # The client _may_ have opened this channel:
    if stream
      stream << "event: update\n"
      stream << "data: #{JSON.dump(result)}\n\n"
    else
      # Stream was closed or never opened
      delete(channel)
    end
  end

  # Used by the transport layer: remember the stream to write to for `channel`.
  def open(channel, stream)
    @streams[channel] = stream
  end

  # Not used by GraphQL, but needed by the App to unsubscribe.
  # Only forgets the stream; it does not call anything on the stream itself.
  def close(channel)
    @streams.delete(channel)
  end
end
end
# The GraphQL API of this application: the schema text, the resolve
# functions for its fields, and the schema object built from both.
module API
  # Type system, written in the GraphQL schema language:
  Definition = <<-GRAPHQL
type Subscription {
counterIncremented(id: ID!): Counter
}
type Counter {
id: ID!
value: Int!
}
type Query {
counter(id: ID!): Counter
}
type Mutation {
incrementCounter(id: ID!): Counter
}
  GRAPHQL

  # Resolve functions, looked up as Resolvers[type name][field name].
  # Each one is called with (object, arguments, context).
  Resolvers = {
    "Mutation" => {
      # Increment the counter, notify subscribers, and return the counter.
      "incrementCounter" => ->(_obj, args, _ctx) {
        App::Counter.find(args["id"]).tap do |counter|
          counter.increment
          API::Schema.subscriber.trigger("counterIncremented", args, counter)
        end
      }
    },
    "Query" => {
      "counter" => ->(_obj, args, _ctx) { App::Counter.find(args["id"]) }
    },
    "Counter" => {
      "value" => ->(counter, _args, _ctx) { counter.value },
      "id" => ->(counter, _args, _ctx) { counter.id },
    }
  }

  # Build the schema from the definition, then redefine it so that
  # subscriptions are handled by App::Subscriptions.
  base_schema = GraphQL::Schema.from_definition(Definition, default_resolve: Resolvers)
  Schema = base_schema.redefine do
    use GraphQL::Subscriptions, implementation: App::Subscriptions
  end
end
# Dashboard page: renders the inline `index` template found after __END__.
get "/" do
  erb(:index)
end
# Send queries here. Every request is given a fresh Channel ID, passed to the
# query as `context[:channel]` and returned in the `x-channel-id` response
# header. The client _may_ open that channel to receive subscription updates.
post "/graphql" do
  content_type "application/json"
  # The channel id is the key under which the subscription store keeps the
  # query and its stream, so two requests must never share one. A random
  # 64-bit hex token replaces `rand(10000)`, which collides easily and would
  # overwrite another client's entry.
  channel = "socket-#{SecureRandom.hex(8)}"
  res = API::Schema.execute(params[:query], variables: params[:variables], context: { channel: channel })
  [200, {"x-channel-id" => channel}, JSON.dump(res)]
end
# Server-sent-events endpoint: a client opens the channel it was given by
# POST /graphql and keeps the response open to receive updates.
get "/channels/:channel" do
  content_type "text/event-stream"
  stream(:keep_open) do |sse|
    channel_id = params[:channel]
    puts "Stream for #{channel_id}"
    # Register the open stream so deliveries for this channel are written to it.
    API::Schema.subscriber.implementation.open(channel_id, sse)
    # Registered via the stream's `callback`: drop the subscription and its
    # stream entry from the subscription store.
    sse.callback do
      puts "Unsubscribing #{channel_id}"
      API::Schema.subscriber.implementation.delete(channel_id)
    end
  end
end
__END__
@@ index
<html>
<head>
<title>GraphQL Subscriptions Example</title>
<script src="https://code.jquery.com/jquery-3.2.1.js" integrity="sha256-DZAnKJ/6XZ9si04Hgrsxu/8s717jcIzLy3oi35EouyE=" crossorigin="anonymous"></script>
<style>
.dashboard {
display: flex;
}
table {
margin: 20px;
}
</style>
</head>
<body>
<h1>Subscriptions</h1>
<div class="dashboard">
<table>
<thead>
<tr>
<th colspan="4">Counters</th>
</tr>
<tr>
<th>Id</th>
<th>Value</th>
<th colspan="2">Actions</th>
</tr>
</thead>
<tbody id="counters">
</tbody>
</table>
<table>
<thead>
<tr>
<th colspan="3">Updates</th>
</tr>
<tr>
<th>Channel</th>
<th>Counter</th>
<th>Value</th>
</tr>
</thead>
<tbody id="updates">
</tbody>
</table>
<table>
<thead>
<tr>
<th colspan="2">Channels</th>
</tr>
<tr>
<th>Id</th>
<th>Actions</th>
</tr>
</thead>
<tbody id="channels">
</tbody>
</table>
</div>
<script>
// Open EventSource objects keyed by channel id, so closeChannel can find them.
var eventSources = {}
// GraphQL documents posted to /graphql; both take the counter id as $id.
var subscriptionString = "subscription WatchCounter($id: ID!) { counterIncremented(id: $id) { id value } }"
var mutationString = "mutation IncrementCounter($id: ID!) { incrementCounter(id: $id) { value } }"
// Run the increment mutation for one counter and display the value the
// server sends back in that counter's table cell.
function incrementCounter(id) {
  var payload = { query: mutationString, variables: { id: id } }
  $.post("/graphql", payload, function(response) {
    var updated = response.data.incrementCounter
    $("#counter-value-" + id).text(updated.value)
  })
}
// Post the subscription query for one counter, then open the channel whose
// id the server returns in the "x-channel-id" response header.
function subscribeToCounter(id) {
  var payload = {
    query: subscriptionString,
    variables: { id: id },
  }
  $.post("/graphql", payload, function(data, status, jqXHR) {
    openChannel(jqXHR.getResponseHeader("x-channel-id"))
  })
}
// Start listening on a channel: every "update" event refreshes the counter's
// value cell and adds a row to the Updates table. The channel itself is
// listed in the Channels table.
function openChannel(channelId) {
  function cell(content) {
    return "<td>" + content + "</td>"
  }
  var source = new EventSource("/channels/" + channelId)
  eventSources[channelId] = source
  source.addEventListener("update", function(e) {
    console.log("message", e)
    var payload = JSON.parse(e.data)
    var counter = payload.data.counterIncremented
    $("#counter-value-" + counter.id).text(counter.value)
    $("#updates").append("<tr>" + cell(channelId) + cell(counter.id) + cell(counter.value) + "</tr>")
  })
  var channelRow = channelRowTemplate.replace(/%channelid%/g, channelId)
  $("#channels").append(channelRow)
}
// Stop listening on a channel: close its EventSource, forget it, and remove
// its row from the Channels table.
function closeChannel(channelId) {
  var source = eventSources[channelId]
  source.close()
  delete eventSources[channelId]
  var row = $("#channel-" + channelId)
  row.remove()
}
// Row templates; the %counterid% / %channelid% placeholders are filled in
// with String#replace before the row is appended.
// Counter ids are numbers, so they can be spliced into the onclick call bare.
var counterRowTemplate = "<tr><td>%counterid%</td><td id='counter-value-%counterid%'>...</td><td><button onclick='incrementCounter(%counterid%)'>+ increment</button></td><td><button onclick='subscribeToCounter(%counterid%)'>subscribe</button></td></tr>"
// Channel ids are strings such as "socket-1234", so the id must be quoted in
// the onclick call. Unquoted, it is evaluated as the expression
// `socket - 1234` and the unsubscribe button throws a ReferenceError.
var channelRowTemplate = "<tr id='channel-%channelid%'><td>%channelid%</td><td><button onclick='closeChannel(\"%channelid%\")'>unsubscribe</button></td></tr>"
// Page start-up: draw one row per counter, then load the current values of
// all three counters with a single query (aliased c1, c2, c3).
var counterIds = [1, 2, 3]
for (var i = 0; i < counterIds.length; i++) {
  $("#counters").append(counterRowTemplate.replace(/%counterid%/g, counterIds[i]))
}
var initialLoadQuery = "{ c1: counter(id: 1) { value } c2: counter(id: 2) { value } c3: counter(id: 3) { value } }"
$.post("/graphql", {query: initialLoadQuery}, function(data) {
  var counters = data.data
  $("#counter-value-1").text(counters.c1.value)
  $("#counter-value-2").text(counters.c2.value)
  $("#counter-value-3").text(counters.c3.value)
})
</script>
</body>
</html>
# Rack entry point: load ./app (path is relative to the working directory),
# then hand Sinatra's application object to the server.
require "./app"
run Sinatra::Application
# Gems come from rubygems.org unless a gem names its own source.
source 'https://rubygems.org'
# graphql is taken from the `subscriptions` branch of rmosolgo/graphql-ruby
# instead of a released version.
# NOTE(review): the lockfile records this source with a git:// remote; confirm
# that remote and branch are still reachable before relying on `bundle install`.
gem "graphql", github: "rmosolgo/graphql-ruby", branch: "subscriptions"
# Web framework providing the routes and the inline template.
gem "sinatra"
# Web server gem, required by the app alongside sinatra.
gem "thin"
GIT
remote: git://github.com/rmosolgo/graphql-ruby.git
revision: 2c011f6fbc50132706d14a52027f4eea9df2e103
branch: subscriptions
specs:
graphql (1.6.6)
GEM
remote: https://rubygems.org/
specs:
daemons (1.2.4)
eventmachine (1.2.5)
mustermann (1.0.0)
rack (2.0.3)
rack-protection (2.0.0)
rack
sinatra (2.0.0)
mustermann (~> 1.0)
rack (~> 2.0)
rack-protection (= 2.0.0)
tilt (~> 2.0)
thin (1.7.2)
daemons (~> 1.0, >= 1.0.9)
eventmachine (~> 1.0, >= 1.0.4)
rack (>= 1, < 3)
tilt (2.0.8)
PLATFORMS
ruby
DEPENDENCIES
graphql!
sinatra
thin
BUNDLED WITH
1.15.3
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment