-
Notifications
You must be signed in to change notification settings - Fork 182
/
failure_scenarios_test.exs
128 lines (97 loc) · 3.11 KB
/
failure_scenarios_test.exs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
defmodule FailureScenariosTest do
use ExUnit.Case, async: false
import ExqTestUtil
@moduletag :failure_scenarios
defmodule PerformWorker do
def perform do
send(:exqtest, {:worked})
end
end
defmodule SleepWorker do
def perform do
send(:exqtest, {:worked})
Process.register(self(), :sleep_worker)
:timer.sleep(:infinity)
end
end
setup do
TestRedis.setup()
Application.start(:ranch)
on_exit(fn ->
wait()
TestRedis.teardown()
end)
:ok
end
test "handle Redis connection lost on manager" do
conn = FlakyConnection.start(String.to_charlist(redis_host()), redis_port())
{:ok, _} = Exq.start_link(port: conn.port)
wait_long()
# Stop Redis and wait for a bit
FlakyConnection.stop(conn)
# Not ideal - but seems to be min time for manager to die past supervision
:timer.sleep(5100)
# Restart Flakey connection manually, things should be back to normal
{:ok, agent} = Agent.start_link(fn -> [] end)
{:ok, _} =
:ranch.start_listener(
conn.ref,
100,
:ranch_tcp,
[port: conn.port],
FlakyConnectionHandler,
['127.0.0.1', redis_port(), agent]
)
:timer.sleep(2000)
assert_exq_up(Exq)
Exq.stop(Exq)
end
test "handle Redis connection lost on enqueue" do
conn = FlakyConnection.start(String.to_charlist(redis_host()), redis_port())
# Start Exq but don't listen to any queues
{:ok, _} = Exq.start_link(port: conn.port)
wait_long()
# Stop Redis
FlakyConnection.stop(conn)
wait_long()
# enqueue with redis stopped
enq_result = Exq.enqueue(Exq, "default", "FakeWorker", [])
assert enq_result == {:error, %Redix.ConnectionError{reason: :closed}}
enq_result = Exq.enqueue_at(Exq, "default", DateTime.utc_now(), ExqTest.PerformWorker, [])
assert enq_result == {:error, %Redix.ConnectionError{reason: :closed}}
# Starting Redis again and things should be back to normal
wait_long()
# Restart Flakey connection manually
{:ok, agent} = Agent.start_link(fn -> [] end)
{:ok, _} =
:ranch.start_listener(
conn.ref,
100,
:ranch_tcp,
[port: conn.port],
FlakyConnectionHandler,
['127.0.0.1', redis_port(), agent]
)
:timer.sleep(2000)
assert_exq_up(Exq)
Exq.stop(Exq)
end
test "handle supervisor tree shutdown properly" do
{:ok, sup} = Exq.start_link()
assert Process.alive?(sup) == true
# Create worker that sleeps infinitely with registered process
{:ok, _jid} = Exq.enqueue(Exq, "default", FailureScenariosTest.SleepWorker, [])
Process.register(self(), :exqtest)
# wait until worker started
assert_receive {:worked}, 500
stop_process(sup)
# Takes 5500 for worker to stop
:timer.sleep(5500)
# Make sure everything is shut down properly
assert Process.alive?(sup) == false
assert Process.whereis(Exq.Manager.Server) == nil
assert Process.whereis(Exq.Stats.Server) == nil
assert Process.whereis(Exq.Scheduler.Server) == nil
assert Process.whereis(:sleep_worker) == nil
end
end