Skip to content

Commit

Permalink
CI: refactor restart_does_not_drop_connections code (#3557)
Browse files Browse the repository at this point in the history
* CI: integration.rb - rename hot_restart_does_not_drop_connections to restart_does_not_drop_connections

Update to code to work with both hot and phased restarts

* CI: test_integration_cluster.rb - remove usr1_all_respond code

change to use `restart_does_not_drop_connections`

* CI: test_integration_single.rb - use restart_does_not_drop_connections

* CI: integration.rb - adjust restart timing, simplify asserts
  • Loading branch information
MSP-Greg authored Nov 29, 2024
1 parent 7b6bf02 commit 2c9aafe
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 104 deletions.
80 changes: 51 additions & 29 deletions test/helpers/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -405,19 +405,27 @@ def get_stats
JSON.parse read_pipe.read.split("\n", 2).last
end

def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)
def restart_does_not_drop_connections(
num_threads: 1,
total_requests: 500,
config: nil,
unix: nil,
signal: nil,
log: nil
)
skipped = true
skip_if :jruby, suffix: <<-MSG
- file descriptors are not preserved on exec on JRuby; connection reset errors are expected during restarts
MSG
skip_if :truffleruby, suffix: ' - Undiagnosed failures on TruffleRuby'

clustered = (workers || 0) >= 2

args = "-w #{workers} -t 5:5 -q test/rackup/hello_with_delay.ru"
if Puma.windows?
@control_tcp_port = UniquePort.call
cli_server "--control-url tcp://#{HOST}:#{@control_tcp_port} --control-token #{TOKEN} #{args}"
cli_server "#{set_pumactl_args} #{args}", unix: unix, config: config, log: log
else
cli_server args
cli_server args, unix: unix, config: config, log: log
end

skipped = false
Expand All @@ -436,7 +444,7 @@ def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)
num_requests.times do |req_num|
begin
begin
socket = TCPSocket.new HOST, @tcp_port
socket = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port)
fast_write socket, "POST / HTTP/1.1\r\nContent-Length: #{message.bytesize}\r\n\r\n#{message}"
rescue => e
replies[:write_error] += 1
Expand Down Expand Up @@ -482,21 +490,44 @@ def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)
if Puma.windows?
cli_pumactl 'restart'
else
Process.kill :USR2, @pid
Process.kill signal, @pid
end
sleep 0.5
# If 'wait_for_server_to_boot' times out, error in thread shuts down CI
begin
wait_for_server_to_boot timeout: 5
rescue Minitest::Assertion # Timeout
run = false
if signal == :USR2
# If 'wait_for_server_to_boot' times out, error in thread shuts down CI
begin
wait_for_server_to_boot timeout: 5
rescue Minitest::Assertion # Timeout
run = false
end
end
restart_count += 1
sleep(Puma.windows? ? 2.0 : 0.5)

if Puma.windows?
sleep 2.0
elsif clustered
phase = signal == :USR2 ? 0 : restart_count
# If 'get_worker_pids phase' times out, error in thread shuts down CI
begin
get_worker_pids phase, log: log
# added sleep as locally 165 restarts in 7 seconds
sleep 0.15
rescue Minitest::Assertion # Timeout
run = false
end
else
sleep 0.10
end
end
end

client_threads.each(&:join)
# cycle thru threads rather than one at a time
until client_threads.empty?
client_threads.each_with_index do |t, i|
client_threads[i] = nil if t.join(1)
end
client_threads.compact!
end

run = false
restart_thread.join
if Puma.windows?
Expand All @@ -511,30 +542,21 @@ def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)
msg << " %4d refused\n" % replies.fetch(:refused,0)
msg << " %4d read timeout\n" % replies.fetch(:read_timeout,0)
msg << " %4d reset\n" % replies.fetch(:reset,0)
msg << " %4d write_errors\n" % replies.fetch(:write_error,0)
msg << " %4d success\n" % replies.fetch(:success,0)
msg << " %4d success after restart\n" % replies.fetch(:restart,0)
msg << " %4d restart count\n" % restart_count

actual_requests = num_threads * num_requests
allowed_errors = (actual_requests * 0.002).round

refused = replies[:refused]
reset = replies[:reset]

if Puma.windows?
# 5 is default thread count in Puma?
reset_max = num_threads * restart_count
assert_operator reset_max, :>=, reset, "#{msg}Expected reset_max >= reset errors"
assert_operator 40, :>=, refused, "#{msg}Too many refused connections"
else
assert_equal 0, reset, "#{msg}Expected no reset errors"
max_refused = (0.001 * replies.fetch(:success,0)).round
assert_operator max_refused, :>=, refused, "#{msg}Expected no than #{max_refused} refused connections"
end
assert_equal 0, replies[:unexpected_response], "#{msg}Unexpected response"
assert_equal 0, replies[:read_timeout], "#{msg}Expected no read timeouts"

if Puma.windows?
assert_equal (num_threads * num_requests) - reset - refused, replies[:success]
assert_equal actual_requests - reset - refused, replies[:success]
else
assert_equal (num_threads * num_requests), replies[:success]
assert_operator replies[:success], :>=, actual_requests - allowed_errors, msg
end

ensure
Expand Down
97 changes: 25 additions & 72 deletions test/test_integration_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,33 @@ def teardown
end

def test_hot_restart_does_not_drop_connections_threads
hot_restart_does_not_drop_connections num_threads: 10, total_requests: 3_000
restart_does_not_drop_connections num_threads: 10, total_requests: 3_000,
signal: :USR2
end

def test_hot_restart_does_not_drop_connections
hot_restart_does_not_drop_connections num_threads: 1, total_requests: 1_000
restart_does_not_drop_connections num_threads: 1, total_requests: 1_000,
signal: :USR2
end

def test_phased_restart_does_not_drop_connections_threads
restart_does_not_drop_connections num_threads: 10, total_requests: 3_000,
signal: :USR1
end

def test_phased_restart_does_not_drop_connections
restart_does_not_drop_connections num_threads: 1, total_requests: 1_000,
signal: :USR1
end

def test_phased_restart_does_not_drop_connections_threads_fork_worker
restart_does_not_drop_connections num_threads: 10, total_requests: 3_000,
signal: :USR1 #, config: 'fork_worker', log: true
end

def test_phased_restart_does_not_drop_connections_unix
restart_does_not_drop_connections num_threads: 1, total_requests: 1_000,
signal: :USR1, unix: true
end

def test_pre_existing_unix
Expand Down Expand Up @@ -98,25 +120,6 @@ def test_term_closes_listeners_unix
term_closes_listeners unix: true
end

# Next two tests, one tcp, one unix
# Send requests 1 per second. Send 1, then :USR1 server, then send another 24.
# All should be responded to, and at least three workers should be used

def test_usr1_all_respond_tcp
skip_unless_signal_exist? :USR1
usr1_all_respond unix: false
end

def test_usr1_fork_worker
skip_unless_signal_exist? :USR1
usr1_all_respond config: '--fork-worker'
end

def test_usr1_all_respond_unix
skip_unless_signal_exist? :USR1
usr1_all_respond unix: true
end

def test_term_exit_code
skip_unless_signal_exist? :TERM

Expand Down Expand Up @@ -658,57 +661,6 @@ def term_closes_listeners(unix: false)
end
end

# Send requests 1 per second. Send 1, then :USR1 server, then send another 24.
# All should be responded to, and at least three workers should be used
def usr1_all_respond(unix: false, config: '')
cli_server "-w #{workers} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix
threads = []
replies = []
mutex = Mutex.new

s = connect "sleep1", unix: unix
replies << read_body(s)

Process.kill :USR1, @pid

refused = thread_run_refused unix: unix

24.times do |delay|
threads << Thread.new do
thread_run_pid replies, delay, 1, mutex, refused, unix: unix
end
end

threads.each(&:join)

responses = replies.count { |r| r[/\ASlept 1/] }
resets = replies.count { |r| r == :reset }
refused = replies.count { |r| r == :refused }
read_timeouts = replies.count { |r| r == :read_timeout }

# get pids from replies, generate uniq array
t = replies.map { |body| body[/\d+\z/] }
t.uniq!; t.compact!
qty_pids = t.length

msg = "#{responses} responses, #{qty_pids} uniq pids"

assert_equal 25, responses, msg
assert_operator qty_pids, :>, 2, msg

msg = "#{responses} responses, #{resets} resets, #{refused} refused, #{read_timeouts} read timeouts"

assert_equal 0, refused, msg

assert_equal 0, resets, msg

assert_equal 0, read_timeouts, msg
ensure
unless passed?
$debugging_info << "#{full_name}\n #{msg}\n#{replies.inspect}\n"
end
end

def worker_respawn(phase = 1, size = workers, config = 'test/config/worker_shutdown_timeout_2.rb')
threads = []

Expand Down Expand Up @@ -809,4 +761,5 @@ def thread_run_step(replies, delay, sleep_time, step, mutex, refused, unix: fals
mutex.synchronize { replies[step] = :read_timeout }
end
end

end if ::Process.respond_to?(:fork)
8 changes: 5 additions & 3 deletions test/test_integration_single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ def workers ; 0 ; end

def test_hot_restart_does_not_drop_connections_threads
ttl_reqs = Puma.windows? ? 500 : 1_000
hot_restart_does_not_drop_connections num_threads: 5, total_requests: ttl_reqs
restart_does_not_drop_connections num_threads: 5, total_requests: ttl_reqs,
signal: :USR2
end

def test_hot_restart_does_not_drop_connections
if Puma.windows?
hot_restart_does_not_drop_connections total_requests: 300
restart_does_not_drop_connections total_requests: 300,
signal: :USR2
else
hot_restart_does_not_drop_connections
restart_does_not_drop_connections signal: :USR2
end
end

Expand Down

0 comments on commit 2c9aafe

Please sign in to comment.