Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to keep open streaming connections with Puma #1858

Merged
merged 2 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions examples/chat.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env ruby -I ../lib -I lib
# frozen_string_literal: true

require_relative 'rainbows'
# This example does *not* work properly with WEBrick or other
# servers that buffer output. To shut down the server, close any
# open browser tabs that are connected to the chat server.

require 'sinatra'
set :server, :rainbows
connections = []
set :server, :puma
connections = Set.new

get '/' do
halt erb(:login) unless params[:user]
Expand All @@ -14,13 +16,22 @@

get '/stream', provides: 'text/event-stream' do
stream :keep_open do |out|
connections << out
out.callback { connections.delete(out) }
if connections.add?(out)
out.callback { connections.delete(out) }
end
out << "heartbeat:\n"
sleep 1
rescue
out.close
end
end

post '/' do
connections.each { |out| out << "data: #{params[:msg]}\n\n" }
connections.each do |out|
out << "data: #{params[:msg]}\n\n"
rescue
out.close
end
204 # response without entity body
end

Expand All @@ -37,10 +48,10 @@
</html>

@@ login
<form action='/'>
<form action="/">
<label for='user'>User Name:</label>
<input name='user' value='' />
<input type='submit' value="GO!" />
<input name="user" value="" />
<input type="submit" value="GO!" />
</form>

@@ chat
Expand Down
14 changes: 12 additions & 2 deletions lib/sinatra/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,9 @@ def each(&front)
@back.call(self)
rescue Exception => e
@scheduler.schedule { raise e }
ensure
close unless @keep_open
end
close unless @keep_open
end
end

Expand Down Expand Up @@ -506,7 +507,16 @@ def closed?
def stream(keep_open = false)
scheduler = env['async.callback'] ? EventMachine : Stream
current = @params.dup
body Stream.new(scheduler, keep_open) { |out| with_params(current) { yield(out) } }
stream = if scheduler == Stream && keep_open
Stream.new(scheduler, false) do |out|
until out.closed?
with_params(current) { yield(out) }
end
end
else
Stream.new(scheduler, keep_open) { |out| with_params(current) { yield(out) } }
end
body stream
end

# Specify response freshness policy for HTTP caches (Cache-Control header).
Expand Down
4 changes: 2 additions & 2 deletions test/integration/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

set :out, nil
get '/async' do
stream(:keep_open) { |o| (settings.out = o) << "hi!" }
stream(:keep_open) { |o| (settings.out = o) << "hi!"; sleep 1 }
end

get '/send' do
Expand Down Expand Up @@ -66,7 +66,7 @@
class Subclass < Sinatra::Base
set :out, nil
get '/subclass/async' do
stream(:keep_open) { |o| (settings.out = o) << "hi!" }
stream(:keep_open) { |o| (settings.out = o) << "hi!"; sleep 1 }
end

get '/subclass/send' do
Expand Down
2 changes: 1 addition & 1 deletion test/integration_async_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module IntegrationAsyncHelper
def it(message, &block)
base_port = 5100 + Process.pid % 100

%w(rainbows).each_with_index do |server_name, index|
%w(rainbows puma).each_with_index do |server_name, index|
server = IntegrationHelper::BaseServer.new(server_name, base_port + index)
next unless server.installed?

Expand Down
9 changes: 8 additions & 1 deletion test/integration_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,14 @@ def run

def installed?
return @installed unless @installed.nil?
s = server == 'HTTP' ? 'net/http/server' : server
s = case server
when 'HTTP'
'net/http/server'
when 'puma'
'puma/rack/handler'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests output show puma is not installed, skipping integration tests so I think this require is not working as expected.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good catch 😬 Explains why things went so smooth 😆

else
server
end
require s
@installed = true
rescue LoadError
Expand Down
1 change: 1 addition & 0 deletions test/streaming_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def close.errback; end
env['async.close'] = close
stream(:keep_open) do |out|
out.callback { ran = true }
out.close
end
end
end
Expand Down