-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathcluster.rb
251 lines (230 loc) · 9.7 KB
/
cluster.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# frozen_string_literal: true
module ElastomerClient
class Client
# Access the cluster API wrapper bound to this client. The wrapper is built
# on the first call and the same object is handed back on later calls.
#
# Returns a Cluster instance.
def cluster
  return @cluster if @cluster

  @cluster = Cluster.new(self)
end
class Cluster
# Create a new cluster client for making API requests that pertain to
# the cluster health and management.
#
# client - ElastomerClient::Client used for HTTP requests to the server.
#          It is stored as-is; the API methods of this class issue their
#          get/put/post calls through it.
#
def initialize(client)
@client = client
end
# Public: Returns the client object that was handed to the constructor.
attr_reader :client
# Fetch a short health summary for the cluster. When an :index value is
# given, the request is scoped to the health of just those indices.
#
# params - Parameters Hash
#          :index                      - a single index name or an Array of index names
#          :level                      - one of "cluster", "indices", or "shards"
#          :wait_for_status            - one of "green", "yellow", or "red"
#          :wait_for_relocating_shards - a number controlling to how many relocating shards to wait for
#          :wait_for_nodes             - the request waits until the specified number N of nodes is available
#          :timeout                    - how long to wait [default is "30s"]
#
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html
#
# Returns the response as a Hash
def health(params = {})
  # merge returns a new Hash, so the caller's params are left untouched.
  request_params = params.merge(action: "cluster.health", rest_api: "cluster.health")
  client.get("/_cluster/health{/index}", request_params).body
end
# Fetch the full state of the cluster. Use the :metrics and :indices
# parameter keys to narrow the response by metric and by index.
#
# The list of available metrics are:
#   version, master_node, nodes, routing_table, metadata, blocks
#
# params - Parameters Hash
#          :metrics - list of metrics to select as an Array
#          :indices - a single index name or an Array of index names
#
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-state.html
#
# Returns the response as a Hash
def state(params = {})
  # merge returns a new Hash, so the caller's params are left untouched.
  request_params = params.merge(action: "cluster.state", rest_api: "cluster.state")
  client.get("/_cluster/state{/metrics}{/indices}", request_params).body
end
# Fetch cluster-wide statistics: basic index metrics (shard numbers, store
# size, memory usage) together with information about the nodes that
# currently form the cluster (number, roles, os, jvm versions, memory
# usage, cpu and installed plugins).
#
# params - Parameters Hash
#
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-stats.html
#
# Returns the response as a Hash
def stats(params = {})
  # merge returns a new Hash, so the caller's params are left untouched.
  request_params = params.merge(action: "cluster.stats", rest_api: "cluster.stats")
  client.get("/_cluster/stats", request_params).body
end
# List the cluster-level changes (e.g. create index, update mapping,
# allocate or fail shard) that have not been executed yet.
#
# params - Parameters Hash
#
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-pending.html
#
# Returns the response as a Hash
def pending_tasks(params = {})
  # merge returns a new Hash, so the caller's params are left untouched.
  request_params = params.merge(action: "cluster.pending_tasks", rest_api: "cluster.pending_tasks")
  client.get("/_cluster/pending_tasks", request_params).body
end
# Check whether the cluster has any pending tasks.
#
# Returns `true` if there are items in the pending task list, `false` if
# the list is empty, and `nil` if the response body does not contain the
# "tasks" field.
def pending_tasks?
  # The fetch block only runs when the "tasks" key is absent.
  queued = pending_tasks.fetch("tasks") { return nil } # rubocop:disable Style/ReturnNilInPredicateMethodDefinition
  queued.length.positive?
end
# Fetch the cluster-wide settings that have been modified via the update
# API.
#
# params - Parameters Hash
#
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-update-settings.html
#
# Returns the response as a Hash
def get_settings(params = {})
  # merge returns a new Hash, so the caller's params are left untouched.
  request_params = params.merge(action: "cluster.get_settings", rest_api: "cluster.get_settings")
  client.get("/_cluster/settings", request_params).body
end
# `settings` is an alternate name for `get_settings`.
alias_method :settings, :get_settings
# Change cluster-wide settings. Updated settings can either be persistent
# (applied across restarts) or transient (will not survive a full cluster
# restart).
#
# body   - The new settings as a Hash or a JSON encoded String
# params - Parameters Hash
#
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-update-settings.html
#
# Returns the response as a Hash
def update_settings(body, params = {})
  # The request body travels to the client inside the params Hash under
  # the :body key.
  request_params = params.merge(body:, action: "cluster.update_settings", rest_api: "cluster.put_settings")
  client.put("/_cluster/settings", request_params).body
end
# Explicitly execute a cluster reroute allocation command. For example,
# a shard can be moved from one node to another explicitly, an
# allocation can be canceled, or an unassigned shard can be explicitly
# allocated on a specific node.
#
# commands - A command Hash, an Array of command Hashes, or a complete
#            request body Hash that already has a "commands" key (given
#            either as a Symbol or as a String). A complete body is sent
#            unchanged; anything else is wrapped in {commands: [...]}.
# params   - Parameters Hash
#
# Examples
#
#   reroute(move: { index: 'test', shard: 0, from_node: 'node1', to_node: 'node2' })
#
#   reroute([
#     { move:     { index: 'test', shard: 0, from_node: 'node1', to_node: 'node2' }},
#     { allocate: { index: 'test', shard: 1, node: 'node3' }}
#   ])
#
#   reroute(commands: [
#     { move:     { index: 'test', shard: 0, from_node: 'node1', to_node: 'node2' }},
#     { allocate: { index: 'test', shard: 1, node: 'node3' }}
#   ])
#
#   reroute("commands" => [
#     { "move" => { "index" => 'test', "shard" => 0, "from_node" => 'node1', "to_node" => 'node2' }}
#   ])
#
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-reroute.html
#
# Returns the response as a Hash
def reroute(commands, params = {})
  payload =
    case commands
    when Hash
      if commands.key?(:commands) || commands.key?("commands")
        # Already a complete request body (Symbol or String keyed), so it
        # must not be wrapped a second time.
        commands
      else
        # Array() on a Hash turns it into [key, value] pairs, which is why
        # a lone command Hash is wrapped explicitly via [commands].
        {commands: [commands]}
      end
    else
      {commands: Array(commands)}
    end

  request_params = params.merge(body: payload, action: "cluster.reroute", rest_api: "cluster.reroute")
  client.post("/_cluster/reroute", request_params).body
end
# Fetch the aliases currently defined. Pass an :index name (or an Array of
# index names) to get only the aliases of those indexes. An alias name
# can be used here as well since it is acting the part of an index.
#
# params - Parameters Hash
#          :index - an index name or Array of index names
#          :name  - an alias name or Array of alias names
#
# Examples
#
#   get_aliases
#   get_aliases( index: 'users' )
#
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html
#
# Returns the response body as a Hash
def get_aliases(params = {})
  # merge returns a new Hash, so the caller's params are left untouched.
  request_params = params.merge(action: "cluster.get_aliases", rest_api: "indices.get_alias")
  client.get("{/index}/_alias{/name}", request_params).body
end
# `aliases` is an alternate name for `get_aliases`.
alias_method :aliases, :get_aliases
# Perform an aliases action on the cluster. A single action can be given
# or an Array of actions; either way the request is wrapped in the
# appropriate {actions: [...]} body construct. A Hash that already has an
# "actions" key (given either as a Symbol or as a String) is treated as a
# complete request body and sent unchanged.
#
# actions - An action Hash, an Array of action Hashes, or a complete
#           request body Hash with an "actions" key
# params  - Parameters Hash
#
# Examples
#
#   update_aliases(add: { index: 'users-1', alias: 'users' })
#
#   update_aliases([
#     { remove: { index: 'users-1', alias: 'users' }},
#     { add:    { index: 'users-2', alias: 'users' }}
#   ])
#
#   update_aliases("actions" => [
#     { "add" => { "index" => 'users-2', "alias" => 'users' }}
#   ])
#
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html
#
# Returns the response body as a Hash
def update_aliases(actions, params = {})
  payload =
    case actions
    when Hash
      if actions.key?(:actions) || actions.key?("actions")
        # Already a complete request body (Symbol or String keyed), so it
        # must not be wrapped a second time.
        actions
      else
        # Array() on a Hash turns it into [key, value] pairs, which is why
        # a lone action Hash is wrapped explicitly via [actions].
        {actions: [actions]}
      end
    else
      {actions: Array(actions)}
    end

  request_params = params.merge(body: payload, action: "cluster.update_aliases", rest_api: "indices.update_aliases")
  client.post("/_aliases", request_params).body
end
# List all templates currently defined. Convenience wrapper that asks
# `state` for the "metadata" metric and picks the templates section out of
# the response.
#
# Returns the template definitions as a Hash
def templates
  cluster_state = state(metrics: "metadata")
  cluster_state.dig("metadata", "templates")
end
# List all indices currently defined. Convenience wrapper that asks
# `state` for the "metadata" metric and picks the indices section out of
# the response.
#
# Returns the indices definitions as a Hash
def indices
  cluster_state = state(metrics: "metadata")
  cluster_state.dig("metadata", "indices")
end
# List all nodes currently part of the cluster. Convenience wrapper that
# asks `state` for the "nodes" metric and picks the nodes section out of
# the response.
#
# Returns the nodes definitions as a Hash
def nodes
  cluster_state = state(metrics: "nodes")
  cluster_state.dig("nodes")
end
end
end
end