Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to keep open streaming connections with Puma #1858

Merged
merged 2 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add support to keep open streaming connections with Puma
  • Loading branch information
jkowens authored and dentarg committed Feb 10, 2023
commit e8da82a4ac8cc0a68d4b495b01f15a3e741146a7
29 changes: 20 additions & 9 deletions examples/chat.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env ruby -I ../lib -I lib
# frozen_string_literal: true

require_relative 'rainbows'
# This example does *not* work properly with WEBrick or other
# servers that buffer output. To shut down the server, close any
# open browser tabs that are connected to the chat server.

require 'sinatra'
set :server, :rainbows
connections = []
set :server, :puma
connections = Set.new

get '/' do
halt erb(:login) unless params[:user]
Expand All @@ -14,13 +16,22 @@

get '/stream', provides: 'text/event-stream' do
stream :keep_open do |out|
connections << out
out.callback { connections.delete(out) }
if connections.add?(out)
out.callback { connections.delete(out) }
end
out << "heartbeat:\n"
sleep 1
rescue
out.close
end
end

post '/' do
connections.each { |out| out << "data: #{params[:msg]}\n\n" }
connections.each do |out|
out << "data: #{params[:msg]}\n\n"
rescue
out.close
end
204 # response without entity body
end

Expand All @@ -37,10 +48,10 @@
</html>

@@ login
<form action='/'>
<form action="/">
<label for='user'>User Name:</label>
<input name='user' value='' />
<input type='submit' value="GO!" />
<input name="user" value="" />
<input type="submit" value="GO!" />
</form>

@@ chat
Expand Down
14 changes: 12 additions & 2 deletions lib/sinatra/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,9 @@ def each(&front)
@back.call(self)
rescue Exception => e
@scheduler.schedule { raise e }
ensure
close unless @keep_open
end
close unless @keep_open
end
end

Expand Down Expand Up @@ -506,7 +507,16 @@ def closed?
def stream(keep_open = false)
scheduler = env['async.callback'] ? EventMachine : Stream
current = @params.dup
body Stream.new(scheduler, keep_open) { |out| with_params(current) { yield(out) } }
stream = if scheduler == Stream && keep_open
Stream.new(scheduler, false) do |out|
until out.closed?
with_params(current) { yield(out) }
end
end
else
Stream.new(scheduler, keep_open) { |out| with_params(current) { yield(out) } }
end
body stream
end

# Specify response freshness policy for HTTP caches (Cache-Control header).
Expand Down
1 change: 1 addition & 0 deletions test/streaming_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def close.errback; end
env['async.close'] = close
stream(:keep_open) do |out|
out.callback { ran = true }
out.close
end
end
end
Expand Down