Skip to content

Commit

Permalink
Merge pull request #2937 from fluent/add-ignore_repeated_log_interval
Browse files Browse the repository at this point in the history
log: Add ignore_repeated_log_interval parameter
  • Loading branch information
repeatedly authored Apr 10, 2020
2 parents 8fcd150 + 033a248 commit 0e901e7
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 10 deletions.
51 changes: 45 additions & 6 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def initialize(logger, opts={})
@optional_attrs = nil

@suppress_repeated_stacktrace = opts[:suppress_repeated_stacktrace]
@ignore_repeated_log_interval = opts[:ignore_repeated_log_interval]

@process_type = opts[:process_type] # :supervisor, :worker0, :workers Or :standalone
@process_type ||= :standalone # to keep behavior of existing code
Expand Down Expand Up @@ -139,7 +140,8 @@ def dup
dl_opts = {}
dl_opts[:log_level] = @level - 1
logger = ServerEngine::DaemonLogger.new(@out, dl_opts)
clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace, process_type: @process_type, worker_id: @worker_id)
clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace, process_type: @process_type,
worker_id: @worker_id, ignore_repeated_log_interval: @ignore_repeated_log_interval)
clone.format = @format
clone.time_format = @time_format
clone.log_event_enabled = @log_event_enabled
Expand All @@ -149,7 +151,7 @@ def dup

attr_reader :format
attr_reader :time_format
attr_accessor :log_event_enabled
attr_accessor :log_event_enabled, :ignore_repeated_log_interval
attr_accessor :out
attr_accessor :level
attr_accessor :optional_header, :optional_attrs
Expand Down Expand Up @@ -278,6 +280,7 @@ def trace(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:trace, args)
return if time.nil?
puts [@color_trace, @formatter.call(type, time, LEVEL_TRACE, msg), @color_reset].join
rescue
# logger should not raise an exception. This rescue prevents unexpected behaviour.
Expand All @@ -299,6 +302,7 @@ def debug(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:debug, args)
return if time.nil?
puts [@color_debug, @formatter.call(type, time, LEVEL_DEBUG, msg), @color_reset].join
rescue
end
Expand All @@ -319,6 +323,7 @@ def info(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:info, args)
return if time.nil?
puts [@color_info, @formatter.call(type, time, LEVEL_INFO, msg), @color_reset].join
rescue
end
Expand All @@ -339,6 +344,7 @@ def warn(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:warn, args)
return if time.nil?
puts [@color_warn, @formatter.call(type, time, LEVEL_WARN, msg), @color_reset].join
rescue
end
Expand All @@ -359,6 +365,7 @@ def error(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:error, args)
return if time.nil?
puts [@color_error, @formatter.call(type, time, LEVEL_ERROR, msg), @color_reset].join
rescue
end
Expand All @@ -379,6 +386,7 @@ def fatal(*args, &block)
return if skipped_type?(type)
args << block.call if block
time, msg = event(:fatal, args)
return if time.nil?
puts [@color_fatal, @formatter.call(type, time, LEVEL_FATAL, msg), @color_reset].join
rescue
end
Expand Down Expand Up @@ -412,20 +420,37 @@ def reset
@out.reset if @out.respond_to?(:reset)
end

CachedLog = Struct.new(:msg, :time)

def ignore_repeated_log?(key, time, message)
cached_log = Thread.current[key]
return false if cached_log.nil?
(cached_log.msg == message) && (time - cached_log.time <= @ignore_repeated_log_interval)
end

def suppress_stacktrace?(backtrace)
cached_log = Thread.current[:last_repeated_stacktrace]
return false if cached_log.nil?
cached_log.msg == backtrace
end

def dump_stacktrace(type, backtrace, level)
return if @level > level

time = Time.now

if @format == :text
line = caller_line(type, time, 5, level)
if @suppress_repeated_stacktrace && (Thread.current[:last_repeated_stacktrace] == backtrace)
if @ignore_repeated_log_interval && ignore_repeated_log?(:last_repeated_stacktrace, time, backtrace)
return
elsif @suppress_repeated_stacktrace && suppress_stacktrace?(backtrace)
puts [" ", line, 'suppressed same stacktrace'].join
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @ignore_repeated_log_interval
else
backtrace.each { |msg|
puts [" ", line, msg].join
}
Thread.current[:last_repeated_stacktrace] = backtrace if @suppress_repeated_stacktrace
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @suppress_repeated_stacktrace
end
else
r = {
Expand All @@ -436,11 +461,14 @@ def dump_stacktrace(type, backtrace, level)
r['worker_id'] = wid
end

if @suppress_repeated_stacktrace && (Thread.current[:last_repeated_stacktrace] == backtrace)
if @ignore_repeated_log_interval && ignore_repeated_log?(:last_repeated_stacktrace, time, backtrace)
return
elsif @suppress_repeated_stacktrace && suppress_stacktrace?(backtrace)
r['message'] = 'suppressed same stacktrace'
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @ignore_repeated_log_interval
else
r['message'] = backtrace.join("\n")
Thread.current[:last_repeated_stacktrace] = backtrace if @suppress_repeated_stacktrace
Thread.current[:last_repeated_stacktrace] = CachedLog.new(backtrace, time) if @suppress_repeated_stacktrace
end

puts Yajl.dump(r)
Expand Down Expand Up @@ -479,6 +507,14 @@ def event(level, args)
end
}

if @ignore_repeated_log_interval
if ignore_repeated_log?(:last_repeated_log, time, message)
return nil, nil
else
Thread.current[:last_repeated_log] = CachedLog.new(message, time)
end
end

if @log_event_enabled && !@threads_exclude_events.include?(Thread.current)
record = map.dup
record.keys.each {|key|
Expand Down Expand Up @@ -530,6 +566,9 @@ def initialize(logger)
if logger.instance_variable_defined?(:@suppress_repeated_stacktrace)
@suppress_repeated_stacktrace = logger.instance_variable_get(:@suppress_repeated_stacktrace)
end
if logger.instance_variable_defined?(:@ignore_repeated_log_interval)
@ignore_repeated_log_interval = logger.instance_variable_get(:@ignore_repeated_log_interval)
end

self.format = @logger.format
self.time_format = @logger.time_format
Expand Down
12 changes: 9 additions & 3 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,15 @@ def self.load_config(path, params = {})

log_level = params['log_level']
suppress_repeated_stacktrace = params['suppress_repeated_stacktrace']
ignore_repeated_log_interval = params['ignore_repeated_log_interval']

log_path = params['log_path']
chuser = params['chuser']
chgroup = params['chgroup']
log_rotate_age = params['log_rotate_age']
log_rotate_size = params['log_rotate_size']

log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace}
log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace, ignore_repeated_log_interval: ignore_repeated_log_interval}
logger_initializer = Supervisor::LoggerInitializer.new(
log_path, log_level, chuser, chgroup, log_opts,
log_rotate_age: log_rotate_age,
Expand Down Expand Up @@ -345,6 +346,7 @@ def self.load_config(path, params = {})
chgroup: chgroup,
chumask: 0,
suppress_repeated_stacktrace: suppress_repeated_stacktrace,
ignore_repeated_log_interval: ignore_repeated_log_interval,
daemonize: daemonize,
rpc_endpoint: params['rpc_endpoint'],
counter_server: params['counter_server'],
Expand Down Expand Up @@ -439,9 +441,10 @@ def reopen!
self
end

def apply_options(format: nil, time_format: nil, log_dir_perm: nil)
def apply_options(format: nil, time_format: nil, log_dir_perm: nil, ignore_repeated_log_interval: nil)
$log.format = format if format
$log.time_format = time_format if time_format
$log.ignore_repeated_log_interval = ignore_repeated_log_interval if ignore_repeated_log_interval

if @path && log_dir_perm
File.chmod(log_dir_perm || 0755, File.dirname(@path))
Expand All @@ -468,6 +471,7 @@ def self.default_options
root_dir: nil,
suppress_interval: 0,
suppress_repeated_stacktrace: true,
ignore_repeated_log_interval: nil,
without_source: nil,
use_v1_config: true,
strict_config_value: nil,
Expand Down Expand Up @@ -507,7 +511,7 @@ def initialize(opt)
@cl_opt = opt
@conf = nil

log_opts = { suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace] }
log_opts = {suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace], ignore_repeated_log_interval: opt[:ignore_repeated_log_interval]}
@log = LoggerInitializer.new(
@log_path, opt[:log_level], @chuser, @chgroup, log_opts,
log_rotate_age: @log_rotate_age,
Expand Down Expand Up @@ -628,6 +632,7 @@ def configure(supervisor: false)
format: @system_config.log.format,
time_format: @system_config.log.time_format,
log_dir_perm: @system_config.dir_permission,
ignore_repeated_log_interval: @system_config.ignore_repeated_log_interval
)

$log.info :supervisor, 'parsing config file is succeeded', path: @config_path
Expand Down Expand Up @@ -690,6 +695,7 @@ def supervise
'root_dir' => @system_config.root_dir,
'log_level' => @system_config.log_level,
'suppress_repeated_stacktrace' => @system_config.suppress_repeated_stacktrace,
'ignore_repeated_log_interval' => @system_config.ignore_repeated_log_interval,
'rpc_endpoint' => @system_config.rpc_endpoint,
'enable_get_dump' => @system_config.enable_get_dump,
'counter_server' => @system_config.counter_server,
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SystemConfig
SYSTEM_CONFIG_PARAMETERS = [
:workers, :root_dir, :log_level,
:suppress_repeated_stacktrace, :emit_error_log_interval, :suppress_config_dump,
:log_event_verbose,
:log_event_verbose, :ignore_repeated_log_interval,
:without_source, :rpc_endpoint, :enable_get_dump, :process_name,
:file_permission, :dir_permission, :counter_server, :counter_client,
:strict_config_value, :enable_msgpack_time_support
Expand All @@ -34,6 +34,7 @@ class SystemConfig
config_param :root_dir, :string, default: nil
config_param :log_level, :enum, list: [:trace, :debug, :info, :warn, :error, :fatal], default: 'info'
config_param :suppress_repeated_stacktrace, :bool, default: nil
config_param :ignore_repeated_log_interval, :time, default: nil
config_param :emit_error_log_interval, :time, default: nil
config_param :suppress_config_dump, :bool, default: nil
config_param :log_event_verbose, :bool, default: nil
Expand Down
2 changes: 2 additions & 0 deletions test/config/test_system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def parse_text(text)
assert_nil(sc.root_dir)
assert_equal(Fluent::Log::LEVEL_INFO, sc.log_level)
assert_nil(sc.suppress_repeated_stacktrace)
assert_nil(sc.ignore_repeated_log_interval)
assert_nil(sc.emit_error_log_interval)
assert_nil(sc.suppress_config_dump)
assert_nil(sc.without_source)
Expand All @@ -86,6 +87,7 @@ def parse_text(text)
'root_dir' => ['root_dir', File.join(TMP_DIR, 'root')],
'log_level' => ['log_level', 'error'],
'suppress_repeated_stacktrace' => ['suppress_repeated_stacktrace', true],
'ignore_repeated_log_interval' => ['ignore_repeated_log_interval', 10],
'log_event_verbose' => ['log_event_verbose', true],
'suppress_config_dump' => ['suppress_config_dump', true],
'without_source' => ['without_source', true],
Expand Down
44 changes: 44 additions & 0 deletions test/test_log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,50 @@ def test_different_log_level
end
end

sub_test_case "ignore_repeated_log_interval" do
def test_same_message
message = "This is test"
logger = ServerEngine::DaemonLogger.new(@log_device, {log_level: ServerEngine::DaemonLogger::INFO})
log = Fluent::Log.new(logger, {ignore_repeated_log_interval: 5})

log.error message
10.times { |i|
Timecop.freeze(@timestamp + i)
log.error message
}

expected = [
"2016-04-21 02:58:41 +0000 [error]: This is test\n",
"2016-04-21 02:58:47 +0000 [error]: This is test\n"
]
assert_equal(expected, log.out.logs)
end

def test_different_message
message = "This is test"
logger = ServerEngine::DaemonLogger.new(@log_device, {log_level: ServerEngine::DaemonLogger::INFO})
log = Fluent::Log.new(logger, {ignore_repeated_log_interval: 10})

log.error message
3.times { |i|
Timecop.freeze(@timestamp + i)
log.error message
log.error message
log.info "Hello! " + message
}

expected = [
"2016-04-21 02:58:41 +0000 [error]: This is test\n",
"2016-04-21 02:58:41 +0000 [info]: Hello! This is test\n",
"2016-04-21 02:58:42 +0000 [error]: This is test\n",
"2016-04-21 02:58:42 +0000 [info]: Hello! This is test\n",
"2016-04-21 02:58:43 +0000 [error]: This is test\n",
"2016-04-21 02:58:43 +0000 [info]: Hello! This is test\n",
]
assert_equal(expected, log.out.logs)
end
end

def test_dup
dl_opts = {}
dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
Expand Down

0 comments on commit 0e901e7

Please sign in to comment.