changed README.md
 
@@ -1,11 +1,11 @@
1
1
# Swarm
2
2
3
- [![Hex.pm Version](http://img.shields.io/hexpm/v/swarm.svg?style=flat)](https://hex.pm/packages/swarm)
3
+ [![Hex.pm Version](http://img.shields.io/hexpm/v/swarm.svg?style=flat)](https://hex.pm/packages/swarm) [![Build Status](https://travis-ci.com/bitwalker/swarm.svg?branch=master)](https://travis-ci.com/bitwalker/swarm)
4
4
5
5
**NOTE**: If you are upgrading from 1.0, be aware that the autoclustering functionality has been extracted
6
6
to its own package, which you will need to depend on if you use that feature.
7
- The package is [libcluster](http://github.com/bitwalker/libcluster) and is available on
8
- [Hex](https://hex.pm/packages/libcluster). Please be sure to read over the README to make sure your
7
+ The package is [libcluster](http://github.com/bitwalker/libcluster) and is available on
8
+ [Hex](https://hex.pm/packages/libcluster). Please be sure to read over the README to make sure your
9
9
config is properly updated.
10
10
11
11
Swarm is a global distributed registry, offering a feature set similar to that of `gproc`,
 
@@ -90,7 +90,7 @@ Swarm provides two strategies for you to use:
90
90
separate partitions, it's generally not an issue if those events are for the same device. However
91
91
this is clearly not ideal in all situations. Swarm also aims to be fast, so registrations and
92
92
lookups must be as low latency as possible, even when the number of processes in the registry grows
93
- very large. This is acheived without consensus by using a consistent hash of the name which
93
+ very large. This is achieved without consensus by using a consistent hash of the name which
94
94
deterministically defines which node a process belongs on, and all requests to start a process on
95
95
that node will be serialized through that node to prevent conflicts.
96
96
 
@@ -324,6 +324,26 @@ end
324
324
325
325
MIT
326
326
327
+ ## Testing
328
+
329
+ `mix test` runs a variety of tests; most of them use a cluster of
330
+ Elixir nodes to test the tracker and the registry. If you want more
331
+ verbose output during the tests, run them like this:
332
+
333
+ # SWARM_DEBUG=true mix test
334
+
335
+ This sets the log level to `:debug`, runs ExUnit with `--trace`, and
336
+ enables GenServer tracing on the Tracker processes.
337
+
338
+ ### Executing the tests locally
339
+ In order to execute the tests locally, you'll need to have
340
+ [Erlang Port Mapper Daemon](http://erlang.org/doc/man/epmd.html) running.
341
+
342
+ If you don't have `epmd` running, you can start it using the following command:
343
+
344
+ epmd -daemon
345
+
346
+
327
347
## TODO
328
348
329
349
- automated testing (some are present)
changed hex_metadata.config
 
@@ -4,27 +4,27 @@
4
4
<<"A fast, multi-master, distributed global process registry, with automatic distribution of worker processes.">>}.
5
5
{<<"elixir">>,<<"~> 1.3">>}.
6
6
{<<"files">>,
7
- [<<"lib">>,<<"lib/swarm">>,<<"lib/swarm.ex">>,<<"lib/swarm/app.ex">>,
8
- <<"lib/swarm/distribution">>,<<"lib/swarm/distribution/ring.ex">>,
7
+ [<<"lib">>,<<"lib/swarm">>,<<"lib/swarm/registry.ex">>,
8
+ <<"lib/swarm/logger.ex">>,<<"lib/swarm/distribution">>,
9
+ <<"lib/swarm/distribution/strategy.ex">>,
9
10
<<"lib/swarm/distribution/static_quorum_ring.ex">>,
10
- <<"lib/swarm/distribution/strategy.ex">>,<<"lib/swarm/logger.ex">>,
11
- <<"lib/swarm/registry.ex">>,<<"lib/swarm/tracker">>,
11
+ <<"lib/swarm/distribution/ring.ex">>,<<"lib/swarm/tracker">>,
12
12
<<"lib/swarm/tracker/crdt.ex">>,<<"lib/swarm/tracker/entry.ex">>,
13
- <<"lib/swarm/tracker/tracker.ex">>,<<"src">>,<<"src/swarm.erl">>,
14
- <<"mix.exs">>,<<"README.md">>,<<"LICENSE.md">>]}.
13
+ <<"lib/swarm/tracker/tracker.ex">>,<<"lib/swarm/app.ex">>,
14
+ <<"lib/swarm.ex">>,<<"src">>,<<"src/swarm.erl">>,<<"mix.exs">>,
15
+ <<"README.md">>,<<"LICENSE.md">>]}.
15
16
{<<"licenses">>,[<<"MIT">>]}.
16
17
{<<"links">>,[{<<"Github">>,<<"https://github.com/bitwalker/swarm">>}]}.
17
- {<<"maintainers">>,[<<"Paul Schoenfelder">>]}.
18
18
{<<"name">>,<<"swarm">>}.
19
19
{<<"requirements">>,
20
- [[{<<"app">>,<<"gen_state_machine">>},
21
- {<<"name">>,<<"gen_state_machine">>},
22
- {<<"optional">>,false},
23
- {<<"repository">>,<<"hexpm">>},
24
- {<<"requirement">>,<<"~> 2.0">>}],
25
- [{<<"app">>,<<"libring">>},
20
+ [[{<<"app">>,<<"libring">>},
26
21
{<<"name">>,<<"libring">>},
27
22
{<<"optional">>,false},
28
23
{<<"repository">>,<<"hexpm">>},
29
- {<<"requirement">>,<<"~> 1.0">>}]]}.
30
- {<<"version">>,<<"3.3.1">>}.
24
+ {<<"requirement">>,<<"~> 1.0">>}],
25
+ [{<<"app">>,<<"gen_state_machine">>},
26
+ {<<"name">>,<<"gen_state_machine">>},
27
+ {<<"optional">>,false},
28
+ {<<"repository">>,<<"hexpm">>},
29
+ {<<"requirement">>,<<"~> 2.0">>}]]}.
30
+ {<<"version">>,<<"3.4.0">>}.
changed lib/swarm.ex
 
@@ -47,10 +47,22 @@ defmodule Swarm do
47
47
The default value is `:infinity` to block indefinitely.
48
48
"""
49
49
@spec register_name(term, atom(), atom(), [term]) :: {:ok, pid} | {:error, term}
50
- @spec register_name(term, atom(), atom(), [term], non_neg_integer() | :infinity) :: {:ok, pid} | {:error, term}
50
+ @spec register_name(term, atom(), atom(), [term], non_neg_integer() | :infinity) ::
51
+ {:ok, pid} | {:error, term}
51
52
def register_name(name, m, f, a, timeout \\ :infinity)
52
53
def register_name(name, m, f, a, timeout), do: Swarm.Registry.register(name, m, f, a, timeout)
53
54
55
+ @doc """
56
+ Either finds the named process in the swarm or registers it using the register function.
57
+ """
58
+ @spec whereis_or_register_name(term, atom(), atom(), [term]) :: {:ok, pid} | {:error, term}
59
+ @spec whereis_or_register_name(term, atom(), atom(), [term], non_neg_integer() | :infinity) ::
60
+ {:ok, pid} | {:error, term}
61
+ def whereis_or_register_name(name, m, f, a, timeout \\ :infinity)
62
+
63
+ def whereis_or_register_name(name, m, f, a, timeout),
64
+ do: Swarm.Registry.whereis_or_register(name, m, f, a, timeout)
65
+
54
66
@doc """
55
67
Unregisters the given name from the registry.
56
68
"""
changed lib/swarm/app.ex
 
@@ -10,8 +10,9 @@ defmodule Swarm.App do
10
10
children = [
11
11
supervisor(Task.Supervisor, [[name: Swarm.TaskSupervisor]]),
12
12
worker(Swarm.Registry, []),
13
- worker(Swarm.Tracker, []),
13
+ worker(Swarm.Tracker, [])
14
14
]
15
+
15
16
supervise(children, strategy: :one_for_one)
16
17
end
17
18
end
changed lib/swarm/distribution/ring.ex
 
@@ -2,10 +2,10 @@ defmodule Swarm.Distribution.Ring do
2
2
@moduledoc false
3
3
use Swarm.Distribution.Strategy
4
4
5
- def create(), do: HashRing.new()
6
- def add_node(ring, node), do: HashRing.add_node(ring, node)
5
+ def create(), do: HashRing.new()
6
+ def add_node(ring, node), do: HashRing.add_node(ring, node)
7
7
def add_node(ring, node, weight), do: HashRing.add_node(ring, node, weight)
8
- def add_nodes(ring, nodes), do: HashRing.add_nodes(ring, nodes)
9
- def remove_node(ring, node), do: HashRing.remove_node(ring, node)
10
- def key_to_node(ring, key), do: HashRing.key_to_node(ring, key)
8
+ def add_nodes(ring, nodes), do: HashRing.add_nodes(ring, nodes)
9
+ def remove_node(ring, node), do: HashRing.remove_node(ring, node)
10
+ def key_to_node(ring, key), do: HashRing.key_to_node(ring, key)
11
11
end
changed lib/swarm/distribution/static_quorum_ring.ex
 
@@ -56,33 +56,25 @@ defmodule Swarm.Distribution.StaticQuorumRing do
56
56
57
57
def create do
58
58
%StaticQuorumRing{
59
- static_quorum_size: Application.get_env(:swarm, :static_quorum_size, 2),
60
- ring: HashRing.new(),
59
+ static_quorum_size: static_quorum_size(),
60
+ ring: HashRing.new()
61
61
}
62
62
end
63
63
64
64
def add_node(quorum, node) do
65
- %StaticQuorumRing{quorum |
66
- ring: HashRing.add_node(quorum.ring, node),
67
- }
65
+ %StaticQuorumRing{quorum | ring: HashRing.add_node(quorum.ring, node)}
68
66
end
69
67
70
68
def add_node(quorum, node, weight) do
71
- %StaticQuorumRing{quorum |
72
- ring: HashRing.add_node(quorum.ring, node, weight),
73
- }
69
+ %StaticQuorumRing{quorum | ring: HashRing.add_node(quorum.ring, node, weight)}
74
70
end
75
71
76
72
def add_nodes(quorum, nodes) do
77
- %StaticQuorumRing{quorum |
78
- ring: HashRing.add_nodes(quorum.ring, nodes),
79
- }
73
+ %StaticQuorumRing{quorum | ring: HashRing.add_nodes(quorum.ring, nodes)}
80
74
end
81
75
82
76
def remove_node(quorum, node) do
83
- %StaticQuorumRing{quorum |
84
- ring: HashRing.remove_node(quorum.ring, node),
85
- }
77
+ %StaticQuorumRing{quorum | ring: HashRing.remove_node(quorum.ring, node)}
86
78
end
87
79
88
80
@doc """
 
@@ -96,4 +88,26 @@ defmodule Swarm.Distribution.StaticQuorumRing do
96
88
_ -> HashRing.key_to_node(ring, key)
97
89
end
98
90
end
91
+
92
+ defp static_quorum_size() do
93
+ Application.get_env(:swarm, :static_quorum_size, 2)
94
+ |> static_quorum_size()
95
+ end
96
+
97
+ defp static_quorum_size(nil), do: static_quorum_size(2)
98
+
99
+ defp static_quorum_size(binary) when is_binary(binary) do
100
+ binary
101
+ |> Integer.parse()
102
+ |> convert_to_integer()
103
+ |> static_quorum_size()
104
+ end
105
+
106
+ defp static_quorum_size(size) when is_integer(size) and size > 0, do: size
107
+
108
+ defp static_quorum_size(_size),
109
+ do: raise("config :static_quorum_size should be a positive integer")
110
+
111
+ defp convert_to_integer({integer, _}) when is_integer(integer), do: integer
112
+ defp convert_to_integer(other), do: other
99
113
end
changed lib/swarm/distribution/strategy.ex
 
@@ -21,7 +21,7 @@ defmodule Swarm.Distribution.Strategy do
21
21
end
22
22
end
23
23
24
- @type reason :: String.t
24
+ @type reason :: String.t()
25
25
@type strategy :: term
26
26
@type weight :: pos_integer
27
27
@type nodelist :: [node() | {node(), weight}]
changed lib/swarm/logger.ex
 
@@ -5,24 +5,24 @@ defmodule Swarm.Logger do
5
5
@doc """
6
6
Log a debugging message
7
7
"""
8
- @spec debug(String.t) :: :ok
9
- def debug(message), do: Logger.debug("[swarm on #{Node.self}] #{message}")
8
+ @spec debug(String.t()) :: :ok
9
+ def debug(message), do: Logger.debug("[swarm on #{Node.self()}] #{message}")
10
10
11
11
@doc """
12
12
Log a warning message
13
13
"""
14
- @spec warn(String.t) :: :ok
15
- def warn(message), do: Logger.warn("[swarm on #{Node.self}] #{message}")
14
+ @spec warn(String.t()) :: :ok
15
+ def warn(message), do: Logger.warn("[swarm on #{Node.self()}] #{message}")
16
16
17
17
@doc """
18
18
Log an info message
19
19
"""
20
- @spec info(String.t) :: :ok
21
- def info(message), do: Logger.info("[swarm on #{Node.self}] #{message}")
20
+ @spec info(String.t()) :: :ok
21
+ def info(message), do: Logger.info("[swarm on #{Node.self()}] #{message}")
22
22
23
23
@doc """
24
24
Log an error message
25
25
"""
26
- @spec error(String.t) :: :ok
27
- def error(message), do: Logger.error("[swarm on #{Node.self}] #{message}")
26
+ @spec error(String.t()) :: :ok
27
+ def error(message), do: Logger.error("[swarm on #{Node.self()}] #{message}")
28
28
end
changed lib/swarm/registry.ex
 
@@ -24,11 +24,33 @@ defmodule Swarm.Registry do
24
24
case get_by_name(name) do
25
25
:undefined ->
26
26
Tracker.whereis(name)
27
+
27
28
entry(pid: pid) when is_pid(pid) ->
28
29
pid
29
30
end
30
31
end
31
32
33
+ @spec whereis_or_register(term, atom(), atom(), [term]) :: {:ok, pid} | {:error, term}
34
+ def whereis_or_register(name, m, f, a, timeout \\ :infinity)
35
+
36
+ @spec whereis_or_register(term, atom(), atom(), [term], non_neg_integer() | :infinity) ::
37
+ {:ok, pid} | {:error, term}
38
+ def whereis_or_register(name, module, fun, args, timeout) do
39
+ with :undefined <- whereis(name),
40
+ {:ok, pid} <- register(name, module, fun, args, timeout) do
41
+ {:ok, pid}
42
+ else
43
+ pid when is_pid(pid) ->
44
+ {:ok, pid}
45
+
46
+ {:error, {:already_registered, pid}} ->
47
+ {:ok, pid}
48
+
49
+ {:error, _} = err ->
50
+ err
51
+ end
52
+ end
53
+
32
54
@spec join(term, pid) :: :ok
33
55
def join(group, pid), do: Tracker.add_meta(group, true, pid)
34
56
 
@@ -37,9 +59,12 @@ defmodule Swarm.Registry do
37
59
38
60
@spec members(group :: term) :: [pid]
39
61
def members(group) do
40
- :ets.select(@table_name, [{entry(name: :'$1', pid: :'$2', ref: :'$3', meta: %{group => :'$4'}, clock: :'$5'), [], [:'$_']}])
62
+ :ets.select(@table_name, [
63
+ {entry(name: :"$1", pid: :"$2", ref: :"$3", meta: %{group => :"$4"}, clock: :"$5"), [],
64
+ [:"$_"]}
65
+ ])
41
66
|> Enum.map(fn entry(pid: pid) -> pid end)
42
- |> Enum.uniq
67
+ |> Enum.uniq()
43
68
end
44
69
45
70
@spec registered() :: [{name :: term, pid}]
 
@@ -64,7 +89,9 @@ defmodule Swarm.Registry do
64
89
@spec send(name :: term, msg :: term) :: :ok
65
90
def send(name, msg) do
66
91
case whereis(name) do
67
- :undefined -> :ok
92
+ :undefined ->
93
+ :ok
94
+
68
95
pid when is_pid(pid) ->
69
96
Kernel.send(pid, msg)
70
97
end
 
@@ -78,7 +105,7 @@ defmodule Swarm.Registry do
78
105
|> Enum.map(fn entry(name: name, pid: pid) -> {name, pid} end)
79
106
end
80
107
81
- @spec snapshot() :: [Entry.entry]
108
+ @spec snapshot() :: [Entry.entry()]
82
109
def snapshot() do
83
110
:ets.tab2list(@table_name)
84
111
end
 
@@ -86,7 +113,7 @@ defmodule Swarm.Registry do
86
113
@doc """
87
114
Inserts a new registration, and returns true if successful, or false if not
88
115
"""
89
- @spec new(Entry.entry) :: boolean
116
+ @spec new(Entry.entry()) :: boolean
90
117
def new(entry() = reg) do
91
118
:ets.insert_new(@table_name, reg)
92
119
end
 
@@ -94,12 +121,12 @@ defmodule Swarm.Registry do
94
121
@doc """
95
122
Like `new/1`, but raises if the insertion fails.
96
123
"""
97
- @spec new!(Entry.entry) :: true | no_return
124
+ @spec new!(Entry.entry()) :: true | no_return
98
125
def new!(entry() = reg) do
99
126
true = :ets.insert_new(@table_name, reg)
100
127
end
101
128
102
- @spec remove(Entry.entry) :: true
129
+ @spec remove(Entry.entry()) :: true
103
130
def remove(entry() = reg) do
104
131
:ets.delete_object(@table_name, reg)
105
132
end
 
@@ -109,69 +136,85 @@ defmodule Swarm.Registry do
109
136
case get_by_pid(pid) do
110
137
:undefined ->
111
138
true
139
+
112
140
entries when is_list(entries) ->
113
141
Enum.each(entries, &:ets.delete_object(@table_name, &1))
114
142
true
115
143
end
116
144
end
117
145
118
- @spec get_by_name(term()) :: :undefined | Entry.entry
146
+ @spec get_by_name(term()) :: :undefined | Entry.entry()
119
147
def get_by_name(name) do
120
148
case :ets.lookup(@table_name, name) do
121
- [] -> :undefined
149
+ [] -> :undefined
122
150
[obj] -> obj
123
151
end
124
152
end
125
153
126
- @spec get_by_pid(pid) :: :undefined | [Entry.entry]
154
+ @spec get_by_pid(pid) :: :undefined | [Entry.entry()]
127
155
def get_by_pid(pid) do
128
- case :ets.match_object(@table_name, entry(name: :'$1', pid: pid, ref: :'$2', meta: :'$3', clock: :'$4')) do
156
+ case :ets.match_object(
157
+ @table_name,
158
+ entry(name: :"$1", pid: pid, ref: :"$2", meta: :"$3", clock: :"$4")
159
+ ) do
129
160
[] -> :undefined
130
161
list when is_list(list) -> list
131
162
end
132
163
end
133
164
134
- @spec get_by_pid_and_name(pid(), term()) :: :undefined | Entry.entry
165
+ @spec get_by_pid_and_name(pid(), term()) :: :undefined | Entry.entry()
135
166
def get_by_pid_and_name(pid, name) do
136
- case :ets.match_object(@table_name, entry(name: name, pid: pid, ref: :'$1', meta: :'$2', clock: :'$3')) do
167
+ case :ets.match_object(
168
+ @table_name,
169
+ entry(name: name, pid: pid, ref: :"$1", meta: :"$2", clock: :"$3")
170
+ ) do
137
171
[] -> :undefined
138
172
[obj] -> obj
139
173
end
140
174
end
141
175
142
- @spec get_by_ref(reference()) :: :undefined | Entry.entry
176
+ @spec get_by_ref(reference()) :: :undefined | Entry.entry()
143
177
def get_by_ref(ref) do
144
- case :ets.match_object(@table_name, entry(name: :'$1', pid: :'$2', ref: ref, meta: :'$3', clock: :'$4')) do
145
- [] -> :undefined
178
+ case :ets.match_object(
179
+ @table_name,
180
+ entry(name: :"$1", pid: :"$2", ref: ref, meta: :"$3", clock: :"$4")
181
+ ) do
182
+ [] -> :undefined
146
183
[obj] -> obj
147
184
end
148
185
end
149
186
150
- @spec get_by_meta(term()) :: :undefined | [Entry.entry]
187
+ @spec get_by_meta(term()) :: :undefined | [Entry.entry()]
151
188
def get_by_meta(key) do
152
- case :ets.match_object(@table_name, entry(name: :'$1', pid: :'$2', ref: :'$3', meta: %{key => :'$4'}, clock: :'$5')) do
153
- [] -> :undefined
189
+ case :ets.match_object(
190
+ @table_name,
191
+ entry(name: :"$1", pid: :"$2", ref: :"$3", meta: %{key => :"$4"}, clock: :"$5")
192
+ ) do
193
+ [] -> :undefined
154
194
list when is_list(list) -> list
155
195
end
156
196
end
157
197
158
- @spec get_by_meta(term(), term()) :: :undefined | [Entry.entry]
198
+ @spec get_by_meta(term(), term()) :: :undefined | [Entry.entry()]
159
199
def get_by_meta(key, value) do
160
- case :ets.match_object(@table_name, entry(name: :'$1', pid: :'$2', ref: :'$3', meta: %{key => value}, clock: :'$4')) do
161
- [] -> :undefined
200
+ case :ets.match_object(
201
+ @table_name,
202
+ entry(name: :"$1", pid: :"$2", ref: :"$3", meta: %{key => value}, clock: :"$4")
203
+ ) do
204
+ [] -> :undefined
162
205
list when is_list(list) -> list
163
206
end
164
207
end
165
208
166
- @spec reduce(term(), (Entry.entry, term() -> term())) :: term()
209
+ @spec reduce(term(), (Entry.entry(), term() -> term())) :: term()
167
210
def reduce(acc, fun) when is_function(fun, 2) do
168
211
:ets.foldl(fun, acc, @table_name)
169
212
end
170
213
171
-
172
- @spec update(term(), Keyword.t) :: boolean
214
+ @spec update(term(), Keyword.t()) :: boolean
173
215
defmacro update(key, updates) do
174
- fields = Enum.map(updates, fn {k, v} -> {Entry.index(k)+1, v} end)
216
+ fields = Enum.map(updates, fn {k, v} -> {Entry.index(k) + 1, v} end)
217
+
175
218
quote bind_quoted: [table_name: @table_name, key: key, fields: fields] do
176
219
:ets.update_element(table_name, key, fields)
177
220
end
 
@@ -180,15 +223,19 @@ defmodule Swarm.Registry do
180
223
## GenServer Implementation
181
224
182
225
def start_link(), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
226
+
183
227
def init(_) do
184
228
# start ETS table for registry
185
- t = :ets.new(@table_name, [
186
- :set,
187
- :named_table,
188
- :public,
189
- keypos: 2,
190
- read_concurrency: true,
191
- write_concurrency: true])
229
+ t =
230
+ :ets.new(@table_name, [
231
+ :set,
232
+ :named_table,
233
+ :public,
234
+ keypos: 2,
235
+ read_concurrency: true,
236
+ write_concurrency: true
237
+ ])
238
+
192
239
{:ok, t}
193
240
end
194
241
end
changed lib/swarm/tracker/crdt.ex
 
@@ -6,33 +6,32 @@ defmodule Swarm.IntervalTreeClock do
6
6
"""
7
7
use Bitwise
8
8
import Kernel, except: [max: 2, min: 2]
9
- @compile {:inline, [min: 2, max: 2,
10
- drop: 2, lift: 2,
11
- base: 1, height: 1]}
9
+ @compile {:inline, [min: 2, max: 2, drop: 2, lift: 2, base: 1, height: 1]}
12
10
13
11
@type int_tuple :: {non_neg_integer, non_neg_integer}
14
- @type t :: int_tuple |
15
- {int_tuple, non_neg_integer} |
16
- {non_neg_integer, int_tuple} |
17
- {int_tuple, int_tuple}
12
+ @type t ::
13
+ int_tuple
14
+ | {int_tuple, non_neg_integer}
15
+ | {non_neg_integer, int_tuple}
16
+ | {int_tuple, int_tuple}
18
17
19
18
@doc """
20
19
Creates a new interval tree clock
21
20
"""
22
- @spec seed() :: __MODULE__.t
21
+ @spec seed() :: __MODULE__.t()
23
22
def seed(), do: {1, 0}
24
23
25
24
@doc """
26
25
Joins two forked clocks into a single clock with both causal histories,
27
26
used for retiring a replica.
28
27
"""
29
- @spec join(__MODULE__.t, __MODULE__.t) :: __MODULE__.t
28
+ @spec join(__MODULE__.t(), __MODULE__.t()) :: __MODULE__.t()
30
29
def join({i1, e1}, {i2, e2}), do: {sum(i1, i2), join_ev(e1, e2)}
31
30
32
31
@doc """
33
32
Forks a clock containing a shared causal history, used for creating new replicas.
34
33
"""
35
- @spec fork(__MODULE__.t) :: __MODULE__.t
34
+ @spec fork(__MODULE__.t()) :: __MODULE__.t()
36
35
def fork({i, e}) do
37
36
{i1, i2} = split(i)
38
37
{{i1, e}, {i2, e}}
 
@@ -42,18 +41,19 @@ defmodule Swarm.IntervalTreeClock do
42
41
Gets a snapshot of a clock without its identity. Useful for sending the clock with messages,
43
42
but cannot be used to track events.
44
43
"""
45
- @spec peek(__MODULE__.t) :: __MODULE__.t
46
- def peek({i, e}), do: {{0, e}, {i, e}}
44
+ @spec peek(__MODULE__.t()) :: __MODULE__.t()
45
+ def peek({_i, e}), do: {0, e}
47
46
48
47
@doc """
49
48
Records an event on the given clock
50
49
"""
51
- @spec event(__MODULE__.t) :: __MODULE__.t
50
+ @spec event(__MODULE__.t()) :: __MODULE__.t()
52
51
def event({i, e}) do
53
52
case fill(i, e) do
54
53
^e ->
55
54
{_, e1} = grow(i, e)
56
55
{i, e1}
56
+
57
57
e1 ->
58
58
{i, e1}
59
59
end
 
@@ -64,7 +64,7 @@ defmodule Swarm.IntervalTreeClock do
64
64
If the left-hand clock is LEQ than the right-hand clock, and vice-versa, then they are
65
65
causally equivalent.
66
66
"""
67
- @spec leq(__MODULE__.t, __MODULE__.t) :: boolean
67
+ @spec leq(__MODULE__.t(), __MODULE__.t()) :: boolean
68
68
def leq({_, e1}, {_, e2}), do: leq_ev(e1, e2)
69
69
70
70
@doc """
 
@@ -74,32 +74,34 @@ defmodule Swarm.IntervalTreeClock do
74
74
If :gt is returned, the second clock is causally dominated by the first
75
75
If :concurrent is returned, the two clocks are concurrent (conflicting)
76
76
"""
77
- @spec compare(__MODULE__.t, __MODULE__.t) :: :lt | :gt | :eq | :concurrent
77
+ @spec compare(__MODULE__.t(), __MODULE__.t()) :: :lt | :gt | :eq | :concurrent
78
78
def compare(a, b) do
79
79
a_leq = leq(a, b)
80
80
b_leq = leq(b, a)
81
+
81
82
cond do
82
83
a_leq and b_leq -> :eq
83
- a_leq -> :lt
84
- b_leq -> :gt
85
- :else -> :concurrent
84
+ a_leq -> :lt
85
+ b_leq -> :gt
86
+ :else -> :concurrent
86
87
end
87
88
end
88
89
89
90
@doc """
90
91
Encodes the clock as a binary
91
92
"""
92
- @spec encode(__MODULE__.t) :: binary
93
+ @spec encode(__MODULE__.t()) :: binary
93
94
def encode({i, e}), do: :erlang.term_to_binary({i, e})
94
95
95
96
@doc """
96
97
Decodes the clock from a binary
97
98
"""
98
- @spec decode(binary) :: {:ok, __MODULE__.t} | {:error, {:invalid_clock, term}}
99
+ @spec decode(binary) :: {:ok, __MODULE__.t()} | {:error, {:invalid_clock, term}}
99
100
def decode(b) when is_binary(b) do
100
101
case :erlang.binary_to_term(b) do
101
102
{_i, _e} = clock ->
102
103
clock
104
+
103
105
other ->
104
106
{:error, {:invalid_clock, other}}
105
107
end
 
@@ -108,32 +110,31 @@ defmodule Swarm.IntervalTreeClock do
108
110
@doc """
109
111
Returns the length of the encoded binary representation of the clock
110
112
"""
111
- @spec len(__MODULE__.t) :: non_neg_integer
113
+ @spec len(__MODULE__.t()) :: non_neg_integer
112
114
def len(d), do: :erlang.size(encode(d))
113
115
114
116
## Private API
115
117
116
118
defp leq_ev({n1, l1, r1}, {n2, l2, r2}) do
117
- n1 <= n2 and
118
- leq_ev(lift(n1, l1), lift(n2, l2)) and
119
- leq_ev(lift(n1, r1), lift(n2, r2))
119
+ n1 <= n2 and leq_ev(lift(n1, l1), lift(n2, l2)) and leq_ev(lift(n1, r1), lift(n2, r2))
120
120
end
121
+
121
122
defp leq_ev({n1, l1, r1}, n2) do
122
- n1 <= n2 and
123
- leq_ev(lift(n1, l1), n2) and
124
- leq_ev(lift(n1, r1), n2)
123
+ n1 <= n2 and leq_ev(lift(n1, l1), n2) and leq_ev(lift(n1, r1), n2)
125
124
end
125
+
126
126
defp leq_ev(n1, {n2, _, _}), do: n1 <= n2
127
127
defp leq_ev(n1, n2), do: n1 <= n2
128
128
129
129
defp norm_id({0, 0}), do: 0
130
130
defp norm_id({1, 1}), do: 1
131
- defp norm_id(x), do: x
131
+ defp norm_id(x), do: x
132
132
133
133
defp norm_ev({n, m, m}) when is_integer(m), do: n + m
134
+
134
135
defp norm_ev({n, l, r}) do
135
136
m = min(base(l), base(r))
136
- {n+m, drop(m, l), drop(m, r)}
137
+ {n + m, drop(m, l), drop(m, r)}
137
138
end
138
139
139
140
defp sum(0, x), do: x
 
@@ -142,62 +143,75 @@ defmodule Swarm.IntervalTreeClock do
142
143
143
144
defp split(0), do: {0, 0}
144
145
defp split(1), do: {{1, 0}, {0, 1}}
146
+
145
147
defp split({0, i}) do
146
148
{i1, i2} = split(i)
147
149
{{0, i1}, {0, i2}}
148
150
end
151
+
149
152
defp split({i, 0}) do
150
153
{i1, i2} = split(i)
151
154
{{i1, 0}, {i2, 0}}
152
155
end
153
- defp split({i1, i2}), do: {{i1,0}, {0,i2}}
156
+
157
+ defp split({i1, i2}), do: {{i1, 0}, {0, i2}}
154
158
155
159
defp join_ev({n1, _, _} = e1, {n2, _, _} = e2) when n1 > n2, do: join_ev(e2, e1)
160
+
156
161
defp join_ev({n1, l1, r1}, {n2, l2, r2}) when n1 <= n2 do
157
162
d = n2 - n1
158
163
norm_ev({n1, join_ev(l1, lift(d, l2)), join_ev(r1, lift(d, r2))})
159
164
end
160
- defp join_ev(n1, {n2, l2, r2}), do: join_ev({n1,0,0}, {n2,l2,r2})
161
- defp join_ev({n1, l1, r1}, n2), do: join_ev({n1,l1,r1}, {n2,0,0})
165
+
166
+ defp join_ev(n1, {n2, l2, r2}), do: join_ev({n1, 0, 0}, {n2, l2, r2})
167
+ defp join_ev({n1, l1, r1}, n2), do: join_ev({n1, l1, r1}, {n2, 0, 0})
162
168
defp join_ev(n1, n2), do: max(n1, n2)
163
169
164
170
defp fill(0, e), do: e
165
- defp fill(1, {_,_,_}=e), do: height(e)
171
+ defp fill(1, {_, _, _} = e), do: height(e)
166
172
defp fill(_, n) when is_integer(n), do: n
173
+
167
174
defp fill({1, r}, {n, el, er}) do
168
175
er1 = fill(r, er)
169
176
d = max(height(el), base(er1))
170
177
norm_ev({n, d, er1})
171
178
end
179
+
172
180
defp fill({l, 1}, {n, el, er}) do
173
181
el1 = fill(l, el)
174
182
d = max(height(er), base(el1))
175
183
norm_ev({n, el1, d})
176
184
end
185
+
177
186
defp fill({l, r}, {n, el, er}) do
178
187
norm_ev({n, fill(l, el), fill(r, er)})
179
188
end
180
189
181
- defp grow(1, n) when is_integer(n), do: {0, n+1}
190
+ defp grow(1, n) when is_integer(n), do: {0, n + 1}
191
+
182
192
defp grow({0, i}, {n, l, r}) do
183
193
{h, e1} = grow(i, r)
184
- {h+1, {n, l, e1}}
194
+ {h + 1, {n, l, e1}}
185
195
end
196
+
186
197
defp grow({i, 0}, {n, l, r}) do
187
198
{h, e1} = grow(i, l)
188
- {h+1, {n, e1, r}}
199
+ {h + 1, {n, e1, r}}
189
200
end
201
+
190
202
defp grow({il, ir}, {n, l, r}) do
191
203
{hl, el} = grow(il, l)
192
204
{hr, er} = grow(ir, r)
205
+
193
206
cond do
194
- hl < hr -> {hl+1, {n, el, r}}
195
- :else -> {hr+1, {n, l, er}}
207
+ hl < hr -> {hl + 1, {n, el, r}}
208
+ :else -> {hr + 1, {n, l, er}}
196
209
end
197
210
end
211
+
198
212
defp grow(i, n) when is_integer(n) do
199
213
{h, e} = grow(i, {n, 0, 0})
200
- {h+100_000, e}
214
+ {h + 100_000, e}
201
215
end
202
216
203
217
defp height({n, l, r}), do: n + max(height(l), height(r))
 
@@ -206,10 +220,10 @@ defmodule Swarm.IntervalTreeClock do
206
220
defp base({n, _, _}), do: n
207
221
defp base(n), do: n
208
222
209
- defp lift(m, {n, l, r}), do: {n+m, l, r}
223
+ defp lift(m, {n, l, r}), do: {n + m, l, r}
210
224
defp lift(m, n), do: n + m
211
225
212
- defp drop(m, {n, l, r}) when m <= n, do: {n-m, l, r}
226
+ defp drop(m, {n, l, r}) when m <= n, do: {n - m, l, r}
213
227
defp drop(m, n) when m <= n, do: n - m
214
228
215
229
defp max(x, y) when x <= y, do: y
 
@@ -218,18 +232,18 @@ defmodule Swarm.IntervalTreeClock do
218
232
defp min(x, y) when x <= y, do: x
219
233
defp min(_, y), do: y
220
234
221
- def str({i, e}), do: List.to_string(List.flatten([List.flatten(stri(i)), List.flatten(stre(e))]))
235
+ def str({i, e}),
236
+ do: List.to_string(List.flatten([List.flatten(stri(i)), List.flatten(stre(e))]))
222
237
223
238
defp stri(0), do: '0'
224
239
defp stri(1), do: ''
225
- defp stri({0, i}), do: 'R'++stri(i)
226
- defp stri({i, 0}), do: 'L'++stri(i)
227
- defp stri({l, r}), do: ['(L'++stri(l), '+', 'R'++stri(r), ')']
240
+ defp stri({0, i}), do: 'R' ++ stri(i)
241
+ defp stri({i, 0}), do: 'L' ++ stri(i)
242
+ defp stri({l, r}), do: ['(L' ++ stri(l), '+', 'R' ++ stri(r), ')']
228
243
229
244
defp stre({n, l, 0}), do: [stre(n), 'L', stre(l)]
230
245
defp stre({n, 0, r}), do: [stre(n), 'R', stre(r)]
231
246
defp stre({n, l, r}), do: [stre(n), '(L', stre(l), '+R', stre(r), ')']
232
247
defp stre(n) when n > 0, do: :erlang.integer_to_list(n)
233
248
defp stre(_), do: ''
234
-
235
249
end
changed lib/swarm/tracker/entry.ex
 
@@ -5,9 +5,17 @@ defmodule Swarm.Entry do
5
5
@fields [name: nil, pid: nil, ref: nil, meta: %{}, clock: nil]
6
6
7
7
require Record
8
- Record.defrecord :entry, @fields
8
+ Record.defrecord(:entry, @fields)
9
9
10
- @type entry :: record(:entry, name: term, pid: pid, ref: reference, meta: nil | map, clock: nil | ITC.t)
10
+ @type entry ::
11
+ record(
12
+ :entry,
13
+ name: term,
14
+ pid: pid,
15
+ ref: reference,
16
+ meta: nil | map,
17
+ clock: nil | ITC.t()
18
+ )
11
19
12
20
def index(field) when is_atom(field) do
13
21
Record.__access__(:entry, @fields, field, Swarm.Entry)
changed lib/swarm/tracker/tracker.ex
 
@@ -22,27 +22,27 @@ defmodule Swarm.Tracker do
22
22
defmodule Tracking do
23
23
@moduledoc false
24
24
@type t :: %__MODULE__{
25
- name: term(),
26
- meta: %{mfa: {m :: atom(), f :: function(), a :: list()}},
27
- from: {pid, tag :: term},
28
- }
25
+ name: term(),
26
+ meta: %{mfa: {m :: atom(), f :: function(), a :: list()}},
27
+ from: {pid, tag :: term}
28
+ }
29
29
defstruct [:name, :meta, :from]
30
30
end
31
31
32
32
defmodule TrackerState do
33
33
@moduledoc false
34
34
@type t :: %__MODULE__{
35
- clock: nil | Swarm.IntervalTreeClock.t,
36
- strategy: Strategy.t,
37
- self: atom(),
38
- sync_node: nil | atom(),
39
- sync_ref: nil | reference(),
40
- pending_sync_reqs: [pid()],
41
- }
35
+ clock: nil | Swarm.IntervalTreeClock.t(),
36
+ strategy: Strategy.t(),
37
+ self: atom(),
38
+ sync_node: nil | atom(),
39
+ sync_ref: nil | reference(),
40
+ pending_sync_reqs: [pid()]
41
+ }
42
42
defstruct clock: nil,
43
43
nodes: [],
44
44
strategy: nil,
45
- self: :'nonode@nohost',
45
+ self: :nonode@nohost,
46
46
sync_node: nil,
47
47
sync_ref: nil,
48
48
pending_sync_reqs: []
 
@@ -56,6 +56,15 @@ defmodule Swarm.Tracker do
56
56
def whereis(name),
57
57
do: GenStateMachine.call(__MODULE__, {:whereis, name}, :infinity)
58
58
59
+ @doc """
60
+ Hand off all the processes running on the given worker to the remaining nodes in the cluster.
61
+ This can be used to gracefully shut down a node.
62
+ Note that if you don't shut down the node after the handoff, a rebalance can lead to processes being scheduled on it again.
63
+ In other words, the handoff doesn't blacklist the node for further rebalances.
64
+ """
65
+ def handoff(worker_name, state),
66
+ do: GenStateMachine.call(__MODULE__, {:handoff, worker_name, state}, :infinity)
67
+
59
68
@doc """
60
69
Tracks a process (pid) with the given name.
61
70
Tracking processes with this function will *not* restart the process when
 
@@ -99,24 +108,31 @@ defmodule Swarm.Tracker do
99
108
100
109
defmacrop debug(msg) do
101
110
{current_state, _arity} = __CALLER__.function
111
+
102
112
quote do
103
113
Swarm.Logger.debug("[tracker:#{unquote(current_state)}] #{unquote(msg)}")
104
114
end
105
115
end
116
+
106
117
defmacrop info(msg) do
107
118
{current_state, _arity} = __CALLER__.function
119
+
108
120
quote do
109
121
Swarm.Logger.info("[tracker:#{unquote(current_state)}] #{unquote(msg)}")
110
122
end
111
123
end
124
+
112
125
defmacrop warn(msg) do
113
126
{current_state, _arity} = __CALLER__.function
127
+
114
128
quote do
115
129
Swarm.Logger.warn("[tracker:#{unquote(current_state)}] #{unquote(msg)}")
116
130
end
117
131
end
132
+
118
133
defmacrop error(msg) do
119
134
{current_state, _arity} = __CALLER__.function
135
+
120
136
quote do
121
137
Swarm.Logger.error("[tracker:#{unquote(current_state)}] #{unquote(msg)}")
122
138
end
 
@@ -131,15 +147,17 @@ defmodule Swarm.Tracker do
131
147
Process.flag(:trap_exit, true)
132
148
# If this node is ignored, then make sure we ignore everyone else
133
149
# to prevent accidentally interfering with the cluster
134
- if ignore_node?(Node.self) do
150
+ if ignore_node?(Node.self()) do
135
151
Application.put_env(:swarm, :node_blacklist, [~r/^.+$/])
136
152
end
153
+
137
154
# Start monitoring nodes
138
- :ok = :net_kernel.monitor_nodes(true, [node_type: :all])
139
- info "started"
155
+ :ok = :net_kernel.monitor_nodes(true, node_type: :all)
156
+ info("started")
140
157
nodelist = Enum.reject(Node.list(:connected), &ignore_node?/1)
158
+
141
159
strategy =
142
- Node.self
160
+ Node.self()
143
161
|> Strategy.create()
144
162
|> Strategy.add_nodes(nodelist)
145
163
 
@@ -150,84 +168,107 @@ defmodule Swarm.Tracker do
150
168
timeout = Application.get_env(:swarm, :sync_nodes_timeout, @sync_nodes_timeout)
151
169
Process.send_after(self(), :cluster_join, timeout)
152
170
153
- state = %TrackerState{nodes: nodelist, strategy: strategy, self: node()}
171
+ state = %TrackerState{clock: Clock.seed(), nodes: nodelist, strategy: strategy, self: node()}
154
172
155
173
{:ok, :cluster_wait, state}
156
174
end
157
175
158
176
def cluster_wait(:info, {:nodeup, node, _}, %TrackerState{} = state) do
159
- new_state = case nodeup(state, node) do
160
- {:ok, new_state} -> new_state
161
- {:ok, new_state, _next_state} -> new_state
162
- end
177
+ new_state =
178
+ case nodeup(state, node) do
179
+ {:ok, new_state} -> new_state
180
+ {:ok, new_state, _next_state} -> new_state
181
+ end
182
+
163
183
{:keep_state, new_state}
164
184
end
185
+
165
186
def cluster_wait(:info, {:nodedown, node, _}, %TrackerState{} = state) do
166
- new_state = case nodedown(state, node) do
167
- {:ok, new_state} -> new_state
168
- {:ok, new_state, _next_state} -> new_state
169
- end
187
+ new_state =
188
+ case nodedown(state, node) do
189
+ {:ok, new_state} -> new_state
190
+ {:ok, new_state, _next_state} -> new_state
191
+ end
192
+
170
193
{:keep_state, new_state}
171
194
end
195
+
172
196
def cluster_wait(:info, :cluster_join, %TrackerState{nodes: []} = state) do
173
- info "joining cluster.."
174
- info "no connected nodes, proceeding without sync"
197
+ info("joining cluster..")
198
+ info("no connected nodes, proceeding without sync")
175
199
interval = Application.get_env(:swarm, :anti_entropy_interval, @default_anti_entropy_interval)
176
200
Process.send_after(self(), :anti_entropy, interval)
177
201
{:next_state, :tracking, %{state | clock: Clock.seed()}}
178
202
end
203
+
179
204
def cluster_wait(:info, :cluster_join, %TrackerState{nodes: nodes} = state) do
180
- info "joining cluster.."
181
- info "found connected nodes: #{inspect nodes}"
205
+ info("joining cluster..")
206
+ info("found connected nodes: #{inspect(nodes)}")
182
207
# Connect to a random node and sync registries,
183
208
# start anti-entropy, and start loop with forked clock of
184
209
# remote node
185
210
sync_node = Enum.random(nodes)
186
- info "selected sync node: #{sync_node}"
211
+ info("selected sync node: #{sync_node}")
187
212
# Send sync request
188
- clock = Clock.seed()
189
213
ref = Process.monitor({__MODULE__, sync_node})
190
- GenStateMachine.cast({__MODULE__, sync_node}, {:sync, self(), clock})
191
- {:next_state, :syncing, %{state | clock: clock, sync_node: sync_node, sync_ref: ref}}
214
+ GenStateMachine.cast({__MODULE__, sync_node}, {:sync, self(), state.clock})
215
+ {:next_state, :syncing, %{state | sync_node: sync_node, sync_ref: ref}}
192
216
end
193
- def cluster_wait(:cast, {:sync, from, rclock}, %TrackerState{nodes: [from_node]} = state) when node(from) == from_node do
194
- info "joining cluster.."
217
+
218
+ def cluster_wait(:cast, {:sync, from, rclock}, %TrackerState{nodes: [from_node]} = state)
219
+ when node(from) == from_node do
220
+ info("joining cluster..")
195
221
sync_node = node(from)
196
- info "syncing with #{sync_node}"
222
+ info("syncing with #{sync_node}")
197
223
ref = Process.monitor({__MODULE__, sync_node})
198
- GenStateMachine.cast(from, {:sync_recv, self(), rclock, Registry.snapshot()})
199
- {:next_state, :awaiting_sync_ack, %{state | clock: rclock, sync_node: sync_node, sync_ref: ref}}
224
+ {lclock, rclock} = Clock.fork(rclock)
225
+ debug("forking clock: #{inspect state.clock}, lclock: #{inspect lclock}, rclock: #{inspect rclock}")
226
+ GenStateMachine.cast(from, {:sync_recv, self(), rclock, get_registry_snapshot()})
227
+
228
+ {:next_state, :awaiting_sync_ack,
229
+ %{state | clock: lclock, sync_node: sync_node, sync_ref: ref}}
200
230
end
231
+
201
232
def cluster_wait(:cast, {:sync, from, _rclock}, %TrackerState{} = state) do
202
233
if ignore_node?(node(from)) do
203
234
GenStateMachine.cast(from, {:sync_err, :node_ignored})
204
235
:keep_state_and_data
205
236
else
206
- info "pending sync request from #{node(from)}"
207
- {:keep_state, %{state | pending_sync_reqs: [from|state.pending_sync_reqs]}}
237
+ info("pending sync request from #{node(from)}")
238
+ {:keep_state, %{state | pending_sync_reqs: [from | state.pending_sync_reqs]}}
208
239
end
209
240
end
241
+
210
242
def cluster_wait(_event_type, _event_data, _state) do
211
243
{:keep_state_and_data, :postpone}
212
244
end
213
245
214
246
def syncing(:info, {:nodeup, node, _}, %TrackerState{} = state) do
215
- new_state = case nodeup(state, node) do
216
- {:ok, new_state} -> new_state
217
- {:ok, new_state, _next_state} -> new_state
218
- end
247
+ new_state =
248
+ case nodeup(state, node) do
249
+ {:ok, new_state} -> new_state
250
+ {:ok, new_state, _next_state} -> new_state
251
+ end
252
+
219
253
{:keep_state, new_state}
220
254
end
221
- def syncing(:info, {:DOWN, ref, _type, _pid, _info}, %TrackerState{clock: clock, sync_ref: ref} = state) do
222
- info "the remote tracker we're syncing with has crashed, selecting a new one"
255
+
256
+ def syncing(
257
+ :info,
258
+ {:DOWN, ref, _type, _pid, _info},
259
+ %TrackerState{clock: clock, sync_ref: ref} = state
260
+ ) do
261
+ info("the remote tracker we're syncing with has crashed, selecting a new one")
262
+
223
263
case state.nodes -- [state.sync_node] do
224
264
[] ->
225
- info "no other available nodes, cancelling sync"
265
+ info("no other available nodes, cancelling sync")
226
266
new_state = %{state | sync_node: nil, sync_ref: nil}
227
267
{:next_state, :tracking, new_state}
268
+
228
269
new_nodes ->
229
270
new_sync_node = Enum.random(new_nodes)
230
- info "selected sync node: #{new_sync_node}"
271
+ info("selected sync node: #{new_sync_node}")
231
272
# Send sync request
232
273
ref = Process.monitor({__MODULE__, new_sync_node})
233
274
GenStateMachine.cast({__MODULE__, new_sync_node}, {:sync, self(), clock})
 
@@ -235,286 +276,375 @@ defmodule Swarm.Tracker do
235
276
{:keep_state, new_state}
236
277
end
237
278
end
238
- def syncing(:info, {:nodedown, node, _}, %TrackerState{strategy: strategy, clock: clock, nodes: nodes, sync_node: node} = state) do
239
- info "the selected sync node #{node} went down, selecting new node"
279
+
280
+ def syncing(
281
+ :info,
282
+ {:nodedown, node, _},
283
+ %TrackerState{strategy: strategy, clock: clock, nodes: nodes, sync_node: node} = state
284
+ ) do
285
+ info("the selected sync node #{node} went down, selecting new node")
240
286
Process.demonitor(state.sync_ref, [:flush])
287
+
241
288
case nodes -- [node] do
242
289
[] ->
243
290
# there are no other nodes to select, nothing to do
244
- info "no other available nodes, cancelling sync"
245
- new_state = %{state | nodes: [],
246
- strategy: Strategy.remove_node(strategy, node),
247
- sync_node: nil,
248
- sync_ref: nil}
291
+ info("no other available nodes, cancelling sync")
292
+
293
+ new_state = %{
294
+ state
295
+ | nodes: [],
296
+ strategy: Strategy.remove_node(strategy, node),
297
+ sync_node: nil,
298
+ sync_ref: nil
299
+ }
300
+
249
301
{:next_state, :tracking, new_state}
302
+
250
303
new_nodes ->
251
304
new_sync_node = Enum.random(new_nodes)
252
- info "selected sync node: #{new_sync_node}"
305
+ info("selected sync node: #{new_sync_node}")
253
306
# Send sync request
254
307
ref = Process.monitor({__MODULE__, new_sync_node})
255
308
GenStateMachine.cast({__MODULE__, new_sync_node}, {:sync, self(), clock})
256
- new_state = %{state | nodes: new_nodes,
257
- strategy: Strategy.remove_node(strategy, node),
258
- sync_node: new_sync_node,
259
- sync_ref: ref}
309
+
310
+ new_state = %{
311
+ state
312
+ | nodes: new_nodes,
313
+ strategy: Strategy.remove_node(strategy, node),
314
+ sync_node: new_sync_node,
315
+ sync_ref: ref
316
+ }
317
+
260
318
{:keep_state, new_state}
261
319
end
262
320
end
321
+
263
322
def syncing(:info, {:nodedown, node, _}, %TrackerState{} = state) do
264
- new_state = case nodedown(state, node) do
265
- {:ok, new_state} -> new_state
266
- {:ok, new_state, _next_state} -> new_state
267
- end
323
+ new_state =
324
+ case nodedown(state, node) do
325
+ {:ok, new_state} -> new_state
326
+ {:ok, new_state, _next_state} -> new_state
327
+ end
328
+
268
329
{:keep_state, new_state}
269
330
end
331
+
270
332
# Successful anti-entropy sync
271
- def syncing(:cast, {:sync_recv, from, sync_clock, registry}, %TrackerState{sync_node: sync_node} = state)
272
- when node(from) == sync_node do
273
- info "received registry from #{sync_node}, merging.."
274
- new_state = sync_registry(from, sync_clock, registry, state)
275
- # let remote node know we've got the registry
276
- GenStateMachine.cast(from, {:sync_ack, self(), new_state.clock, Registry.snapshot()})
277
- info "local synchronization with #{sync_node} complete!"
278
- resolve_pending_sync_requests(new_state)
333
+ def syncing(
334
+ :cast,
335
+ {:sync_recv, from, sync_clock, registry},
336
+ %TrackerState{sync_node: sync_node} = state
337
+ )
338
+ when node(from) == sync_node do
339
+ info("received registry from #{sync_node}, merging..")
340
+ new_state = sync_registry(from, sync_clock, registry, state)
341
+ # let remote node know we've got the registry
342
+ GenStateMachine.cast(from, {:sync_ack, self(), sync_clock, get_registry_snapshot()})
343
+ info("local synchronization with #{sync_node} complete!")
344
+ resolve_pending_sync_requests(%{new_state | clock: sync_clock})
279
345
end
280
- def syncing(:cast, {:sync_err, from}, %TrackerState{nodes: nodes, sync_node: sync_node} = state) when node(from) == sync_node do
346
+
347
+ def syncing(:cast, {:sync_err, from}, %TrackerState{nodes: nodes, sync_node: sync_node} = state)
348
+ when node(from) == sync_node do
281
349
Process.demonitor(state.sync_ref, [:flush])
350
+
282
351
cond do
283
352
# Something weird happened during sync, so try a different node,
284
353
# with this implementation, we *could* end up selecting the same node
285
354
# again, but that's fine as this is effectively a retry
286
355
length(nodes) > 0 ->
287
- warn "a problem occurred during sync, choosing a new node to sync with"
356
+ warn("a problem occurred during sync, choosing a new node to sync with")
288
357
# we need to choose a different node to sync with and try again
289
358
new_sync_node = Enum.random(nodes)
290
359
ref = Process.monitor({__MODULE__, new_sync_node})
291
360
GenStateMachine.cast({__MODULE__, new_sync_node}, {:sync, self(), state.clock})
292
361
{:keep_state, %{state | sync_node: new_sync_node, sync_ref: ref}}
362
+
293
363
# Something went wrong during sync, but there are no other nodes to sync with,
294
364
# not even the original sync node (which probably implies it shutdown or crashed),
295
365
# so we're the sync node now
296
366
:else ->
297
- warn "a problem occurred during sync, but no other available sync targets, becoming seed node"
367
+ warn(
368
+ "a problem occurred during sync, but no other available sync targets, becoming seed node"
369
+ )
370
+
298
371
{:next_state, :tracking, %{state | pending_sync_reqs: [], sync_node: nil, sync_ref: nil}}
299
372
end
300
373
end
301
- def syncing(:cast, {:sync, from, rclock}, %TrackerState{sync_node: sync_node} = state) when node(from) == sync_node do
374
+
375
+ def syncing(:cast, {:sync, from, rclock}, %TrackerState{sync_node: sync_node} = state)
376
+ when node(from) == sync_node do
302
377
# We're trying to sync with another node while it is trying to sync with us, deterministically
303
378
# choose the node which will coordinate the synchronization.
304
- local_node = Node.self
379
+ local_node = Node.self()
380
+
305
381
case Clock.compare(state.clock, rclock) do
306
382
:lt ->
307
383
# The local clock is dominated by the remote clock, so the remote node will begin the sync
308
- info "syncing from #{sync_node} based on tracker clock"
384
+ info("syncing from #{sync_node} based on tracker clock")
309
385
:keep_state_and_data
386
+
310
387
:gt ->
311
388
# The local clock dominates the remote clock, so the local node will begin the sync
312
- info "syncing to #{sync_node} based on tracker clock"
389
+ info("syncing to #{sync_node} based on tracker clock")
313
390
{lclock, rclock} = Clock.fork(state.clock)
314
- GenStateMachine.cast(from, {:sync_recv, self(), rclock, Registry.snapshot()})
391
+ debug("forking clock when local: #{inspect state.clock}, lclock: #{inspect lclock}, rclock: #{inspect rclock}")
392
+ GenStateMachine.cast(from, {:sync_recv, self(), rclock, get_registry_snapshot()})
315
393
{:next_state, :awaiting_sync_ack, %{state | clock: lclock}}
394
+
316
395
result when result in [:eq, :concurrent] and sync_node > local_node ->
317
396
# The remote node will begin the sync
318
- info "syncing from #{sync_node} based on node precedence"
397
+ info("syncing from #{sync_node} based on node precedence")
319
398
:keep_state_and_data
399
+
320
400
result when result in [:eq, :concurrent] ->
321
401
# The local node begins the sync
322
- info "syncing to #{sync_node} based on node precedence"
402
+ info("syncing to #{sync_node} based on node precedence")
323
403
{lclock, rclock} = Clock.fork(state.clock)
324
- GenStateMachine.cast(from, {:sync_recv, self(), rclock, Registry.snapshot()})
404
+ debug("forking clock when concurrent: #{inspect state.clock}, lclock: #{inspect lclock}, rclock: #{inspect rclock}")
405
+ GenStateMachine.cast(from, {:sync_recv, self(), rclock, get_registry_snapshot()})
325
406
{:next_state, :awaiting_sync_ack, %{state | clock: lclock}}
326
407
end
327
408
end
409
+
328
410
def syncing(:cast, {:sync, from, _rclock}, %TrackerState{} = state) do
329
411
if ignore_node?(node(from)) do
330
412
GenStateMachine.cast(from, {:sync_err, :node_ignored})
331
413
:keep_state_and_data
332
414
else
333
- info "pending sync request from #{node(from)}"
334
- new_pending_reqs = Enum.uniq([from|state.pending_sync_reqs])
415
+ info("pending sync request from #{node(from)}")
416
+ new_pending_reqs = Enum.uniq([from | state.pending_sync_reqs])
335
417
{:keep_state, %{state | pending_sync_reqs: new_pending_reqs}}
336
418
end
337
419
end
420
+
338
421
def syncing(_event_type, _event_data, _state) do
339
422
{:keep_state_and_data, :postpone}
340
423
end
341
424
342
- defp sync_registry(from, sync_clock, registry, %TrackerState{} = state) when is_pid(from) do
425
+ defp sync_registry(from, _sync_clock, registry, %TrackerState{} = state) when is_pid(from) do
343
426
sync_node = node(from)
344
427
# map over the registry and check that all local entries are correct
345
- Enum.reduce(registry, state, fn
346
- entry(name: rname, pid: rpid, meta: rmeta, clock: rclock) = rreg, %TrackerState{clock: clock} = state ->
428
+ Enum.each(registry, fn entry(name: rname, pid: rpid, meta: rmeta, clock: rclock) = rreg ->
347
429
case Registry.get_by_name(rname) do
348
430
:undefined ->
349
431
# missing local registration
350
- debug "local tracker is missing #{inspect rname}, adding to registry"
432
+ debug("local tracker is missing #{inspect(rname)}, adding to registry")
351
433
ref = Process.monitor(rpid)
352
- Registry.new!(entry(name: rname, pid: rpid, ref: ref, meta: rmeta, clock: rclock))
353
- %{state | clock: Clock.event(clock)}
354
- entry(pid: ^rpid, meta: ^rmeta, clock: ^rclock) ->
355
- # this entry matches, nothing to do
356
- state
357
- entry(pid: ^rpid, meta: ^rmeta, clock: _) ->
358
- # the clocks differ, but the data is identical, so let's update it so they're the same
359
- Registry.update(rname, clock: rclock)
360
- state
434
+ lclock = Clock.join(state.clock, rclock)
435
+ Registry.new!(entry(name: rname, pid: rpid, ref: ref, meta: rmeta, clock: lclock))
436
+
361
437
entry(pid: ^rpid, meta: lmeta, clock: lclock) ->
362
- # the metadata differs, we need to merge it
363
438
case Clock.compare(lclock, rclock) do
364
439
:lt ->
365
- # the remote clock dominates, so merge favoring data from the remote registry
366
- new_meta = Map.merge(lmeta, rmeta)
367
- Registry.update(rname, clock: Clock.event(rclock), meta: new_meta)
440
+ # the remote clock dominates, take remote data
441
+ lclock = Clock.join(lclock, rclock)
442
+ Registry.update(rname, meta: rmeta, clock: lclock)
443
+
444
+ debug(
445
+ "sync metadata for #{inspect(rpid)} (#{inspect(rmeta)}) is causally dominated by remote, updated registry..."
446
+ )
447
+
368
448
:gt ->
369
- # the local clock dominates, so merge favoring data from the local registry
370
- new_meta = Map.merge(rmeta, lmeta)
371
- Registry.update(rname, clock: Clock.event(lclock), meta: new_meta)
372
- cmp when cmp in [:eq, :concurrent] ->
373
- # the clocks are equivalent or concurrently modified, but the data is different, so check the registry clocks
374
- if cmp == :concurrent do
375
- warn "local and remote metadata for #{inspect rname} was concurrently modified"
376
- end
377
- case Clock.compare(clock, sync_clock) do
378
- :lt ->
379
- # merge favoring remote, use remote clock as new entry clock
380
- new_meta = Map.merge(lmeta, rmeta)
381
- Registry.update(rname, clock: sync_clock, meta: new_meta)
382
- :gt ->
383
- # merge favoring local, use local clock as new entry clock
384
- new_meta = Map.merge(rmeta, lmeta)
385
- Registry.update(rname, clock: clock, meta: new_meta)
386
- _ ->
387
- # we can't break the tie using the registry clock, so we'll break the tie using node priority
388
- new_meta =
389
- if Node.self > sync_node do
390
- Map.merge(rmeta, lmeta)
391
- else
392
- Map.merge(lmeta, rmeta)
393
- end
394
- Registry.update(rname, clock: clock, meta: new_meta)
395
- end
449
+ # the local clock dominates, keep local data
450
+ debug(
451
+ "sync metadata for #{inspect(rpid)} (#{inspect(rmeta)}) is causally dominated by local, ignoring..."
452
+ )
453
+
454
+ :ok
455
+
456
+ :eq ->
457
+ # the clocks are the same, no-op
458
+ debug(
459
+ "sync metadata for #{inspect(rpid)} (#{inspect(rmeta)}) has equal clocks, ignoring..."
460
+ )
461
+
462
+ :ok
463
+
464
+ :concurrent ->
465
+ warn("local and remote metadata for #{inspect(rname)} was concurrently modified")
466
+ new_meta = Map.merge(lmeta, rmeta)
467
+
468
+ # we're going to join and bump our local clock though and re-broadcast the update to ensure we converge
469
+ lclock = Clock.join(lclock, rclock)
470
+ lclock = Clock.event(lclock)
471
+ Registry.update(rname, meta: new_meta, clock: lclock)
472
+ broadcast_event(state.nodes, lclock, {:update_meta, new_meta, rpid})
396
473
end
397
- state
474
+
398
475
entry(pid: lpid, clock: lclock) = lreg ->
399
476
# there are two different processes for the same name, we need to resolve
400
477
case Clock.compare(lclock, rclock) do
401
478
:lt ->
402
479
# the remote registration dominates
403
480
resolve_incorrect_local_reg(sync_node, lreg, rreg, state)
481
+
404
482
:gt ->
405
483
# local registration dominates
406
- debug "remote view of #{inspect rname} is outdated, resolving.."
484
+ debug("remote view of #{inspect(rname)} is outdated, resolving..")
407
485
resolve_incorrect_remote_reg(sync_node, lreg, rreg, state)
486
+
408
487
_ ->
409
488
# the entry clocks conflict, determine which one is correct based on
410
489
# current topology and resolve the conflict
411
490
rpid_node = node(rpid)
412
491
lpid_node = node(lpid)
492
+
413
493
case Strategy.key_to_node(state.strategy, rname) do
414
494
^rpid_node when lpid_node != rpid_node ->
415
- debug "remote and local view of #{inspect rname} conflict, but remote is correct, resolving.."
495
+ debug(
496
+ "remote and local view of #{inspect(rname)} conflict, but remote is correct, resolving.."
497
+ )
498
+
416
499
resolve_incorrect_local_reg(sync_node, lreg, rreg, state)
500
+
417
501
^lpid_node when lpid_node != rpid_node ->
418
- debug "remote and local view of #{inspect rname} conflict, but local is correct, resolving.."
502
+ debug(
503
+ "remote and local view of #{inspect(rname)} conflict, but local is correct, resolving.."
504
+ )
505
+
419
506
resolve_incorrect_remote_reg(sync_node, lreg, rreg, state)
507
+
420
508
_ ->
421
509
cond do
422
510
lpid_node == rpid_node and lpid > rpid ->
423
- debug "remote and local view of #{inspect rname} conflict, but local is more recent, resolving.."
511
+ debug(
512
+ "remote and local view of #{inspect(rname)} conflict, but local is more recent, resolving.."
513
+ )
514
+
424
515
resolve_incorrect_remote_reg(sync_node, lreg, rreg, state)
516
+
425
517
lpid_node == rpid_node and lpid < rpid ->
426
- debug "remote and local view of #{inspect rname} conflict, but remote is more recent, resolving.."
518
+ debug(
519
+ "remote and local view of #{inspect(rname)} conflict, but remote is more recent, resolving.."
520
+ )
521
+
427
522
resolve_incorrect_local_reg(sync_node, lreg, rreg, state)
523
+
428
524
:else ->
429
- # name should be on another node, so neither registration is correct, break tie
430
- # using registry clock instead
431
- case Clock.compare(clock, sync_clock) do
432
- :lt ->
433
- # remote dominates
434
- resolve_incorrect_local_reg(sync_node, lreg, rreg, state)
435
- :gt ->
436
- # local dominates
437
- debug "remote view of #{inspect rname} is outdated based on registry clock, resolving.."
438
- resolve_incorrect_remote_reg(sync_node, lreg, rreg, state)
439
- _ when lpid_node > rpid_node ->
440
- # break tie using node priority
441
- debug "remote view of #{inspect rname} is outdated based on node priority, resolving.."
442
- resolve_incorrect_remote_reg(sync_node, lreg, rreg, state)
443
- _ ->
444
- # break tie using node priority
445
- resolve_incorrect_local_reg(sync_node, lreg, rreg, state)
446
- end
525
+ # name should be on another node, so neither registration is correct
526
+ debug(
527
+ "remote and local view of #{inspect(rname)} are both outdated, resolving.."
528
+ )
529
+
530
+ resolve_incorrect_local_reg(sync_node, lreg, rreg, state)
447
531
end
448
532
end
449
533
end
450
534
end
451
535
end)
536
+
537
+ state
452
538
end
453
539
454
540
defp resolve_pending_sync_requests(%TrackerState{pending_sync_reqs: []} = state) do
455
- info "pending sync requests cleared"
541
+ info("pending sync requests cleared")
542
+
456
543
case state.sync_ref do
457
544
nil -> :ok
458
545
ref -> Process.demonitor(ref, [:flush])
459
546
end
547
+
460
548
{:next_state, :tracking, %{state | sync_node: nil, sync_ref: nil}}
461
549
end
462
- defp resolve_pending_sync_requests(%TrackerState{pending_sync_reqs: [pid|pending]} = state) do
550
+
551
+ defp resolve_pending_sync_requests(
552
+ %TrackerState{sync_node: sync_node, pending_sync_reqs: [pid | pending]} = state
553
+ )
554
+ when sync_node == node(pid) do
555
+ info("discarding sync_node from pending_sync_reqs")
556
+
557
+ resolve_pending_sync_requests(%{state | pending_sync_reqs: pending})
558
+ end
559
+
560
+ defp resolve_pending_sync_requests(%TrackerState{pending_sync_reqs: [pid | pending]} = state) do
463
561
pending_node = node(pid)
464
562
# Remove monitoring of the previous sync node
465
563
case state.sync_ref do
466
564
nil -> :ok
467
565
ref -> Process.demonitor(ref, [:flush])
468
566
end
567
+
469
568
cond do
470
569
Enum.member?(state.nodes, pending_node) ->
471
- info "clearing pending sync request for #{pending_node}"
570
+ info("clearing pending sync request for #{pending_node}")
472
571
{lclock, rclock} = Clock.fork(state.clock)
572
+ debug("forking clock when resolving: #{inspect state.clock}, lclock: #{inspect lclock}, rclock: #{inspect rclock}")
473
573
ref = Process.monitor(pid)
474
- GenStateMachine.cast(pid, {:sync_recv, self(), rclock, Registry.snapshot()})
475
- new_state = %{state | sync_node: node(pid),
476
- sync_ref: ref,
477
- pending_sync_reqs: pending,
478
- clock: lclock}
574
+ GenStateMachine.cast(pid, {:sync_recv, self(), rclock, get_registry_snapshot()})
575
+
576
+ new_state = %{
577
+ state
578
+ | sync_node: node(pid),
579
+ sync_ref: ref,
580
+ pending_sync_reqs: pending,
581
+ clock: lclock
582
+ }
583
+
479
584
{:next_state, :awaiting_sync_ack, new_state}
585
+
480
586
:else ->
481
- resolve_pending_sync_requests(%{state | sync_node: nil, sync_ref: nil, pending_sync_reqs: pending})
587
+ resolve_pending_sync_requests(%{
588
+ state
589
+ | sync_node: nil,
590
+ sync_ref: nil,
591
+ pending_sync_reqs: pending
592
+ })
482
593
end
483
594
end
484
595
485
- def awaiting_sync_ack(:cast, {:sync_ack, from, sync_clock, registry}, %TrackerState{sync_node: sync_node} = state)
486
- when sync_node == node(from) do
487
- info "received sync acknowledgement from #{node(from)}, syncing with remote registry"
488
- new_state = sync_registry(from, sync_clock, registry, state)
489
- info "local synchronization with #{node(from)} complete!"
490
- resolve_pending_sync_requests(new_state)
596
+ def awaiting_sync_ack(
597
+ :cast,
598
+ {:sync_ack, from, sync_clock, registry},
599
+ %TrackerState{sync_node: sync_node} = state
600
+ )
601
+ when sync_node == node(from) do
602
+ info("received sync acknowledgement from #{node(from)}, syncing with remote registry")
603
+ new_state = sync_registry(from, sync_clock, registry, state)
604
+ info("local synchronization with #{node(from)} complete!")
605
+ resolve_pending_sync_requests(new_state)
491
606
end
492
- def awaiting_sync_ack(:info, {:DOWN, ref, _type, _pid, _info}, %TrackerState{sync_ref: ref} = state) do
493
- warn "wait for acknowledgement from #{state.sync_node} cancelled, tracker down"
607
+
608
+ def awaiting_sync_ack(
609
+ :info,
610
+ {:DOWN, ref, _type, _pid, _info},
611
+ %TrackerState{sync_ref: ref} = state
612
+ ) do
613
+ warn("wait for acknowledgement from #{state.sync_node} cancelled, tracker down")
494
614
resolve_pending_sync_requests(%{state | sync_node: nil, sync_ref: nil})
495
615
end
616
+
496
617
def awaiting_sync_ack(:info, {:nodeup, node, _}, %TrackerState{} = state) do
497
- new_state = case nodeup(state, node) do
498
- {:ok, new_state} -> new_state
499
- {:ok, new_state, _next_state} -> new_state
500
- end
618
+ new_state =
619
+ case nodeup(state, node) do
620
+ {:ok, new_state} -> new_state
621
+ {:ok, new_state, _next_state} -> new_state
622
+ end
623
+
501
624
{:keep_state, new_state}
502
625
end
626
+
503
627
def awaiting_sync_ack(:info, {:nodedown, node, _}, %TrackerState{sync_node: node} = state) do
504
- new_state = case nodedown(state, node) do
505
- {:ok, new_state} -> new_state
506
- {:ok, new_state, _next_state} -> new_state
507
- end
628
+ new_state =
629
+ case nodedown(state, node) do
630
+ {:ok, new_state} -> new_state
631
+ {:ok, new_state, _next_state} -> new_state
632
+ end
633
+
508
634
Process.demonitor(state.sync_ref, [:flush])
509
635
resolve_pending_sync_requests(%{new_state | sync_node: nil, sync_ref: nil})
510
636
end
637
+
511
638
def awaiting_sync_ack(:info, {:nodedown, node, _}, %TrackerState{} = state) do
512
- new_state = case nodedown(state, node) do
513
- {:ok, new_state} -> new_state
514
- {:ok, new_state, _next_state} -> new_state
515
- end
639
+ new_state =
640
+ case nodedown(state, node) do
641
+ {:ok, new_state} -> new_state
642
+ {:ok, new_state, _next_state} -> new_state
643
+ end
644
+
516
645
{:keep_state, new_state}
517
646
end
647
+
518
648
def awaiting_sync_ack(_event_type, _event_data, _state) do
519
649
{:keep_state_and_data, :postpone}
520
650
end
 
@@ -523,6 +653,7 @@ defmodule Swarm.Tracker do
523
653
# A child process started by this tracker has crashed
524
654
:keep_state_and_data
525
655
end
656
+
526
657
def tracking(:info, {:nodeup, node, _}, %TrackerState{nodes: []} = state) do
527
658
if ignore_node?(node) do
528
659
:keep_state_and_data
 
@@ -536,50 +667,62 @@ defmodule Swarm.Tracker do
536
667
{:ok, new_state} -> new_state
537
668
{:ok, new_state, _next_state} -> new_state
538
669
end
670
+
539
671
cluster_wait(:info, :cluster_join, new_state)
540
672
end
541
673
end
674
+
542
675
def tracking(:info, {:nodeup, node, _}, state) do
543
676
state
544
677
|> nodeup(node)
545
678
|> handle_node_status()
546
679
end
680
+
547
681
def tracking(:info, {:nodedown, node, _}, state) do
548
682
state
549
683
|> nodedown(node)
550
684
|> handle_node_status()
551
685
end
686
+
552
687
def tracking(:info, {:ensure_swarm_started_on_remote_node, node, attempts}, state) do
553
688
state
554
689
|> ensure_swarm_started_on_remote_node(node, attempts)
555
690
|> handle_node_status()
556
691
end
692
+
557
693
def tracking(:info, :anti_entropy, state) do
558
694
anti_entropy(state)
559
695
end
696
+
560
697
# A change event received from another replica/node
561
698
def tracking(:cast, {:event, from, rclock, event}, state) do
562
699
handle_replica_event(from, event, rclock, state)
563
700
end
701
+
564
702
# Received a handoff request from a node
565
703
def tracking(:cast, {:handoff, from, {name, meta, handoff_state, rclock}}, state) do
566
704
handle_handoff(from, {name, meta, handoff_state, rclock}, state)
567
705
end
706
+
568
707
# A remote registration failed due to nodedown during the call
569
708
def tracking(:cast, {:retry, from, {:track, name, m, f, a}}, state) do
570
709
handle_retry(from, {:track, name, %{mfa: {m, f, a}}}, state)
571
710
end
711
+
572
712
# A change event received locally
573
713
def tracking({:call, from}, msg, state) do
574
714
handle_call(msg, from, state)
575
715
end
716
+
576
717
def tracking(:cast, msg, state) do
577
718
handle_cast(msg, state)
578
719
end
720
+
579
721
# A tracked process has gone down
580
722
def tracking(:info, {:DOWN, ref, _type, pid, info}, state) do
581
723
handle_monitor(ref, pid, info, state)
582
724
end
725
+
583
726
def tracking(event_type, event_data, state) do
584
727
handle_event(event_type, event_data, state)
585
728
end
 
@@ -592,9 +735,10 @@ defmodule Swarm.Tracker do
592
735
Process.send_after(self(), :anti_entropy, interval)
593
736
:keep_state_and_data
594
737
end
738
+
595
739
def anti_entropy(%TrackerState{nodes: nodes} = state) do
596
740
sync_node = Enum.random(nodes)
597
- info "syncing with #{sync_node}"
741
+ info("syncing with #{sync_node}")
598
742
ref = Process.monitor({__MODULE__, sync_node})
599
743
GenStateMachine.cast({__MODULE__, sync_node}, {:sync, self(), state.clock})
600
744
new_state = %{state | sync_node: sync_node, sync_ref: ref}
 
@@ -603,11 +747,11 @@ defmodule Swarm.Tracker do
603
747
{:next_state, :syncing, new_state}
604
748
end
605
749
606
-
607
750
# This message is sent as a broadcast message for replication
608
751
def handle_event(:info, {:event, from, rclock, event}, state) do
609
752
handle_replica_event(from, event, rclock, state)
610
753
end
754
+
611
755
# If we receive cluster_join outside of cluster_wait it's because
612
756
# we implicitly joined the cluster due to a sync event, we know if
613
757
# we receive such an event the cluster is already formed due to how
 
@@ -615,15 +759,18 @@ defmodule Swarm.Tracker do
615
759
def handle_event(:info, :cluster_join, _state) do
616
760
:keep_state_and_data
617
761
end
762
+
618
763
def handle_event({:call, from}, msg, state) do
619
764
handle_call(msg, from, state)
620
765
end
766
+
621
767
def handle_event(:cast, msg, state) do
622
768
handle_cast(msg, state)
623
769
end
770
+
624
771
# Default event handler
625
772
def handle_event(event_type, event_data, _state) do
626
- debug "unexpected event: #{inspect {event_type, event_data}}"
773
+ debug("unexpected event: #{inspect({event_type, event_data})}")
627
774
:keep_state_and_data
628
775
end
629
776
 
@@ -632,33 +779,41 @@ defmodule Swarm.Tracker do
632
779
end
633
780
634
781
defp handle_node_status({:ok, new_state}), do: {:keep_state, new_state}
782
+
635
783
defp handle_node_status({:ok, new_state, {:topology_change, change_info}}) do
636
784
handle_topology_change(change_info, new_state)
637
785
end
638
786
639
787
# This is the callback for when a process is being handed off from a remote node to this node.
640
- defp handle_handoff(from, {name, meta, handoff_state, rclock}, %TrackerState{clock: clock} = state) do
788
+ defp handle_handoff(
789
+ from,
790
+ {name, meta, handoff_state, rclock},
791
+ %TrackerState{clock: clock} = state
792
+ ) do
641
793
try do
642
794
# If a network split is being healed, we almost certainly will have a
643
795
# local registration already for this name (since it was present on this side of the split)
644
796
# If not, we'll restart it, but if so, we'll send the handoff state to the old process and
645
797
# let it determine how to resolve the conflict
646
- current_node = Node.self
798
+ current_node = Node.self()
799
+
647
800
case Registry.get_by_name(name) do
648
801
:undefined ->
649
802
{{m, f, a}, _other_meta} = Map.pop(meta, :mfa)
650
803
{:ok, pid} = apply(m, f, a)
651
804
GenServer.cast(pid, {:swarm, :end_handoff, handoff_state})
652
805
ref = Process.monitor(pid)
653
- new_clock = Clock.event(clock)
654
- Registry.new!(entry(name: name, pid: pid, ref: ref, meta: meta, clock: Clock.peek(new_clock)))
655
- broadcast_event(state.nodes, Clock.peek(new_clock), {:track, name, pid, meta})
656
- {:keep_state, %{state | clock: new_clock}}
806
+ lclock = Clock.join(clock, rclock)
807
+ Registry.new!(entry(name: name, pid: pid, ref: ref, meta: meta, clock: lclock))
808
+ broadcast_event(state.nodes, lclock, {:track, name, pid, meta})
809
+ {:keep_state, state}
810
+
657
811
entry(pid: pid) when node(pid) == current_node ->
658
812
GenServer.cast(pid, {:swarm, :resolve_conflict, handoff_state})
659
- new_clock = Clock.event(clock)
660
- broadcast_event(state.nodes, Clock.peek(new_clock), {:track, name, pid, meta})
661
- {:keep_state, %{state | clock: new_clock}}
813
+ lclock = Clock.join(clock, rclock)
814
+ broadcast_event(state.nodes, lclock, {:track, name, pid, meta})
815
+ {:keep_state, state}
816
+
662
817
entry(pid: pid, ref: ref) = obj when node(pid) == node(from) ->
663
818
# We have received the handoff before we've received the untrack event, but because
664
819
# the handoff is coming from the node where the registration existed, we can safely
 
@@ -670,7 +825,7 @@ defmodule Swarm.Tracker do
670
825
end
671
826
catch
672
827
kind, err ->
673
- error Exception.format(kind, err, System.stacktrace)
828
+ error(Exception.format(kind, err, System.stacktrace()))
674
829
:keep_state_and_data
675
830
end
676
831
end
 
@@ -678,349 +833,394 @@ defmodule Swarm.Tracker do
678
833
# This is the callback for when a nodeup/down event occurs after the tracker has entered
679
834
# the main receive loop. Topology changes are handled a bit differently during startup.
680
835
defp handle_topology_change({type, remote_node}, %TrackerState{} = state) do
681
- debug "topology change (#{type} for #{remote_node})"
836
+ debug("topology change (#{type} for #{remote_node})")
682
837
current_node = state.self
683
- new_clock = Registry.reduce(state.clock, fn
684
- entry(name: name, pid: pid, meta: %{mfa: _mfa} = meta) = obj, lclock when node(pid) == current_node ->
685
- case Strategy.key_to_node(state.strategy, name) do
686
- :undefined ->
687
- # No node available to host process, it must be stopped
688
- debug "#{inspect pid} must be stopped as no node is available to host it"
689
- {:ok, new_state} = remove_registration(obj, %{state | clock: lclock})
690
- send(pid, {:swarm, :die})
691
- new_state.clock
692
- ^current_node ->
693
- # This process is correct
694
- lclock
695
- other_node ->
696
- debug "#{inspect pid} belongs on #{other_node}"
697
- # This process needs to be moved to the new node
698
- try do
699
- case GenServer.call(pid, {:swarm, :begin_handoff}) do
700
- :ignore ->
701
- debug "#{inspect name} has requested to be ignored"
702
- lclock
703
- {:resume, handoff_state} ->
704
- debug "#{inspect name} has requested to be resumed"
705
- {:ok, state} = remove_registration(obj, %{state | clock: lclock})
706
- send(pid, {:swarm, :die})
707
- debug "sending handoff for #{inspect name} to #{other_node}"
708
- GenStateMachine.cast({__MODULE__, other_node},
709
- {:handoff, self(), {name, meta, handoff_state, Clock.peek(state.clock)}})
710
- state.clock
711
- :restart ->
712
- debug "#{inspect name} has requested to be restarted"
713
- {:ok, new_state} = remove_registration(obj, %{state | clock: lclock})
714
- send(pid, {:swarm, :die})
715
- case do_track(%Tracking{name: name, meta: meta}, new_state) do
716
- :keep_state_and_data -> new_state.clock
717
- {:keep_state, new_state} -> new_state.clock
718
- end
719
- end
720
- catch
721
- _, err ->
722
- warn "handoff failed for #{inspect name}: #{inspect err}"
723
- lclock
724
- end
725
- end
726
- entry(name: name, pid: pid, meta: %{mfa: _mfa} = meta) = obj, lclock when is_map(meta) ->
727
- cond do
728
- Enum.member?(state.nodes, node(pid)) ->
729
- # the parent node is still up
730
- lclock
731
- :else ->
732
- # pid is dead, we're going to restart it
733
- case Strategy.key_to_node(state.strategy, name) do
734
- :undefined ->
735
- # No node available to restart process on, so remove registration
736
- warn "no node available to restart #{inspect name}"
737
- {:ok, new_state} = remove_registration(obj, %{state | clock: lclock})
738
- new_state.clock
739
- ^current_node ->
740
- debug "restarting #{inspect name} on #{current_node}"
741
- {:ok, new_state} = remove_registration(obj, %{state | clock: lclock})
742
- case do_track(%Tracking{name: name, meta: meta}, new_state) do
743
- :keep_state_and_data -> new_state.clock
744
- {:keep_state, new_state} -> new_state.clock
745
- end
746
- _other_node ->
747
- # other_node will tell us to unregister/register the restarted pid
748
- lclock
749
- end
750
- end
751
- entry(name: name, pid: pid) = obj, lclock ->
752
- pid_node = node(pid)
753
- cond do
754
- pid_node == current_node or Enum.member?(state.nodes, pid_node) ->
755
- # the parent node is still up
756
- lclock
757
- :else ->
758
- # the parent node is down, but we cannot restart this pid, so unregister it
759
- debug "removing registration for #{inspect name}, #{pid_node} is down"
760
- {:ok, new_state} = remove_registration(obj, %{state | clock: lclock})
761
- new_state.clock
762
- end
763
- end)
764
838
765
- info "topology change complete"
766
- {:keep_state, %{state | clock: new_clock}}
839
+ new_state =
840
+ Registry.reduce(state, fn
841
+ entry(name: name, pid: pid, meta: %{mfa: _mfa} = meta) = obj, state
842
+ when node(pid) == current_node ->
843
+ case Strategy.key_to_node(state.strategy, name) do
844
+ :undefined ->
845
+ # No node available to host process, it must be stopped
846
+ debug("#{inspect(pid)} must be stopped as no node is available to host it")
847
+ {:ok, new_state} = remove_registration(obj, state)
848
+ send(pid, {:swarm, :die})
849
+ new_state
850
+
851
+ ^current_node ->
852
+ # This process is correct
853
+ state
854
+
855
+ other_node ->
856
+ debug("#{inspect(pid)} belongs on #{other_node}")
857
+ # This process needs to be moved to the new node
858
+ try do
859
+ case GenServer.call(pid, {:swarm, :begin_handoff}) do
860
+ :ignore ->
861
+ debug("#{inspect(name)} has requested to be ignored")
862
+ state
863
+
864
+ {:resume, handoff_state} ->
865
+ debug("#{inspect(name)} has requested to be resumed")
866
+ {:ok, new_state} = remove_registration(obj, state)
867
+ send(pid, {:swarm, :die})
868
+ debug("sending handoff for #{inspect(name)} to #{other_node}")
869
+
870
+ GenStateMachine.cast(
871
+ {__MODULE__, other_node},
872
+ {:handoff, self(), {name, meta, handoff_state, Clock.peek(new_state.clock)}}
873
+ )
874
+
875
+ new_state
876
+
877
+ :restart ->
878
+ debug("#{inspect(name)} has requested to be restarted")
879
+ {:ok, new_state} = remove_registration(obj, state)
880
+ send(pid, {:swarm, :die})
881
+
882
+ case do_track(%Tracking{name: name, meta: meta}, new_state) do
883
+ :keep_state_and_data -> new_state
884
+ {:keep_state, new_state} -> new_state
885
+ end
886
+ end
887
+ catch
888
+ _, err ->
889
+ warn("handoff failed for #{inspect(name)}: #{inspect(err)}")
890
+ state
891
+ end
892
+ end
893
+
894
+ entry(name: name, pid: pid, meta: %{mfa: _mfa} = meta) = obj, state when is_map(meta) ->
895
+ cond do
896
+ Enum.member?(state.nodes, node(pid)) ->
897
+ # the parent node is still up
898
+ state
899
+
900
+ :else ->
901
+ # pid is dead, we're going to restart it
902
+ case Strategy.key_to_node(state.strategy, name) do
903
+ :undefined ->
904
+ # No node available to restart process on, so remove registration
905
+ warn("no node available to restart #{inspect(name)}")
906
+ {:ok, new_state} = remove_registration(obj, state)
907
+ new_state
908
+
909
+ ^current_node ->
910
+ debug("restarting #{inspect(name)} on #{current_node}")
911
+ {:ok, new_state} = remove_registration(obj, state)
912
+
913
+ case do_track(%Tracking{name: name, meta: meta}, new_state) do
914
+ :keep_state_and_data -> new_state
915
+ {:keep_state, new_state} -> new_state
916
+ end
917
+
918
+ _other_node ->
919
+ # other_node will tell us to unregister/register the restarted pid
920
+ state
921
+ end
922
+ end
923
+
924
+ entry(name: name, pid: pid) = obj, state ->
925
+ pid_node = node(pid)
926
+
927
+ cond do
928
+ pid_node == current_node or Enum.member?(state.nodes, pid_node) ->
929
+ # the parent node is still up
930
+ state
931
+
932
+ :else ->
933
+ # the parent node is down, but we cannot restart this pid, so unregister it
934
+ debug("removing registration for #{inspect(name)}, #{pid_node} is down")
935
+ {:ok, new_state} = remove_registration(obj, state)
936
+ new_state
937
+ end
938
+ end)
939
+
940
+ info("topology change complete")
941
+ {:keep_state, new_state}
767
942
end
768
943
769
944
# This is the callback for tracker events which are being replicated from other nodes in the cluster
770
- defp handle_replica_event(_from, {:track, name, pid, meta}, rclock, %TrackerState{clock: clock} = state) do
771
- debug "replicating registration for #{inspect name} (#{inspect pid}) locally"
945
+ defp handle_replica_event(_from, {:track, name, pid, meta}, rclock, %TrackerState{clock: clock}) do
946
+ debug("replicating registration for #{inspect(name)} (#{inspect(pid)}) locally")
947
+
772
948
case Registry.get_by_name(name) do
773
949
entry(name: ^name, pid: ^pid, meta: ^meta) ->
774
950
# We're already up to date
775
951
:keep_state_and_data
776
- entry(name: ^name, pid: ^pid, meta: lmeta) ->
952
+
953
+ entry(name: ^name, pid: ^pid, clock: lclock) ->
777
954
# We don't have the same view of the metadata
778
955
cond do
779
- Clock.leq(clock, rclock) ->
956
+ Clock.leq(lclock, rclock) ->
780
957
# The remote version is dominant
781
- Registry.update(name, meta: Map.merge(lmeta, meta))
782
- {:keep_state, %{state | clock: Clock.event(clock)}}
783
- Clock.leq(rclock, clock) ->
958
+ lclock = Clock.join(lclock, rclock)
959
+ Registry.update(name, meta: meta, clock: lclock)
960
+ :keep_state_and_data
961
+
962
+ Clock.leq(rclock, lclock) ->
784
963
# The local version is dominant
785
- Registry.update(name, meta: Map.merge(meta, lmeta))
786
- {:keep_state, %{state | clock: Clock.event(clock)}}
964
+ :keep_state_and_data
965
+
787
966
:else ->
788
- warn "received track event for #{inspect name}, but local clock conflicts with remote clock, event unhandled"
967
+ warn(
968
+ "received track event for #{inspect(name)}, but local clock conflicts with remote clock, event unhandled"
969
+ )
970
+
789
971
:keep_state_and_data
790
972
end
791
- entry(name: ^name, pid: other_pid, ref: ref) = obj ->
973
+
974
+ entry(name: ^name, pid: other_pid, ref: ref, clock: lclock) = obj ->
792
975
# we have conflicting views of this name, compare clocks and fix it
793
- current_node = Node.self
976
+ current_node = Node.self()
977
+
794
978
cond do
795
- Clock.leq(clock, rclock) and node(other_pid) == current_node ->
979
+ Clock.leq(lclock, rclock) and node(other_pid) == current_node ->
796
980
# The remote version is dominant, kill the local pid and remove the registration
797
981
Process.demonitor(ref, [:flush])
798
982
Process.exit(other_pid, :kill)
799
983
Registry.remove(obj)
800
984
new_ref = Process.monitor(pid)
801
- Registry.new!(entry(name: name, pid: pid, ref: new_ref, meta: meta, clock: rclock))
802
- {:keep_state, %{state | clock: Clock.event(clock)}}
803
- Clock.leq(rclock, clock) ->
985
+ lclock = Clock.join(lclock, rclock)
986
+ Registry.new!(entry(name: name, pid: pid, ref: new_ref, meta: meta, clock: lclock))
987
+ :keep_state_and_data
988
+
989
+ Clock.leq(rclock, lclock) ->
804
990
# The local version is dominant, so ignore this event
805
991
:keep_state_and_data
992
+
806
993
:else ->
807
994
# The clocks are conflicted, warn, and ignore this event
808
- warn "received track event for #{inspect name}, mismatched pids, " <>
809
- "local clock conflicts with remote clock, event unhandled"
995
+ warn(
996
+ "received track event for #{inspect(name)}, mismatched pids, local clock conflicts with remote clock, event unhandled"
997
+ )
998
+
810
999
:keep_state_and_data
811
1000
end
1001
+
812
1002
:undefined ->
813
1003
ref = Process.monitor(pid)
814
- Registry.new!(entry(name: name, pid: pid, ref: ref, meta: meta, clock: rclock))
815
- {:keep_state, %{state | clock: Clock.event(clock)}}
1004
+ lclock = Clock.join(clock, rclock)
1005
+ Registry.new!(entry(name: name, pid: pid, ref: ref, meta: meta, clock: lclock))
1006
+ :keep_state_and_data
816
1007
end
817
1008
end
818
- defp handle_replica_event(_from, {:untrack, pid}, rclock, %TrackerState{clock: clock} = state) do
819
- debug "replica event: untrack #{inspect pid}"
1009
+
1010
+ defp handle_replica_event(_from, {:untrack, pid}, rclock, _state) do
1011
+ debug("replica event: untrack #{inspect(pid)}")
1012
+
820
1013
case Registry.get_by_pid(pid) do
821
1014
:undefined ->
822
1015
:keep_state_and_data
1016
+
823
1017
entries when is_list(entries) ->
824
- new_clock = Enum.reduce(entries, clock, fn entry(ref: ref, clock: lclock) = obj, nclock ->
1018
+ Enum.each(entries, fn entry(ref: ref, clock: lclock) = obj ->
825
1019
cond do
826
1020
Clock.leq(lclock, rclock) ->
827
1021
# registration came before unregister, so remove the registration
828
1022
Process.demonitor(ref, [:flush])
829
1023
Registry.remove(obj)
830
- Clock.event(nclock)
1024
+
831
1025
Clock.leq(rclock, lclock) ->
832
1026
# registration is newer than de-registration, ignore msg
833
- debug "untrack is causally dominated by track for #{inspect pid}, ignoring.."
834
- nclock
1027
+ debug("untrack is causally dominated by track for #{inspect(pid)}, ignoring..")
1028
+
835
1029
:else ->
836
- debug "untrack is causally conflicted with track for #{inspect pid}, ignoring.."
837
- nclock
1030
+ debug("untrack is causally conflicted with track for #{inspect(pid)}, ignoring..")
838
1031
end
839
1032
end)
840
- {:keep_state, %{state | clock: new_clock}}
1033
+
1034
+ :keep_state_and_data
841
1035
end
842
1036
end
843
- defp handle_replica_event(_from, {:add_meta, key, value, pid}, rclock, %TrackerState{clock: clock} = state) do
844
- debug "replica event: add_meta #{inspect {key, value}} to #{inspect pid}"
1037
+
1038
+ defp handle_replica_event(_from, {:update_meta, new_meta, pid}, rclock, state) do
1039
+ debug("replica event: update_meta #{inspect(new_meta)} for #{inspect(pid)}")
1040
+
845
1041
case Registry.get_by_pid(pid) do
846
1042
:undefined ->
847
1043
:keep_state_and_data
1044
+
848
1045
entries when is_list(entries) ->
849
- new_clock = Enum.reduce(entries, clock, fn entry(name: name, meta: old_meta, clock: lclock), nclock ->
1046
+ Enum.each(entries, fn entry(name: name, meta: old_meta, clock: lclock) ->
850
1047
cond do
851
1048
Clock.leq(lclock, rclock) ->
852
- new_meta = Map.put(old_meta, key, value)
853
- Registry.update(name, [meta: new_meta, clock: rclock])
854
- nclock = Clock.event(nclock)
1049
+ lclock = Clock.join(lclock, rclock)
1050
+ Registry.update(name, meta: new_meta, clock: lclock)
1051
+
1052
+ debug(
1053
+ "request to update meta from #{inspect(pid)} (#{inspect(new_meta)}) is causally dominated by remote, updated registry..."
1054
+ )
1055
+
855
1056
Clock.leq(rclock, lclock) ->
856
- cond do
857
- Map.has_key?(old_meta, key) ->
858
- debug "request to add meta to #{inspect pid} (#{inspect {key, value}}) is causally dominated by local, ignoring.."
859
- nclock
860
- :else ->
861
- new_meta = Map.put(old_meta, key, value)
862
- Registry.update(name, [meta: new_meta, clock: rclock])
863
- nclock = Clock.event(nclock)
864
- end
865
- :else ->
866
- # we're going to take the last-writer wins approach for resolution for now
867
- new_meta = Map.merge(old_meta, %{key => value})
868
- # we're going to keep our local clock though and re-broadcast the update to ensure we converge
869
- nclock = Clock.event(clock)
870
- Registry.update(name, [meta: new_meta, clock: Clock.peek(nclock)])
871
- debug "conflicting meta for #{inspect name}, updating and notifying other nodes"
872
- broadcast_event(state.nodes, Clock.peek(nclock), {:update_meta, new_meta, pid})
873
- nclock
874
- end
875
- end)
876
- {:keep_state, %{state | clock: new_clock}}
877
- end
878
- end
879
- defp handle_replica_event(_from, {:update_meta, new_meta, pid}, rclock, %TrackerState{clock: clock} = state) do
880
- debug "replica event: update_meta #{inspect new_meta} for #{inspect pid}"
881
- case Registry.get_by_pid(pid) do
882
- :undefined ->
883
- :keep_state_and_data
884
- entries when is_list(entries) ->
885
- new_clock = Enum.reduce(entries, clock, fn entry(name: name, meta: old_meta, clock: lclock), nclock ->
886
- cond do
887
- Clock.leq(lclock, rclock) ->
888
- meta = Map.merge(old_meta, new_meta)
889
- Registry.update(name, [meta: meta, clock: rclock])
890
- Clock.event(nclock)
891
- Clock.leq(rclock, lclock) ->
892
- meta = Map.merge(new_meta, old_meta)
893
- Registry.update(name, [meta: meta, clock: rclock])
894
- Clock.event(nclock)
895
- :else ->
896
- # we're going to take the last-writer wins approach for resolution for now
897
- new_meta = Map.merge(old_meta, new_meta)
898
- # we're going to keep our local clock though and re-broadcast the update to ensure we converge
899
- nclock = Clock.event(nclock)
900
- Registry.update(name, [meta: new_meta, clock: Clock.peek(nclock)])
901
- debug "conflicting meta for #{inspect name}, updating and notifying other nodes"
902
- broadcast_event(state.nodes, Clock.peek(nclock), {:update_meta, new_meta, pid})
903
- nclock
904
- end
905
- end)
906
- {:keep_state, %{state | clock: new_clock}}
907
- end
908
- end
909
- defp handle_replica_event(_from, {:remove_meta, key, pid}, rclock, %TrackerState{clock: clock} = state) do
910
- debug "replica event: remove_meta #{inspect key} from #{inspect pid}"
911
- case Registry.get_by_pid(pid) do
912
- :undefined ->
913
- :keep_state_and_data
914
- entries when is_list(entries) ->
915
- new_clock = Enum.reduce(entries, clock, fn entry(name: name, meta: meta, clock: lclock), nclock ->
916
- cond do
917
- Clock.leq(lclock, rclock) ->
918
- new_meta = Map.drop(meta, [key])
919
- Registry.update(name, [meta: new_meta, clock: rclock])
920
- Clock.event(nclock)
921
- Clock.leq(rclock, lclock) and Map.has_key?(meta, key) ->
922
1057
# ignore the request, as the local clock dominates the remote
923
- nclock
924
- Clock.leq(rclock, lclock) ->
925
- # local dominates the remote, but the key is not present anyway
926
- nclock
927
- Map.has_key?(meta, key) ->
928
- warn "received remove_meta event, but local clock conflicts with remote clock, event unhandled"
929
- nclock
1058
+ debug(
1059
+ "request to update meta from #{inspect(pid)} (#{inspect(new_meta)}) is causally dominated by local, ignoring.."
1060
+ )
1061
+
930
1062
:else ->
931
- nclock
1063
+ new_meta = Map.merge(old_meta, new_meta)
1064
+
1065
+ # we're going to join and bump our local clock though and re-broadcast the update to ensure we converge
1066
+ debug(
1067
+ "conflicting meta for #{inspect(name)}, updating and notifying other nodes, old meta: #{
1068
+ inspect(old_meta)
1069
+ }, new meta: #{inspect(new_meta)}"
1070
+ )
1071
+
1072
+ lclock = Clock.join(lclock, rclock)
1073
+ lclock = Clock.event(lclock)
1074
+ Registry.update(name, meta: new_meta, clock: lclock)
1075
+ broadcast_event(state.nodes, lclock, {:update_meta, new_meta, pid})
932
1076
end
933
1077
end)
934
- {:keep_state, %{state | clock: new_clock}}
1078
+
1079
+ :keep_state_and_data
935
1080
end
936
1081
end
1082
+
937
1083
defp handle_replica_event(_from, event, _clock, _state) do
938
- warn "received unrecognized replica event: #{inspect event}"
1084
+ warn("received unrecognized replica event: #{inspect(event)}")
939
1085
:keep_state_and_data
940
1086
end
941
1087
942
1088
# This is the handler for local operations on the tracker which require a response.
943
1089
defp handle_call({:whereis, name}, from, %TrackerState{strategy: strategy}) do
944
- current_node = Node.self
1090
+ current_node = Node.self()
1091
+
945
1092
case Strategy.key_to_node(strategy, name) do
946
1093
:undefined ->
947
1094
GenStateMachine.reply(from, :undefined)
1095
+
948
1096
^current_node ->
949
1097
case Registry.get_by_name(name) do
950
1098
:undefined ->
951
1099
GenStateMachine.reply(from, :undefined)
1100
+
952
1101
entry(pid: pid) ->
953
1102
GenStateMachine.reply(from, pid)
954
1103
end
1104
+
955
1105
other_node ->
956
- _ = Task.Supervisor.start_child(Swarm.TaskSupervisor, fn ->
957
- case :rpc.call(other_node, Swarm.Registry, :get_by_name, [name], :infinity) do
958
- :undefined ->
959
- GenStateMachine.reply(from, :undefined)
960
- entry(pid: pid) ->
961
- GenStateMachine.reply(from, pid)
962
- {:badrpc, reason} ->
963
- warn "failed to execute remote get_by_name on #{inspect other_node}: #{inspect reason}"
964
- GenStateMachine.reply(from, :undefined)
965
- end
966
- end)
1106
+ _ =
1107
+ Task.Supervisor.start_child(Swarm.TaskSupervisor, fn ->
1108
+ case :rpc.call(other_node, Swarm.Registry, :get_by_name, [name], :infinity) do
1109
+ :undefined ->
1110
+ GenStateMachine.reply(from, :undefined)
1111
+
1112
+ entry(pid: pid) ->
1113
+ GenStateMachine.reply(from, pid)
1114
+
1115
+ {:badrpc, reason} ->
1116
+ warn(
1117
+ "failed to execute remote get_by_name on #{inspect(other_node)}: #{
1118
+ inspect(reason)
1119
+ }"
1120
+ )
1121
+
1122
+ GenStateMachine.reply(from, :undefined)
1123
+ end
1124
+ end)
967
1125
end
1126
+
968
1127
:keep_state_and_data
969
1128
end
1129
+
970
1130
defp handle_call({:track, name, pid, meta}, from, %TrackerState{} = state) do
971
- debug "registering #{inspect pid} as #{inspect name}, with metadata #{inspect meta}"
1131
+ debug("registering #{inspect(pid)} as #{inspect(name)}, with metadata #{inspect(meta)}")
972
1132
add_registration({name, pid, meta}, from, state)
973
1133
end
1134
+
974
1135
defp handle_call({:track, name, meta}, from, state) do
975
1136
current_node = Node.self()
976
1137
{{m, f, a}, _other_meta} = Map.pop(meta, :mfa)
1138
+
977
1139
case from do
978
1140
{from_pid, _} when node(from_pid) != current_node ->
979
- debug "#{inspect node(from_pid)} is registering #{inspect name} as process started by #{m}.#{f}/#{length(a)} with args #{inspect a}"
1141
+ debug(
1142
+ "#{inspect(node(from_pid))} is registering #{inspect(name)} as process started by #{m}.#{
1143
+ f
1144
+ }/#{length(a)} with args #{inspect(a)}"
1145
+ )
1146
+
980
1147
_ ->
981
- debug "registering #{inspect name} as process started by #{m}.#{f}/#{length(a)} with args #{inspect a}"
1148
+ debug(
1149
+ "registering #{inspect(name)} as process started by #{m}.#{f}/#{length(a)} with args #{
1150
+ inspect(a)
1151
+ }"
1152
+ )
982
1153
end
1154
+
983
1155
do_track(%Tracking{name: name, meta: meta, from: from}, state)
984
1156
end
1157
+
985
1158
defp handle_call({:untrack, pid}, from, %TrackerState{} = state) do
986
- debug "untrack #{inspect pid}"
1159
+ debug("untrack #{inspect(pid)}")
987
1160
{:ok, new_state} = remove_registration_by_pid(pid, state)
988
1161
GenStateMachine.reply(from, :ok)
989
1162
{:keep_state, new_state}
990
1163
end
1164
+
991
1165
defp handle_call({:add_meta, key, value, pid}, from, %TrackerState{} = state) do
992
- debug "add_meta #{inspect {key, value}} to #{inspect pid}"
1166
+ debug("add_meta #{inspect({key, value})} to #{inspect(pid)}")
993
1167
{:ok, new_state} = add_meta_by_pid({key, value}, pid, state)
994
1168
GenStateMachine.reply(from, :ok)
995
1169
{:keep_state, new_state}
996
1170
end
1171
+
997
1172
defp handle_call({:remove_meta, key, pid}, from, %TrackerState{} = state) do
998
- debug "remote_meta #{inspect key} for #{inspect pid}"
1173
+ debug("remote_meta #{inspect(key)} for #{inspect(pid)}")
999
1174
{:ok, new_state} = remove_meta_by_pid(key, pid, state)
1000
1175
GenStateMachine.reply(from, :ok)
1001
1176
{:keep_state, new_state}
1002
1177
end
1178
+ defp handle_call({:handoff, worker_name, handoff_state}, from, state) do
1179
+ Registry.get_by_name(worker_name)
1180
+ |> case do
1181
+ :undefined ->
1182
+ # Worker was already removed from registry -> do nothing
1183
+ debug "The node #{worker_name} was not found in the registry"
1184
+ entry(name: name, pid: pid, meta: %{mfa: _mfa} = meta) = obj ->
1185
+ case Strategy.remove_node(state.strategy, state.self) |> Strategy.key_to_node(name) do
1186
+ {:error, {:invalid_ring, :no_nodes}} ->
1187
+ debug "Cannot handoff #{inspect name} because there is no other node left"
1188
+ other_node ->
1189
+ debug "#{inspect name} has requested to be terminated and resumed on another node"
1190
+ {:ok, state} = remove_registration(obj, state)
1191
+ send(pid, {:swarm, :die})
1192
+ debug "sending handoff for #{inspect name} to #{other_node}"
1193
+ GenStateMachine.cast({__MODULE__, other_node},
1194
+ {:handoff, self(), {name, meta, handoff_state, Clock.peek(state.clock)}})
1195
+ end
1196
+ end
1197
+
1198
+ GenStateMachine.reply(from, :finished)
1199
+ :keep_state_and_data
1200
+ end
1003
1201
defp handle_call(msg, _from, _state) do
1004
- warn "unrecognized call: #{inspect msg}"
1202
+ warn("unrecognized call: #{inspect(msg)}")
1005
1203
:keep_state_and_data
1006
1204
end
1007
1205
1008
1206
# This is the handler for local operations on the tracker which are asynchronous
1009
- defp handle_cast({:sync, from, _rclock}, %TrackerState{clock: clock} = state) do
1207
+ defp handle_cast({:sync, from, rclock}, %TrackerState{} = state) do
1010
1208
if ignore_node?(node(from)) do
1011
1209
GenStateMachine.cast(from, {:sync_err, :node_ignored})
1012
1210
:keep_state_and_data
1013
1211
else
1014
- debug "received sync request from #{node(from)}"
1015
- {lclock, rclock} = Clock.fork(clock)
1212
+ debug("received sync request from #{node(from)}")
1016
1213
sync_node = node(from)
1017
1214
ref = Process.monitor(from)
1018
- GenStateMachine.cast(from, {:sync_recv, self(), rclock, Registry.snapshot()})
1019
- {:next_state, :awaiting_sync_ack, %{state | clock: lclock, sync_node: sync_node, sync_ref: ref}}
1215
+ GenStateMachine.cast(from, {:sync_recv, self(), rclock, get_registry_snapshot()})
1216
+
1217
+ {:next_state, :awaiting_sync_ack,
1218
+ %{state | sync_node: sync_node, sync_ref: ref}}
1020
1219
end
1021
1220
end
1221
+
1022
1222
defp handle_cast(msg, _state) do
1023
- warn "unrecognized cast: #{inspect msg}"
1223
+ warn("unrecognized cast: #{inspect(msg)}")
1024
1224
:keep_state_and_data
1025
1225
end
1026
1226
 
@@ -1031,6 +1231,7 @@ defmodule Swarm.Tracker do
1031
1231
defp handle_retry(from, {:track, name, meta}, state) do
1032
1232
handle_call({:track, name, meta}, from, state)
1033
1233
end
1234
+
1034
1235
defp handle_retry(_from, _event, _state) do
1035
1236
:keep_state_and_data
1036
1237
end
 
@@ -1040,121 +1241,181 @@ defmodule Swarm.Tracker do
1040
1241
# lost connection to the node this pid is running on, check if we should restart it
1041
1242
case Registry.get_by_ref(ref) do
1042
1243
:undefined ->
1043
- debug "lost connection to #{inspect pid}, but no registration could be found, ignoring.."
1244
+ debug(
1245
+ "lost connection to #{inspect(pid)}, but no registration could be found, ignoring.."
1246
+ )
1247
+
1044
1248
:keep_state_and_data
1249
+
1045
1250
entry(name: name, pid: ^pid, meta: %{mfa: _mfa}) ->
1046
- debug "lost connection to #{inspect name} (#{inspect pid}) on #{node(pid)}, node is down"
1251
+ debug(
1252
+ "lost connection to #{inspect(name)} (#{inspect(pid)}) on #{node(pid)}, node is down"
1253
+ )
1254
+
1047
1255
state
1048
1256
|> nodedown(node(pid))
1049
1257
|> handle_node_status()
1258
+
1050
1259
entry(pid: ^pid) = obj ->
1051
- debug "lost connection to #{inspect pid}, but not restartable, removing registration.."
1260
+ debug("lost connection to #{inspect(pid)}, but not restartable, removing registration..")
1052
1261
{:ok, new_state} = remove_registration(obj, state)
1053
1262
{:keep_state, new_state}
1054
1263
end
1055
1264
end
1265
+
1056
1266
defp handle_monitor(ref, pid, reason, %TrackerState{} = state) do
1057
1267
case Registry.get_by_ref(ref) do
1058
1268
:undefined ->
1059
- debug "#{inspect pid} is down: #{inspect reason}, but no registration found, ignoring.."
1269
+ debug(
1270
+ "#{inspect(pid)} is down: #{inspect(reason)}, but no registration found, ignoring.."
1271
+ )
1272
+
1060
1273
:keep_state_and_data
1274
+
1061
1275
entry(name: name, pid: ^pid) = obj ->
1062
- debug "#{inspect name} is down: #{inspect reason}"
1276
+ debug("#{inspect(name)} is down: #{inspect(reason)}")
1063
1277
{:ok, new_state} = remove_registration(obj, state)
1064
1278
{:keep_state, new_state}
1065
1279
end
1066
1280
end
1067
1281
1068
1282
# Attempt to start a named process on its destination node
1069
- defp do_track(%Tracking{name: name, meta: meta, from: from}, %TrackerState{strategy: strategy} = state) do
1283
+ defp do_track(
1284
+ %Tracking{name: name, meta: meta, from: from},
1285
+ %TrackerState{strategy: strategy} = state
1286
+ ) do
1070
1287
current_node = Node.self()
1071
1288
{{m, f, a}, _other_meta} = Map.pop(meta, :mfa)
1289
+
1072
1290
case Strategy.key_to_node(strategy, name) do
1073
1291
:undefined ->
1074
- warn "no node available to start #{inspect name} process"
1292
+ warn("no node available to start #{inspect(name)} process")
1075
1293
reply(from, {:error, :no_node_available})
1076
1294
:keep_state_and_data
1295
+
1077
1296
^current_node ->
1078
1297
case Registry.get_by_name(name) do
1079
1298
:undefined ->
1080
- debug "starting #{inspect name} on #{current_node}"
1299
+ debug("starting #{inspect(name)} on #{current_node}")
1300
+
1081
1301
try do
1082
1302
case apply(m, f, a) do
1083
1303
{:ok, pid} ->
1084
- debug "started #{inspect name} on #{current_node}"
1304
+ debug("started #{inspect(name)} on #{current_node}")
1085
1305
add_registration({name, pid, meta}, from, state)
1306
+
1086
1307
err ->
1087
- warn "failed to start #{inspect name} on #{current_node}: #{inspect err}"
1308
+ warn("failed to start #{inspect(name)} on #{current_node}: #{inspect(err)}")
1088
1309
reply(from, {:error, {:invalid_return, err}})
1089
1310
:keep_state_and_data
1090
1311
end
1091
1312
catch
1092
1313
kind, reason ->
1093
- warn Exception.format(kind, reason, System.stacktrace)
1314
+ warn(Exception.format(kind, reason, System.stacktrace()))
1094
1315
reply(from, {:error, reason})
1095
1316
:keep_state_and_data
1096
1317
end
1318
+
1097
1319
entry(pid: pid) ->
1098
- debug "found #{inspect name} already registered on #{node(pid)}"
1320
+ debug("found #{inspect(name)} already registered on #{node(pid)}")
1099
1321
reply(from, {:error, {:already_registered, pid}})
1100
1322
:keep_state_and_data
1101
1323
end
1324
+
1102
1325
remote_node ->
1103
- debug "starting #{inspect name} on remote node #{remote_node}"
1104
- {:ok, _pid} = Task.start(fn ->
1105
- start_pid_remotely(remote_node, from, name, meta, state)
1106
- end)
1326
+ debug("starting #{inspect(name)} on remote node #{remote_node}")
1327
+
1328
+ {:ok, _pid} =
1329
+ Task.start(fn ->
1330
+ start_pid_remotely(remote_node, from, name, meta, state)
1331
+ end)
1332
+
1107
1333
:keep_state_and_data
1108
1334
end
1109
1335
end
1110
1336
1111
1337
# Starts a process on a remote node. Handles failures with a retry mechanism
1112
1338
defp start_pid_remotely(remote_node, from, name, meta, state, attempts \\ 0)
1113
- defp start_pid_remotely(remote_node, from, name, meta, %TrackerState{} = state, attempts) when attempts <= @retry_max_attempts do
1339
+
1340
+ defp start_pid_remotely(remote_node, from, name, meta, %TrackerState{} = state, attempts)
1341
+ when attempts <= @retry_max_attempts do
1114
1342
try do
1115
1343
case GenStateMachine.call({__MODULE__, remote_node}, {:track, name, meta}, :infinity) do
1116
1344
{:ok, pid} ->
1117
- debug "remotely started #{inspect name} (#{inspect pid}) on #{remote_node}"
1345
+ debug("remotely started #{inspect(name)} (#{inspect(pid)}) on #{remote_node}")
1118
1346
reply(from, {:ok, pid})
1347
+
1119
1348
{:error, {:already_registered, pid}} ->
1120
- debug "#{inspect name} already registered to #{inspect pid} on #{node(pid)}, registering locally"
1349
+ debug(
1350
+ "#{inspect(name)} already registered to #{inspect(pid)} on #{node(pid)}, registering locally"
1351
+ )
1352
+
1121
1353
# register named process that is unknown locally
1122
1354
add_registration({name, pid, meta}, from, state)
1123
1355
:ok
1356
+
1124
1357
{:error, {:noproc, _}} = err ->
1125
- warn "#{inspect name} could not be started on #{remote_node}: #{inspect err}, retrying operation after #{@retry_interval}ms.."
1126
- :timer.sleep @retry_interval
1358
+ warn(
1359
+ "#{inspect(name)} could not be started on #{remote_node}: #{inspect(err)}, retrying operation after #{
1360
+ @retry_interval
1361
+ }ms.."
1362
+ )
1363
+
1364
+ :timer.sleep(@retry_interval)
1127
1365
start_pid_remotely(remote_node, from, name, meta, state, attempts + 1)
1366
+
1128
1367
{:error, :undef} ->
1129
- warn "#{inspect name} could not be started on #{remote_node}: target module not available on remote node, retrying operation after #{@retry_interval}ms.."
1130
- :timer.sleep @retry_interval
1368
+ warn(
1369
+ "#{inspect(name)} could not be started on #{remote_node}: target module not available on remote node, retrying operation after #{
1370
+ @retry_interval
1371
+ }ms.."
1372
+ )
1373
+
1374
+ :timer.sleep(@retry_interval)
1131
1375
start_pid_remotely(remote_node, from, name, meta, state, attempts + 1)
1376
+
1132
1377
{:error, _reason} = err ->
1133
- warn "#{inspect name} could not be started on #{remote_node}: #{inspect err}"
1378
+ warn("#{inspect(name)} could not be started on #{remote_node}: #{inspect(err)}")
1134
1379
reply(from, err)
1135
1380
end
1136
1381
catch
1137
1382
_, {:noproc, _} ->
1138
- warn "remote tracker on #{remote_node} went down during registration, retrying operation.."
1383
+ warn(
1384
+ "remote tracker on #{remote_node} went down during registration, retrying operation.."
1385
+ )
1386
+
1139
1387
start_pid_remotely(remote_node, from, name, meta, state)
1388
+
1140
1389
_, {{:nodedown, _}, _} ->
1141
- warn "failed to start #{inspect name} on #{remote_node}: nodedown, retrying operation.."
1142
- new_state = %{state | nodes: state.nodes -- [remote_node], strategy: Strategy.remove_node(state.strategy, remote_node)}
1390
+ warn("failed to start #{inspect(name)} on #{remote_node}: nodedown, retrying operation..")
1391
+
1392
+ new_state = %{
1393
+ state
1394
+ | nodes: state.nodes -- [remote_node],
1395
+ strategy: Strategy.remove_node(state.strategy, remote_node)
1396
+ }
1397
+
1143
1398
case Strategy.key_to_node(new_state.strategy, name) do
1144
1399
:undefined ->
1145
- warn "failed to start #{inspect name} as no node available"
1400
+ warn("failed to start #{inspect(name)} as no node available")
1146
1401
reply(from, {:error, :no_node_available})
1402
+
1147
1403
new_node ->
1148
1404
start_pid_remotely(new_node, from, name, meta, new_state)
1149
1405
end
1406
+
1150
1407
kind, err ->
1151
- error Exception.format(kind, err, System.stacktrace)
1152
- warn "failed to start #{inspect name} on #{remote_node}: #{inspect err}"
1408
+ error(Exception.format(kind, err, System.stacktrace()))
1409
+ warn("failed to start #{inspect(name)} on #{remote_node}: #{inspect(err)}")
1153
1410
reply(from, {:error, err})
1154
1411
end
1155
1412
end
1413
+
1156
1414
defp start_pid_remotely(remote_node, from, name, _meta, _state, attempts) do
1157
- warn "#{inspect name} could not be started on #{remote_node}, failed to start after #{attempts} attempt(s)"
1415
+ warn(
1416
+ "#{inspect(name)} could not be started on #{remote_node}, failed to start after #{attempts} attempt(s)"
1417
+ )
1418
+
1158
1419
reply(from, {:error, :too_many_attempts})
1159
1420
end
1160
1421
 
@@ -1164,14 +1425,13 @@ defmodule Swarm.Tracker do
1164
1425
defp reply(nil, _message), do: :ok
1165
1426
defp reply(from, message), do: GenStateMachine.reply(from, message)
1166
1427
1167
- defp broadcast_event([], _clock, _event), do: :ok
1428
+ defp broadcast_event([], _clock, _event), do: :ok
1429
+
1168
1430
defp broadcast_event(nodes, clock, event) do
1169
- case :rpc.sbcast(nodes, __MODULE__, {:event, self(), clock, event}) do
1170
- {_good, []} -> :ok
1171
- {_good, bad_nodes} ->
1172
- warn "broadcast of event (#{inspect event}) was not recevied by #{inspect bad_nodes}"
1173
- :ok
1174
- end
1431
+ clock = Clock.peek(clock)
1432
+
1433
+ :abcast = :rpc.abcast(nodes, __MODULE__, {:event, self(), clock, event})
1434
+ :ok
1175
1435
end
1176
1436
1177
1437
# Add a registration and reply to the caller with the result, then return the state transition
 
@@ -1180,6 +1440,7 @@ defmodule Swarm.Tracker do
1180
1440
{:ok, reply, new_state} ->
1181
1441
reply(from, {:ok, reply})
1182
1442
{:keep_state, new_state}
1443
+
1183
1444
{:error, reply, new_state} ->
1184
1445
reply(from, {:error, reply})
1185
1446
{:keep_state, new_state}
 
@@ -1191,88 +1452,95 @@ defmodule Swarm.Tracker do
1191
1452
case Registry.get_by_name(name) do
1192
1453
:undefined ->
1193
1454
ref = Process.monitor(pid)
1194
- clock = Clock.event(clock)
1195
- Registry.new!(entry(name: name, pid: pid, ref: ref, meta: meta, clock: Clock.peek(clock)))
1196
- broadcast_event(nodes, Clock.peek(clock), {:track, name, pid, meta})
1197
- {:ok, pid, %{state | clock: clock}}
1455
+ lclock = Clock.event(clock)
1456
+ Registry.new!(entry(name: name, pid: pid, ref: ref, meta: meta, clock: lclock))
1457
+ broadcast_event(nodes, lclock, {:track, name, pid, meta})
1458
+ {:ok, pid, state}
1459
+
1198
1460
entry(pid: ^pid) ->
1199
1461
# Not sure how this could happen, but hey, no need to return an error
1200
1462
{:ok, pid, state}
1463
+
1201
1464
entry(pid: other_pid) ->
1202
- debug "conflicting registration for #{inspect name}: remote (#{inspect pid}) vs. local #{inspect other_pid}"
1465
+ debug(
1466
+ "conflicting registration for #{inspect(name)}: remote (#{inspect(pid)}) vs. local #{
1467
+ inspect(other_pid)
1468
+ }"
1469
+ )
1470
+
1203
1471
# Since there is already a registration, we need to check whether to kill the newly
1204
1472
# created process
1205
1473
pid_node = node(pid)
1206
- current_node = Node.self
1474
+ current_node = Node.self()
1475
+
1207
1476
case meta do
1208
1477
%{mfa: _} when pid_node == current_node ->
1209
1478
# This was created via register_name/5, which means we need to kill the pid we started
1210
1479
Process.exit(pid, :kill)
1480
+
1211
1481
_ ->
1212
1482
# This was a pid started by something else, so we can ignore it
1213
1483
:ok
1214
1484
end
1485
+
1215
1486
{:error, {:already_registered, other_pid}, state}
1216
1487
end
1217
1488
end
1218
1489
1219
1490
# Remove a registration, and return the result of the remove
1220
- defp remove_registration(entry(pid: pid, ref: ref) = obj, %TrackerState{clock: clock} = state) do
1491
+ defp remove_registration(entry(pid: pid, ref: ref, clock: lclock) = obj, state) do
1221
1492
Process.demonitor(ref, [:flush])
1222
1493
Registry.remove(obj)
1223
- clock = Clock.event(clock)
1224
- broadcast_event(state.nodes, Clock.peek(clock), {:untrack, pid})
1225
- {:ok, %{state | clock: clock}}
1494
+ lclock = Clock.event(lclock)
1495
+ broadcast_event(state.nodes, lclock, {:untrack, pid})
1496
+ {:ok, state}
1226
1497
end
1227
1498
1228
- defp remove_registration_by_pid(pid, %TrackerState{clock: clock} = state) do
1499
+ defp remove_registration_by_pid(pid, state) do
1229
1500
case Registry.get_by_pid(pid) do
1230
1501
:undefined ->
1231
- broadcast_event(state.nodes, Clock.peek(clock), {:untrack, pid})
1232
1502
{:ok, state}
1503
+
1233
1504
entries when is_list(entries) ->
1234
- new_clock = Enum.reduce(entries, clock, fn entry(ref: ref) = obj, nclock ->
1235
- Process.demonitor(ref, [:flush])
1236
- Registry.remove(obj)
1237
- nclock = Clock.event(nclock)
1238
- broadcast_event(state.nodes, Clock.peek(nclock), {:untrack, pid})
1239
- nclock
1505
+ Enum.each(entries, fn entry ->
1506
+ remove_registration(entry, state)
1240
1507
end)
1241
- {:ok, %{state | clock: new_clock}}
1508
+
1509
+ {:ok, state}
1242
1510
end
1243
1511
end
1244
1512
1245
- defp add_meta_by_pid({key, value}, pid, %TrackerState{clock: clock} = state) do
1513
+ defp add_meta_by_pid({key, value}, pid, state) do
1246
1514
case Registry.get_by_pid(pid) do
1247
1515
:undefined ->
1248
- broadcast_event(state.nodes, Clock.peek(clock), {:add_meta, key, value, pid})
1249
1516
{:ok, state}
1517
+
1250
1518
entries when is_list(entries) ->
1251
- new_clock = Enum.reduce(entries, clock, fn entry(name: name, meta: old_meta), nclock ->
1519
+ Enum.each(entries, fn entry(name: name, meta: old_meta, clock: lclock) ->
1252
1520
new_meta = Map.put(old_meta, key, value)
1253
- nclock = Clock.event(nclock)
1254
- Registry.update(name, [meta: new_meta, clock: Clock.peek(nclock)])
1255
- broadcast_event(state.nodes, Clock.peek(nclock), {:add_meta, key, value, pid})
1256
- nclock
1521
+ lclock = Clock.event(lclock)
1522
+ Registry.update(name, meta: new_meta, clock: lclock)
1523
+ broadcast_event(state.nodes, lclock, {:update_meta, new_meta, pid})
1257
1524
end)
1258
- {:ok, %{state | clock: new_clock}}
1525
+
1526
+ {:ok, state}
1259
1527
end
1260
1528
end
1261
1529
1262
- defp remove_meta_by_pid(key, pid, %TrackerState{clock: clock} = state) do
1530
+ defp remove_meta_by_pid(key, pid, state) do
1263
1531
case Registry.get_by_pid(pid) do
1264
1532
:undefined ->
1265
- broadcast_event(state.nodes, Clock.peek(clock), {:remove_meta, key, pid})
1266
1533
{:ok, state}
1534
+
1267
1535
entries when is_list(entries) ->
1268
- new_clock = Enum.reduce(entries, clock, fn entry(name: name, meta: old_meta), nclock ->
1536
+ Enum.each(entries, fn entry(name: name, meta: old_meta, clock: lclock) ->
1269
1537
new_meta = Map.drop(old_meta, [key])
1270
- nclock = Clock.event(nclock)
1271
- Registry.update(name, [meta: new_meta, clock: Clock.peek(nclock)])
1272
- broadcast_event(state.nodes, Clock.peek(nclock), {:remove_meta, key, pid})
1273
- nclock
1538
+ lclock = Clock.event(lclock)
1539
+ Registry.update(name, meta: new_meta, clock: lclock)
1540
+ broadcast_event(state.nodes, lclock, {:update_meta, new_meta, pid})
1274
1541
end)
1275
- {:ok, %{state | clock: new_clock}}
1542
+
1543
+ {:ok, state}
1276
1544
end
1277
1545
end
1278
1546
 
@@ -1285,6 +1553,7 @@ defmodule Swarm.Tracker do
1285
1553
|> MapSet.union(@global_blacklist)
1286
1554
|> MapSet.to_list()
1287
1555
end
1556
+
1288
1557
# The list of configured whitelist patterns for nodes
1289
1558
# If a whitelist is provided, any nodes which do not match the whitelist are ignored
1290
1559
defp node_whitelist(), do: Application.get_env(:swarm, :node_whitelist, [])
 
@@ -1301,18 +1570,29 @@ defmodule Swarm.Tracker do
1301
1570
end
1302
1571
1303
1572
# Used during anti-entropy checks to remove local registrations and replace them with the remote version
1304
- defp resolve_incorrect_local_reg(_remote_node, entry(pid: lpid) = lreg, entry(name: rname, pid: rpid, meta: rmeta, clock: rclock), state) do
1573
+ defp resolve_incorrect_local_reg(
1574
+ _remote_node,
1575
+ entry(pid: lpid, clock: lclock) = lreg,
1576
+ entry(name: rname, pid: rpid, meta: rmeta, clock: rclock),
1577
+ state
1578
+ ) do
1305
1579
# the remote registration is correct
1306
1580
{:ok, new_state} = remove_registration(lreg, state)
1307
1581
send(lpid, {:swarm, :die})
1308
1582
# add the remote registration
1309
1583
ref = Process.monitor(rpid)
1310
- Registry.new!(entry(name: rname, pid: rpid, ref: ref, meta: rmeta, clock: rclock))
1584
+ lclock = Clock.join(lclock, rclock)
1585
+ Registry.new!(entry(name: rname, pid: rpid, ref: ref, meta: rmeta, clock: lclock))
1311
1586
new_state
1312
1587
end
1313
1588
1314
1589
# Used during anti-entropy checks to remove remote registrations and replace them with the local version
1315
- defp resolve_incorrect_remote_reg(remote_node, entry(pid: lpid, meta: lmeta), entry(name: rname, pid: rpid), state) do
1590
+ defp resolve_incorrect_remote_reg(
1591
+ remote_node,
1592
+ entry(pid: lpid, meta: lmeta),
1593
+ entry(name: rname, pid: rpid),
1594
+ state
1595
+ ) do
1316
1596
GenStateMachine.cast({__MODULE__, remote_node}, {:untrack, rpid})
1317
1597
send(rpid, {:swarm, :die})
1318
1598
GenStateMachine.cast({__MODULE__, remote_node}, {:track, rname, lpid, lmeta})
 
@@ -1323,43 +1603,75 @@ defmodule Swarm.Tracker do
1323
1603
# processes to new nodes based on the new topology.
1324
1604
defp nodeup(%TrackerState{nodes: nodes, strategy: strategy} = state, node) do
1325
1605
cond do
1326
- node == Node.self ->
1606
+ node == Node.self() ->
1327
1607
new_strategy =
1328
1608
strategy
1329
1609
|> Strategy.remove_node(state.self)
1330
1610
|> Strategy.add_node(node)
1331
- info "node name changed from #{state.self} to #{node}"
1611
+
1612
+ info("node name changed from #{state.self} to #{node}")
1332
1613
{:ok, %{state | self: node, strategy: new_strategy}}
1614
+
1333
1615
Enum.member?(nodes, node) ->
1334
1616
{:ok, state}
1617
+
1335
1618
ignore_node?(node) ->
1336
1619
{:ok, state}
1620
+
1337
1621
:else ->
1338
1622
ensure_swarm_started_on_remote_node(state, node)
1339
1623
end
1340
1624
end
1341
1625
1342
1626
defp ensure_swarm_started_on_remote_node(state, node, attempts \\ 0)
1343
- defp ensure_swarm_started_on_remote_node(%TrackerState{nodes: nodes, strategy: strategy} = state, node, attempts) when attempts <= @retry_max_attempts do
1627
+
1628
+ defp ensure_swarm_started_on_remote_node(
1629
+ %TrackerState{nodes: nodes, strategy: strategy} = state,
1630
+ node,
1631
+ attempts
1632
+ )
1633
+ when attempts <= @retry_max_attempts do
1344
1634
case :rpc.call(node, :application, :which_applications, []) do
1345
1635
app_list when is_list(app_list) ->
1346
1636
case List.keyfind(app_list, :swarm, 0) do
1347
1637
{:swarm, _, _} ->
1348
- info "nodeup #{node}"
1349
- new_state = %{state | nodes: [node|nodes], strategy: Strategy.add_node(strategy, node)}
1638
+ info("nodeup #{node}")
1639
+
1640
+ new_state = %{
1641
+ state
1642
+ | nodes: [node | nodes],
1643
+ strategy: Strategy.add_node(strategy, node)
1644
+ }
1645
+
1350
1646
{:ok, new_state, {:topology_change, {:nodeup, node}}}
1647
+
1351
1648
nil ->
1352
- debug "nodeup for #{node} was ignored because swarm not started yet, will retry in #{@retry_interval}ms.."
1353
- Process.send_after(self(), {:ensure_swarm_started_on_remote_node, node, attempts + 1}, @retry_interval)
1649
+ debug(
1650
+ "nodeup for #{node} was ignored because swarm not started yet, will retry in #{
1651
+ @retry_interval
1652
+ }ms.."
1653
+ )
1654
+
1655
+ Process.send_after(
1656
+ self(),
1657
+ {:ensure_swarm_started_on_remote_node, node, attempts + 1},
1658
+ @retry_interval
1659
+ )
1660
+
1354
1661
{:ok, state}
1355
1662
end
1663
+
1356
1664
other ->
1357
- warn "nodeup for #{node} was ignored because: #{inspect other}"
1665
+ warn("nodeup for #{node} was ignored because: #{inspect(other)}")
1358
1666
{:ok, state}
1359
1667
end
1360
1668
end
1669
+
1361
1670
defp ensure_swarm_started_on_remote_node(%TrackerState{} = state, node, attempts) do
1362
- warn "nodeup for #{node} was ignored because swarm failed to start after #{attempts} attempt(s)"
1671
+ warn(
1672
+ "nodeup for #{node} was ignored because swarm failed to start after #{attempts} attempt(s)"
1673
+ )
1674
+
1363
1675
{:ok, state}
1364
1676
end
1365
1677
 
@@ -1368,13 +1680,34 @@ defmodule Swarm.Tracker do
1368
1680
defp nodedown(%TrackerState{nodes: nodes, strategy: strategy} = state, node) do
1369
1681
cond do
1370
1682
Enum.member?(nodes, node) ->
1371
- info "nodedown #{node}"
1683
+ info("nodedown #{node}")
1372
1684
strategy = Strategy.remove_node(strategy, node)
1373
- pending_reqs = Enum.filter(state.pending_sync_reqs, fn ^node -> false; _ -> true end)
1374
- new_state = %{state | nodes: nodes -- [node], strategy: strategy, pending_sync_reqs: pending_reqs}
1685
+
1686
+ pending_reqs =
1687
+ Enum.filter(state.pending_sync_reqs, fn
1688
+ ^node -> false
1689
+ _ -> true
1690
+ end)
1691
+
1692
+ new_state = %{
1693
+ state
1694
+ | nodes: nodes -- [node],
1695
+ strategy: strategy,
1696
+ pending_sync_reqs: pending_reqs
1697
+ }
1698
+
1375
1699
{:ok, new_state, {:topology_change, {:nodedown, node}}}
1700
+
1376
1701
:else ->
1377
1702
{:ok, state}
1378
1703
end
1379
1704
end
1705
+
1706
+ defp get_registry_snapshot() do
1707
+ snapshot = Registry.snapshot()
1708
+
1709
+ Enum.map(snapshot, fn entry(name: name, pid: pid, ref: ref, meta: meta, clock: clock) ->
1710
+ entry(name: name, pid: pid, ref: ref, meta: meta, clock: Clock.peek(clock))
1711
+ end)
1712
+ end
1380
1713
end
changed mix.exs
 
@@ -4,63 +4,76 @@ otp_release =
4
4
String.split("#{:erlang.system_info(:otp_release)}", ".")
5
5
|> List.first()
6
6
|> String.to_integer()
7
+
7
8
if otp_release < 19 do
8
- IO.warn "Swarm requires Erlang/OTP 19 or greater", []
9
+ IO.warn("Swarm requires Erlang/OTP 19 or greater", [])
9
10
end
10
11
11
12
defmodule Swarm.Mixfile do
12
13
use Mix.Project
13
14
14
15
def project do
15
- [app: :swarm,
16
- version: "3.3.1",
17
- elixir: "~> 1.3",
18
- elixirc_paths: elixirc_paths(Mix.env),
19
- build_embedded: Mix.env == :prod,
20
- start_permanent: Mix.env == :prod,
21
- description: "A fast, multi-master, distributed global process registry, with automatic distribution of worker processes.",
22
- package: package(),
23
- docs: docs(),
24
- deps: deps(),
25
- aliases: aliases(),
26
- dialyzer: [
27
- plt_add_apps: [:inets],
28
- plt_add_deps: :transitive,
29
- flags: ~w(-Wunmatched_returns -Werror_handling -Wrace_conditions -Wunderspecs)
30
- ]]
16
+ [
17
+ app: :swarm,
18
+ version: "3.4.0",
19
+ elixir: "~> 1.3",
20
+ elixirc_paths: elixirc_paths(Mix.env()),
21
+ build_embedded: Mix.env() == :prod,
22
+ start_permanent: Mix.env() == :prod,
23
+ description:
24
+ "A fast, multi-master, distributed global process registry, with automatic distribution of worker processes.",
25
+ package: package(),
26
+ docs: docs(),
27
+ deps: deps(),
28
+ aliases: aliases(),
29
+ dialyzer: [
30
+ plt_add_apps: [:inets],
31
+ plt_add_deps: :transitive,
32
+ flags: ~w(-Wunmatched_returns -Werror_handling -Wrace_conditions -Wunderspecs)
33
+ ]
34
+ ]
31
35
end
32
36
33
37
def application do
34
- [extra_applications: [:logger, :crypto],
35
- mod: {Swarm, []}]
38
+ [extra_applications: [:logger, :crypto], mod: {Swarm, []}]
36
39
end
37
40
38
41
defp deps do
39
- [{:ex_doc, "~> 0.13", only: :dev},
40
- {:dialyxir, "~> 0.3", only: :dev},
41
- {:benchee, "~> 0.4", only: :dev},
42
- {:porcelain, "~> 2.0", only: [:dev, :test]},
43
- {:libring, "~> 1.0"},
44
- {:gen_state_machine, "~> 2.0"}]
42
+ [
43
+ {:ex_doc, "~> 0.13", only: :dev},
44
+ {:dialyxir, "~> 0.3", only: :dev},
45
+ {:benchee, "~> 0.4", only: :dev},
46
+ {:porcelain, "~> 2.0", only: [:dev, :test]},
47
+ {:libring, "~> 1.0"},
48
+ {:gen_state_machine, "~> 2.0"}
49
+ ]
45
50
end
46
51
47
52
defp package do
48
- [files: ["lib", "src", "mix.exs", "README.md", "LICENSE.md"],
49
- maintainers: ["Paul Schoenfelder"],
50
- licenses: ["MIT"],
51
- links: %{ "Github": "https://github.com/bitwalker/swarm" }]
53
+ [
54
+ files: ["lib", "src", "mix.exs", "README.md", "LICENSE.md"],
55
+ maintainers: ["Paul Schoenfelder"],
56
+ licenses: ["MIT"],
57
+ links: %{Github: "https://github.com/bitwalker/swarm"}
58
+ ]
52
59
end
53
60
54
61
defp docs do
55
- [main: "readme",
56
- formatter_opts: [gfm: true],
57
- extras: [
58
- "README.md"
59
- ]]
62
+ [
63
+ main: "readme",
64
+ formatter_opts: [gfm: true],
65
+ extras: [
66
+ "README.md"
67
+ ]
68
+ ]
60
69
end
61
70
62
71
defp aliases() do
63
- ["test": "test --no-start --trace --seed=0"]
72
+ if System.get_env("SWARM_TEST_DEBUG") do
73
+ [test: "test --no-start --trace"]
74
+ else
75
+ [test: "test --no-start"]
76
+ end
64
77
end
65
78
66
79
defp elixirc_paths(:test), do: ["lib", "test/support"]