Skip to content

Commit

Permalink
Add support to keep open streaming connections with Puma
Browse files Browse the repository at this point in the history
  • Loading branch information
jkowens committed Jan 2, 2023
1 parent 9b6ba81 commit cf969dc
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
29 changes: 20 additions & 9 deletions examples/chat.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env ruby -I ../lib -I lib
# frozen_string_literal: true

require_relative 'rainbows'
# This example does *not* work properly with WEBrick or other
# servers that buffer output. To shut down the server, close any
# open browser tabs that are connected to the chat server.

require 'sinatra'
set :server, :rainbows
connections = []
set :server, :puma
connections = Set.new

get '/' do
halt erb(:login) unless params[:user]
Expand All @@ -14,13 +16,22 @@

get '/stream', provides: 'text/event-stream' do
stream :keep_open do |out|
connections << out
out.callback { connections.delete(out) }
if connections.add?(out)
out.callback { connections.delete(out) }
end
out << "heartbeat:\n"
sleep 1
rescue
out.close
end
end

post '/' do
connections.each { |out| out << "data: #{params[:msg]}\n\n" }
connections.each do |out|
out << "data: #{params[:msg]}\n\n"
rescue
out.close
end
204 # response without entity body
end

Expand All @@ -37,10 +48,10 @@
</html>

@@ login
<form action='/'>
<form action="/">
<label for='user'>User Name:</label>
<input name='user' value='' />
<input type='submit' value="GO!" />
<input name="user" value="" />
<input type="submit" value="GO!" />
</form>

@@ chat
Expand Down
14 changes: 12 additions & 2 deletions lib/sinatra/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,9 @@ def each(&front)
@back.call(self)
rescue Exception => e
@scheduler.schedule { raise e }
ensure
close unless @keep_open
end
close unless @keep_open
end
end

Expand Down Expand Up @@ -506,7 +507,16 @@ def closed?
def stream(keep_open = false)
scheduler = env['async.callback'] ? EventMachine : Stream
current = @params.dup
body Stream.new(scheduler, keep_open) { |out| with_params(current) { yield(out) } }
stream = if scheduler == Stream && keep_open
Stream.new(scheduler, false) do |out|
until out.closed?
with_params(current) { yield(out) }
end
end
else
Stream.new(scheduler, keep_open) { |out| with_params(current) { yield(out) } }
end
body stream
end

# Specify response freshness policy for HTTP caches (Cache-Control header).
Expand Down
1 change: 1 addition & 0 deletions test/streaming_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def close.errback; end
env['async.close'] = close
stream(:keep_open) do |out|
out.callback { ran = true }
out.close
end
end
end
Expand Down

0 comments on commit cf969dc

Please sign in to comment.