diff --git a/example/copy_roundrobin.conf b/example/copy_roundrobin.conf index 935d29a45c..a05cabd9bc 100644 --- a/example/copy_roundrobin.conf +++ b/example/copy_roundrobin.conf @@ -1,12 +1,12 @@ - @type dummy + @type sample @label @test tag test.copy auto_increment_key id - @type dummy + @type sample @label @test tag test.rr auto_increment_key id @@ -36,4 +36,4 @@ output_type ltsv - \ No newline at end of file + diff --git a/example/counter.conf b/example/counter.conf index e8fcbaf6a8..0b394991de 100644 --- a/example/counter.conf +++ b/example/counter.conf @@ -8,7 +8,7 @@ - @type dummy + @type sample tag "test.data" auto_increment_key number diff --git a/example/filter_stdout.conf b/example/filter_stdout.conf index 5754fea023..08b582a8fb 100644 --- a/example/filter_stdout.conf +++ b/example/filter_stdout.conf @@ -1,6 +1,6 @@ - @type dummy - tag dummy + @type sample + tag sample diff --git a/example/in_dummy_blocks.conf b/example/in_sample_blocks.conf similarity index 61% rename from example/in_dummy_blocks.conf rename to example/in_sample_blocks.conf index b1635d9425..8977b367d8 100644 --- a/example/in_dummy_blocks.conf +++ b/example/in_sample_blocks.conf @@ -1,11 +1,11 @@ - @type dummy - tag dummy + @type sample + tag sample rate 100 - dummy {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaay"} + sample {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaay"} - + @type null never_flush true diff --git a/example/in_dummy_with_compression.conf b/example/in_sample_with_compression.conf similarity index 68% rename from example/in_dummy_with_compression.conf rename to example/in_sample_with_compression.conf index 55dda796e2..d46eaedeee 100644 --- a/example/in_dummy_with_compression.conf +++ b/example/in_sample_with_compression.conf @@ -1,16 +1,16 @@ - @type dummy + @type sample @label @main tag "test.data" size 2 rate 10 - dummy {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaay"} + sample {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaay"} auto_increment_key number - @type buffered_stdout + @type stdout @type file path "#{Dir.pwd}/compressed_buffers" diff --git a/example/logevents.conf b/example/logevents.conf index 7b1b55182d..51838b7515 100644 --- a/example/logevents.conf +++ b/example/logevents.conf @@ -1,10 +1,10 @@ - @type dummy - @label @dummylog + @type sample + @label @samplelog tag "data" - dummy {"message":"yay"} + sample {"message":"yay"} - + @type stdout @@ -22,4 +22,4 @@ # hostname_key "host" # # - \ No newline at end of file + diff --git a/example/multi_filters.conf b/example/multi_filters.conf index 0a33d211e8..02a2ab8a7b 100644 --- a/example/multi_filters.conf +++ b/example/multi_filters.conf @@ -1,7 +1,7 @@ # This example is to measure optimized filter pipeline performance. - @type dummy + @type sample tag test size 1000 diff --git a/example/out_exec_filter.conf b/example/out_exec_filter.conf index 5ec0f26c5d..5f6d2f48ae 100644 --- a/example/out_exec_filter.conf +++ b/example/out_exec_filter.conf @@ -1,10 +1,10 @@ - @type dummy + @type sample @label @exec tag exec_input rate 10 auto_increment_key num - dummy {"data":"mydata"} + sample {"data":"mydata"} diff --git a/example/out_forward.conf b/example/out_forward.conf index 276f47d116..a1fe4c2c79 100644 --- a/example/out_forward.conf +++ b/example/out_forward.conf @@ -1,5 +1,5 @@ - @type dummy + @type sample tag test diff --git a/example/out_forward_buf_file.conf b/example/out_forward_buf_file.conf index 299506d1fe..9fffe079f6 100644 --- a/example/out_forward_buf_file.conf +++ b/example/out_forward_buf_file.conf @@ -1,5 +1,5 @@ - @type dummy + @type sample tag test diff --git a/example/out_forward_client.conf b/example/out_forward_client.conf index e202fc9da0..bb07b9da17 100644 --- a/example/out_forward_client.conf +++ b/example/out_forward_client.conf @@ -1,21 +1,21 @@ - @type dummy + @type sample tag test - @type dummy + @type sample tag test2 - @type dummy + @type sample tag test3 - @type dummy + @type sample tag test4 - @type dummy + @type sample tag test5 diff --git a/example/out_forward_heartbeat_none.conf b/example/out_forward_heartbeat_none.conf index 8fa0dbec1b..f51f23e0a6 100644 --- a/example/out_forward_heartbeat_none.conf +++ b/example/out_forward_heartbeat_none.conf @@ -1,5 +1,5 @@ - @type dummy + @type sample tag test diff --git a/example/out_forward_sd.conf b/example/out_forward_sd.conf index 1d5c695262..3503a7cb81 100644 --- a/example/out_forward_sd.conf +++ b/example/out_forward_sd.conf @@ -1,5 +1,5 @@ - @type dummy + @type sample tag test diff --git a/example/out_forward_shared_key.conf b/example/out_forward_shared_key.conf index 99c680eaf3..142e610e3d 100644 --- a/example/out_forward_shared_key.conf +++ b/example/out_forward_shared_key.conf @@ -1,9 +1,9 @@ - @type dummy + @type sample tag test - @type dummy + @type sample tag test2 diff --git a/example/out_forward_tls.conf b/example/out_forward_tls.conf index d6dfb8a719..b852767228 100644 --- a/example/out_forward_tls.conf +++ b/example/out_forward_tls.conf @@ -1,5 +1,5 @@ - @type dummy + @type sample tag test diff --git a/example/out_forward_users.conf b/example/out_forward_users.conf index a21af6d4a4..2769e06832 100644 --- a/example/out_forward_users.conf +++ b/example/out_forward_users.conf @@ -1,13 +1,13 @@ - @type dummy + @type sample tag test - @type dummy + @type sample tag test2 - @type dummy + @type sample tag test3 diff --git a/example/out_null.conf b/example/out_null.conf index 0c13dd4769..ba3d86be3a 100644 --- a/example/out_null.conf +++ b/example/out_null.conf @@ -2,17 +2,17 @@ # bundle exec bin/fluentd -c example/out_buffered_null.conf # (+ --emit-error-log-interval 10) - @type dummy - tag dummy + @type sample + tag sample rate 500000000 - dummy [ + sample [ {"message": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}, {"message": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"}, {"message": "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"} ] - + @type null flush_interval 60s diff --git a/example/secondary_file.conf b/example/secondary_file.conf index 6b94e860eb..049097e59f 100644 --- a/example/secondary_file.conf +++ b/example/secondary_file.conf @@ -3,7 +3,7 @@ - @type dummy + @type sample tag test diff --git a/lib/fluent/plugin/in_dummy.rb b/lib/fluent/plugin/in_dummy.rb index 9338a59e72..c8078081e9 100644 --- a/lib/fluent/plugin/in_dummy.rb +++ b/lib/fluent/plugin/in_dummy.rb @@ -14,126 +14,5 @@ # limitations under the License. # -require 'json' - -require 'fluent/plugin/input' -require 'fluent/config/error' - -module Fluent::Plugin - class DummyInput < Input - Fluent::Plugin.register_input('dummy', self) - - helpers :thread, :storage - - BIN_NUM = 10 - DEFAULT_STORAGE_TYPE = 'local' - - desc "The value is the tag assigned to the generated events." - config_param :tag, :string - desc "The number of events in event stream of each emits." - config_param :size, :integer, default: 1 - desc "It configures how many events to generate per second." - config_param :rate, :integer, default: 1 - desc "If specified, each generated event has an auto-incremented key field." - config_param :auto_increment_key, :string, default: nil - desc "The boolean to suspend-and-resume incremental value after restart" - config_param :suspend, :bool, default: false,deprecated: 'This parameters is ignored' - desc "The dummy data to be generated. An array of JSON hashes or a single JSON hash." - config_param :dummy, default: [{"message"=>"dummy"}] do |val| - begin - parsed = JSON.parse(val) - rescue JSON::ParserError => ex - # Fluent::ConfigParseError, "got incomplete JSON" will be raised - # at literal_parser.rb with --use-v1-config, but I had to - # take care at here for the case of --use-v0-config. - raise Fluent::ConfigError, "#{ex.class}: #{ex.message}" - end - dummy = parsed.is_a?(Array) ? parsed : [parsed] - dummy.each_with_index do |e, i| - raise Fluent::ConfigError, "#{i}th element of dummy, #{e}, is not a hash" unless e.is_a?(Hash) - end - dummy - end - - def initialize - super - @storage = nil - end - - def configure(conf) - super - @dummy_index = 0 - config = conf.elements.select{|e| e.name == 'storage' }.first - @storage = storage_create(usage: 'suspend', conf: config, default_type: DEFAULT_STORAGE_TYPE) - end - - def multi_workers_ready? - true - end - - def start - super - - @storage.put(:increment_value, 0) unless @storage.get(:increment_value) - @storage.put(:dummy_index, 0) unless @storage.get(:dummy_index) - - if @auto_increment_key && !@storage.get(:auto_increment_value) - @storage.put(:auto_increment_value, -1) - end - - thread_create(:dummy_input, &method(:run)) - end - - def run - batch_num = (@rate / BIN_NUM).to_i - residual_num = (@rate % BIN_NUM) - while thread_current_running? - current_time = Time.now.to_i - BIN_NUM.times do - break unless (thread_current_running? && Time.now.to_i <= current_time) - wait(0.1) { emit(batch_num) } - end - emit(residual_num) if thread_current_running? - # wait for next second - while thread_current_running? && Time.now.to_i <= current_time - sleep 0.01 - end - end - end - - def emit(num) - begin - if @size > 1 - num.times do - router.emit_array(@tag, Array.new(@size) { [Fluent::EventTime.now, generate] }) - end - else - num.times { router.emit(@tag, Fluent::EventTime.now, generate) } - end - rescue => _ - # ignore all errors not to stop emits by emit errors - end - end - - def generate - d = @dummy[@dummy_index] - unless d - @dummy_index = 0 - d = @dummy[@dummy_index] - end - @dummy_index += 1 - if @auto_increment_key - d = d.dup - d[@auto_increment_key] = @storage.update(:auto_increment_value){|v| v + 1 } - end - d - end - - def wait(time) - start_time = Time.now - yield - sleep_time = time - (Time.now - start_time) - sleep sleep_time if sleep_time > 0 - end - end -end +# Remove this file in fluentd v2 +require_relative 'in_sample' diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 8385587211..06fea2ec9a 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -72,7 +72,7 @@ class HttpInput < Input desc 'Add REMOTE_ADDR header to the record.' config_param :add_remote_addr, :bool, default: false config_param :blocking_timeout, :time, default: 0.5 - desc 'Set a white list of domains that can do CORS (Cross-Origin Resource Sharing)' + desc 'Set a allow list of domains that can do CORS (Cross-Origin Resource Sharing)' config_param :cors_allow_origins, :array, default: nil desc 'Respond with empty gif image of 1x1 pixel.' config_param :respond_with_empty_img, :bool, default: false @@ -490,7 +490,7 @@ def on_message_complete # CORS check # ========== # For every incoming request, we check if we have some CORS - # restrictions and white listed origins through @cors_allow_origins. + # restrictions and allow listed origins through @cors_allow_origins. unless @cors_allow_origins.nil? unless @cors_allow_origins.include?('*') or include_cors_allow_origin send_response_and_close(RES_403_STATUS, {'Connection' => 'close'}, "") diff --git a/lib/fluent/plugin/in_sample.rb b/lib/fluent/plugin/in_sample.rb new file mode 100644 index 0000000000..6f36762e03 --- /dev/null +++ b/lib/fluent/plugin/in_sample.rb @@ -0,0 +1,141 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'json' + +require 'fluent/plugin/input' +require 'fluent/config/error' + +module Fluent::Plugin + class SampleInput < Input + Fluent::Plugin.register_input('sample', self) + Fluent::Plugin.register_input('dummy', self) + + helpers :thread, :storage + + BIN_NUM = 10 + DEFAULT_STORAGE_TYPE = 'local' + + desc "The value is the tag assigned to the generated events." + config_param :tag, :string + desc "The number of events in event stream of each emits." + config_param :size, :integer, default: 1 + desc "It configures how many events to generate per second." + config_param :rate, :integer, default: 1 + desc "If specified, each generated event has an auto-incremented key field." + config_param :auto_increment_key, :string, default: nil + desc "The boolean to suspend-and-resume incremental value after restart" + config_param :suspend, :bool, default: false,deprecated: 'This parameters is ignored' + desc "The sample data to be generated. An array of JSON hashes or a single JSON hash." + config_param :sample, alias: :dummy, default: [{"message" => "sample"}] do |val| + begin + parsed = JSON.parse(val) + rescue JSON::ParserError => ex + # Fluent::ConfigParseError, "got incomplete JSON" will be raised + # at literal_parser.rb with --use-v1-config, but I had to + # take care at here for the case of --use-v0-config. + raise Fluent::ConfigError, "#{ex.class}: #{ex.message}" + end + sample = parsed.is_a?(Array) ? parsed : [parsed] + sample.each_with_index do |e, i| + raise Fluent::ConfigError, "#{i}th element of sample, #{e}, is not a hash" unless e.is_a?(Hash) + end + sample + end + + def initialize + super + @storage = nil + end + + def configure(conf) + super + @sample_index = 0 + config = conf.elements.select{|e| e.name == 'storage' }.first + @storage = storage_create(usage: 'suspend', conf: config, default_type: DEFAULT_STORAGE_TYPE) + end + + def multi_workers_ready? + true + end + + def start + super + + @storage.put(:increment_value, 0) unless @storage.get(:increment_value) + # keep 'dummy' to avoid breaking changes for existing environment. Change it in fluentd v2 + @storage.put(:dummy_index, 0) unless @storage.get(:dummy_index) + + if @auto_increment_key && !@storage.get(:auto_increment_value) + @storage.put(:auto_increment_value, -1) + end + + thread_create(:sample_input, &method(:run)) + end + + def run + batch_num = (@rate / BIN_NUM).to_i + residual_num = (@rate % BIN_NUM) + while thread_current_running? + current_time = Time.now.to_i + BIN_NUM.times do + break unless (thread_current_running? && Time.now.to_i <= current_time) + wait(0.1) { emit(batch_num) } + end + emit(residual_num) if thread_current_running? + # wait for next second + while thread_current_running? && Time.now.to_i <= current_time + sleep 0.01 + end + end + end + + def emit(num) + begin + if @size > 1 + num.times do + router.emit_array(@tag, Array.new(@size) { [Fluent::EventTime.now, generate] }) + end + else + num.times { router.emit(@tag, Fluent::EventTime.now, generate) } + end + rescue => _ + # ignore all errors not to stop emits by emit errors + end + end + + def generate + d = @sample[@sample_index] + unless d + @sample_index = 0 + d = @sample[@sample_index] + end + @sample_index += 1 + if @auto_increment_key + d = d.dup + d[@auto_increment_key] = @storage.update(:auto_increment_value){|v| v + 1 } + end + d + end + + def wait(time) + start_time = Time.now + yield + sleep_time = time - (Time.now - start_time) + sleep sleep_time if sleep_time > 0 + end + end +end diff --git a/test/plugin/test_in_dummy.rb b/test/plugin/test_in_sample.rb similarity index 77% rename from test/plugin/test_in_dummy.rb rename to test/plugin/test_in_sample.rb index 594d5facf0..be5aa48368 100644 --- a/test/plugin/test_in_dummy.rb +++ b/test/plugin/test_in_sample.rb @@ -1,33 +1,33 @@ require_relative '../helper' require 'fluent/test/driver/input' -require 'fluent/plugin/in_dummy' +require 'fluent/plugin/in_sample' require 'fileutils' -class DummyTest < Test::Unit::TestCase +class SampleTest < Test::Unit::TestCase def setup Fluent::Test.setup end def create_driver(conf) - Fluent::Test::Driver::Input.new(Fluent::Plugin::DummyInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::SampleInput).configure(conf) end sub_test_case 'configure' do test 'required parameters' do assert_raise_message("'tag' parameter is required") do - Fluent::Plugin::DummyInput.new.configure(config_element('ROOT','')) + Fluent::Plugin::SampleInput.new.configure(config_element('ROOT','')) end end test 'tag' do d = create_driver(%[ - tag dummy + tag sample ]) - assert_equal "dummy", d.instance.tag + assert_equal "sample", d.instance.tag end config = %[ - tag dummy + tag sample ] test 'auto_increment_key' do @@ -44,30 +44,30 @@ def create_driver(conf) assert_equal 10, d.instance.rate end - test 'dummy' do + test 'sample' do # hash is okay - d = create_driver(config + %[dummy {"foo":"bar"}]) - assert_equal [{"foo"=>"bar"}], d.instance.dummy + d = create_driver(config + %[sample {"foo":"bar"}]) + assert_equal [{"foo"=>"bar"}], d.instance.sample # array of hash is okay - d = create_driver(config + %[dummy [{"foo":"bar"}]]) - assert_equal [{"foo"=>"bar"}], d.instance.dummy + d = create_driver(config + %[sample [{"foo":"bar"}]]) + assert_equal [{"foo"=>"bar"}], d.instance.sample assert_raise_message(/JSON::ParserError|got incomplete JSON/) do - create_driver(config + %[dummy "foo"]) + create_driver(config + %[sample "foo"]) end assert_raise_message(/is not a hash/) do - create_driver(config + %[dummy ["foo"]]) + create_driver(config + %[sample ["foo"]]) end end end sub_test_case "emit" do config = %[ - tag dummy + tag sample rate 10 - dummy {"foo":"bar"} + sample {"foo":"bar"} ] test 'simple' do @@ -75,7 +75,7 @@ def create_driver(conf) d.run(timeout: 0.5) d.events.each do |tag, time, record| - assert_equal("dummy", tag) + assert_equal("sample", tag) assert_equal({"foo"=>"bar"}, record) assert(time.is_a?(Fluent::EventTime)) end @@ -86,20 +86,20 @@ def create_driver(conf) d.run(timeout: 0.5) d.events.each_with_index do |(tag, _time, record), i| - assert_equal("dummy", tag) + assert_equal("sample", tag) assert_equal({"foo"=>"bar", "id"=>i}, record) end end end - TEST_PLUGIN_STORAGE_PATH = File.join( File.dirname(File.dirname(__FILE__)), 'tmp', 'in_dummy', 'store' ) + TEST_PLUGIN_STORAGE_PATH = File.join( File.dirname(File.dirname(__FILE__)), 'tmp', 'in_sample', 'store' ) FileUtils.mkdir_p TEST_PLUGIN_STORAGE_PATH - sub_test_case 'when dummy plugin has storage which is not specified the path' do + sub_test_case 'when sample plugin has storage which is not specified the path' do config1 = { - 'tag' => 'dummy', + 'tag' => 'sample', 'rate' => '0', - 'dummy' => '[{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]', + 'sample' => '[{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]', 'auto_increment_key' => 'id', } conf1 = config_element('ROOT', '', config1, []) @@ -135,7 +135,7 @@ def create_driver(conf) end end - sub_test_case 'when dummy plugin has storage which is specified the path' do + sub_test_case 'when sample plugin has storage which is specified the path' do setup do FileUtils.rm_rf(TEST_PLUGIN_STORAGE_PATH) FileUtils.mkdir_p(File.join(TEST_PLUGIN_STORAGE_PATH, 'json')) @@ -144,9 +144,9 @@ def create_driver(conf) config2 = { '@id' => 'test-02', - 'tag' => 'dummy', + 'tag' => 'sample', 'rate' => '0', - 'dummy' => '[{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]', + 'sample' => '[{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}]', 'auto_increment_key' => 'id', } conf2 = config_element('ROOT', '', config2, [ diff --git a/test/test_static_config_analysis.rb b/test/test_static_config_analysis.rb index 50477db2f2..d2792e4329 100644 --- a/test/test_static_config_analysis.rb +++ b/test/test_static_config_analysis.rb @@ -6,7 +6,7 @@ require 'fluent/plugin/out_stdout' require 'fluent/plugin/out_exec' require 'fluent/plugin/in_forward' -require 'fluent/plugin/in_dummy' +require 'fluent/plugin/in_sample' require 'fluent/plugin/filter_grep' require 'fluent/plugin/filter_stdout' require 'fluent/plugin/filter_parser' @@ -74,7 +74,7 @@ class StaticConfigAnalysisTest < ::Test::Unit::TestCase c = Fluent::Config.parse(conf_data, '(test)', '(test_dir)', true) ret = Fluent::StaticConfigAnalysis.call(c) assert_equal [Fluent::Plugin::ExecOutput, Fluent::Plugin::StdoutOutput, Fluent::Plugin::ForwardOutput], ret.outputs.map(&:plugin).map(&:class) - assert_equal [Fluent::Plugin::DummyInput, Fluent::Plugin::ForwardInput], ret.inputs.map(&:plugin).map(&:class) + assert_equal [Fluent::Plugin::SampleInput, Fluent::Plugin::ForwardInput], ret.inputs.map(&:plugin).map(&:class) assert_equal [Fluent::Plugin::ParserFilter, Fluent::Plugin::StdoutFilter, Fluent::Plugin::GrepFilter], ret.filters.map(&:plugin).map(&:class) assert_equal 1, ret.labels.size assert_equal '@test', ret.labels[0].name