-
Notifications
You must be signed in to change notification settings - Fork 369
/
Copy pathsyncer.rb
221 lines (194 loc) · 8.12 KB
/
syncer.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
module Chewy
class Index
# This class is able to find missing and outdated documents in the ES
# comparing ids from the data source and the ES index. Also, if `outdated_sync_field`
# exists in the index definition, it performs comparison of this field
# values for each source object and corresponding ES document. Usually,
# this field is `updated_at` and if its value in the source is not equal
# to the value in the index - this means that this document outdated and
# should be reindexed.
#
# To fetch necessary data from the source it uses adapter method
# {Chewy::Index::Adapter::Base#import_fields}, in case when the Object
# adapter is used it makes sense to read corresponding documentation.
#
# If `parallel` option is passed to the initializer - it will fetch source and
# index data in parallel and then perform outdated objects calculation in
# parallel processes. Also, further import (if required) will be performed
# in parallel as well.
#
# @note
    #   In Rails 4.0, times are converted to JSON with second precision
    #   (milliseconds are dropped), so the outdated check is less precise there.
#
    # ATTENTION: synchronization may be slow when the synchronized tables
    # are missing a compound index on the primary key and `outdated_sync_field`.
#
# @see Chewy::Index::Actions::ClassMethods#sync
class Syncer
DEFAULT_SYNC_BATCH_SIZE = 20_000
ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/
OUTDATED_IDS_WORKER = lambda do |outdated_sync_field_type, source_data_hash, index, total, index_data|
::Process.setproctitle("chewy [#{index}]: sync outdated calculation (#{::Parallel.worker_number + 1}/#{total})") if index
index_data.each_with_object([]) do |(id, index_sync_value), result|
next unless source_data_hash[id]
outdated = if outdated_sync_field_type == 'date'
!Chewy::Index::Syncer.dates_equal(typecast_date(source_data_hash[id]), Time.iso8601(index_sync_value))
else
source_data_hash[id] != index_sync_value
end
result.push(id) if outdated
end
end
SOURCE_OR_INDEX_DATA_WORKER = lambda do |syncer, index, kind|
::Process.setproctitle("chewy [#{index}]: sync fetching data (#{kind})")
result = case kind
when :source
syncer.send(:fetch_source_data)
when :index
syncer.send(:fetch_index_data)
end
{kind => result}
end
def self.typecast_date(string)
if string.is_a?(String) && (match = ISO_DATETIME.match(string))
microsec = (match[7].to_r * 1_000_000).to_i
day = "#{match[1]}-#{match[2]}-#{match[3]}"
time_with_seconds = "#{match[4]}:#{match[5]}:#{match[6]}"
microseconds = format('%06d', microsec)
date = "#{day}T#{time_with_seconds}.#{microseconds}+00:00"
Time.iso8601(date)
else
string
end
end
# Compares times with ms precision.
def self.dates_equal(one, two)
[one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end
# @param index [Chewy::Index] chewy index
# @param parallel [true, Integer, Hash] options for parallel execution or the number of processes
def initialize(index, parallel: nil)
@index = index
@parallel = if !parallel || parallel.is_a?(Hash)
parallel
elsif parallel.is_a?(Integer)
{in_processes: parallel}
else
{}
end
end
# Finds all the missing and outdated ids and performs import for them.
#
# @return [Integer, nil] the amount of missing and outdated documents reindexed, nil in case of errors
def perform
ids = missing_ids | outdated_ids
return 0 if ids.blank?
@index.import(ids, parallel: @parallel) && ids.count
end
# Finds ids of all the objects that are not indexed yet or deleted
# from the source already.
#
# @return [Array<String>] an array of missing ids from both sides
def missing_ids
return [] if source_data.blank?
@missing_ids ||= begin
source_data_ids = data_ids(source_data)
index_data_ids = data_ids(index_data)
(source_data_ids - index_data_ids).concat(index_data_ids - source_data_ids)
end
end
# If index supports outdated sync, it compares the values of the
# `outdated_sync_field` for each object and document in the source
# and index and returns the ids of entities which differ.
#
# @see Chewy::Index::Mapping::ClassMethods#supports_outdated_sync?
# @return [Array<String>] an array of outdated ids
def outdated_ids
return [] if source_data.blank? || index_data.blank? || !@index.supports_outdated_sync?
@outdated_ids ||= if @parallel
parallel_outdated_ids
else
linear_outdated_ids
end
end
private
def source_data
@source_data ||= source_and_index_data.first
end
def index_data
@index_data ||= source_and_index_data.second
end
def source_and_index_data
@source_and_index_data ||= if @parallel
::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
result = ::Parallel.map(%i[source index], @parallel, &SOURCE_OR_INDEX_DATA_WORKER.curry[self, @index])
::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
if result.first.keys.first == :source
[result.first.values.first, result.second.values.first]
else
[result.second.values.first, result.first.values.first]
end
else
[fetch_source_data, fetch_index_data]
end
end
def fetch_source_data
if @index.supports_outdated_sync?
import_fields_args = {
fields: [@index.outdated_sync_field],
batch_size: DEFAULT_SYNC_BATCH_SIZE,
typecast: false
}
@index.adapter.import_fields(import_fields_args).to_a.flatten(1).each do |data|
data[0] = data[0].to_s
end
else
@index.adapter.import_fields(batch_size: DEFAULT_SYNC_BATCH_SIZE, typecast: false).to_a.flatten(1).map(&:to_s)
end
end
def fetch_index_data
if @index.supports_outdated_sync?
@index.pluck(:_id, @index.outdated_sync_field).each do |data|
data[0] = data[0].to_s
end
else
@index.pluck(:_id).map(&:to_s)
end
end
def data_ids(data)
return data unless @index.supports_outdated_sync?
data.map(&:first)
end
def linear_outdated_ids
OUTDATED_IDS_WORKER.call(outdated_sync_field_type, source_data.to_h, nil, nil, index_data)
end
def parallel_outdated_ids
size = processor_count.zero? ? index_data.size : (index_data.size / processor_count.to_f).ceil
batches = index_data.each_slice(size)
::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
curried_outdated_ids_worker = OUTDATED_IDS_WORKER.curry[outdated_sync_field_type, source_data.to_h, @index, batches.size]
result = ::Parallel.map(
batches,
@parallel,
&curried_outdated_ids_worker
).flatten(1)
::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
result
end
def processor_count
@processor_count ||= @parallel[:in_processes] || @parallel[:in_threads] || ::Parallel.processor_count
end
def outdated_sync_field_type
return @outdated_sync_field_type if instance_variable_defined?(:@outdated_sync_field_type)
return unless @index.outdated_sync_field
mappings = @index.client.indices.get_mapping(index: @index.index_name).values.first.fetch('mappings', {})
@outdated_sync_field_type = mappings
.fetch('properties', {})
.fetch(@index.outdated_sync_field.to_s, {})['type']
rescue Elastic::Transport::Transport::Errors::NotFound
nil
end
end
end
end