Skip to content

Commit

Permalink
Add BatchingOptions.RetryTimeLimit (#2120)
Browse files Browse the repository at this point in the history
  • Loading branch information
nblumhardt authored Oct 2, 2024
1 parent 3da0979 commit fe40d19
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 78 deletions.
6 changes: 6 additions & 0 deletions src/Serilog/Configuration/BatchingOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,10 @@ public class BatchingOptions
/// backpressure is applied.
/// </summary>
public int? QueueLimit { get; set; } = 100000;

/// <summary>
/// The maximum time that the sink will keep retrying failed batches for. The default is ten minutes. Lower
/// this value to reduce buffering and backpressure in high-load scenarios.
/// </summary>
public TimeSpan RetryTimeLimit { get; set; } = TimeSpan.FromMinutes(10);
}
12 changes: 7 additions & 5 deletions src/Serilog/Core/Sinks/Batching/BatchingSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ public BatchingSink(IBatchedLogEventSink batchedSink, BatchingOptions options)
if (options.BatchSizeLimit <= 0)
throw new ArgumentOutOfRangeException(nameof(options), "The batch size limit must be greater than zero.");
if (options.BufferingTimeLimit <= TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(options), "The period must be greater than zero.");
throw new ArgumentOutOfRangeException(nameof(options), "The buffering time limit must be greater than zero.");
if (options.RetryTimeLimit < TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(options), "The retry time limit must not be negative.");

_targetSink = batchedSink ?? throw new ArgumentNullException(nameof(batchedSink));
_batchSizeLimit = options.BatchSizeLimit;
_queue = options.QueueLimit is { } limit
? Channel.CreateBounded<LogEvent>(new BoundedChannelOptions(limit) { SingleReader = true })
: Channel.CreateUnbounded<LogEvent>(new UnboundedChannelOptions { SingleReader = true });
_batchScheduler = new FailureAwareBatchScheduler(options.BufferingTimeLimit);
_batchScheduler = new FailureAwareBatchScheduler(options.BufferingTimeLimit, options.RetryTimeLimit);
_eagerlyEmitFirstEvent = options.EagerlyEmitFirstEvent;
_waitForShutdownSignal = Task.Delay(Timeout.InfiniteTimeSpan, _shutdownSignal.Token)
.ContinueWith(e => e.Exception, TaskContinuationOptions.OnlyOnFaulted);
Expand Down Expand Up @@ -143,15 +145,15 @@ async Task LoopAsync()
catch (Exception ex)
{
_failureListener.OnLoggingFailed(this, LoggingFailureKind.Temporary, "failed emitting a batch", _currentBatch, ex);
_batchScheduler.MarkFailure();
_batchScheduler.MarkFailure(out var shouldDropBatch, out var shouldDropQueue);

if (_batchScheduler.ShouldDropBatch)
if (shouldDropBatch)
{
_failureListener.OnLoggingFailed(this, LoggingFailureKind.Permanent, "dropping the current batch", _currentBatch, ex);
_currentBatch.Clear();
}

if (_batchScheduler.ShouldDropQueue)
if (shouldDropQueue)
{
DrainOnFailure(LoggingFailureKind.Permanent, "dropping all queued events", ex);
}
Expand Down
58 changes: 41 additions & 17 deletions src/Serilog/Core/Sinks/Batching/FailureAwareBatchScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,46 +28,74 @@ namespace Serilog.Core.Sinks.Batching;
class FailureAwareBatchScheduler
{
static readonly TimeSpan MinimumBackoffPeriod = TimeSpan.FromSeconds(5);
static readonly TimeSpan MaximumBackoffInterval = TimeSpan.FromMinutes(10);
static readonly TimeSpan MaximumBackoffInterval = TimeSpan.FromMinutes(1);
const int DroppedBatchesBeforeDroppingQueue = 10;

const int FailuresBeforeDroppingBatch = 8;
const int FailuresBeforeDroppingQueue = 10;
readonly TimeSpan _bufferingTimeLimit, _retryTimeLimit;
readonly TimeProvider _timeProvider;

readonly TimeSpan _period;
long? _firstFailureTimestamp;
int _failuresSinceSuccessfulBatch, _consecutiveDroppedBatches;

int _failuresSinceSuccessfulBatch;
public FailureAwareBatchScheduler(TimeSpan bufferingTimeLimit, TimeSpan retryTimeLimit)
: this(bufferingTimeLimit, retryTimeLimit, TimeProvider.System)
{
}

public FailureAwareBatchScheduler(TimeSpan period)
internal FailureAwareBatchScheduler(TimeSpan bufferingTimeLimit, TimeSpan retryTimeLimit, TimeProvider timeProvider)
{
if (period < TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(period), "The batching period must be a positive timespan.");
if (bufferingTimeLimit < TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(bufferingTimeLimit), "The buffering time limit must be a positive timespan.");

if (retryTimeLimit < TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(retryTimeLimit), "The retry time limit must be a positive timespan.");

_period = period;
_bufferingTimeLimit = bufferingTimeLimit;
_retryTimeLimit = retryTimeLimit;
_timeProvider = timeProvider;
}

public void MarkSuccess()
{
_failuresSinceSuccessfulBatch = 0;
_consecutiveDroppedBatches = 0;
_firstFailureTimestamp = null;
}

public void MarkFailure()
public void MarkFailure(out bool shouldDropBatch, out bool shouldDropQueue)
{
++_failuresSinceSuccessfulBatch;
_firstFailureTimestamp ??= _timeProvider.GetTimestamp();

// Once we're up against the time limit, we'll try each subsequent batch once and then drop it.
var now = _timeProvider.GetElapsedTime(_firstFailureTimestamp.Value);
var wouldRetryAt = now.Add(NextInterval);
shouldDropBatch = wouldRetryAt >= _retryTimeLimit;

if (shouldDropBatch)
{
++_consecutiveDroppedBatches;
}

// After trying and dropping enough batches consecutively, we'll try to get out of the way and just drop
// everything after each subsequent failure. Each time a batch is tried and fails, we'll drop it and
// drain the whole queue.
shouldDropQueue = _consecutiveDroppedBatches >= DroppedBatchesBeforeDroppingQueue;
}

public TimeSpan NextInterval
{
get
{
// Available, and first failure, just try the batch interval
if (_failuresSinceSuccessfulBatch <= 1) return _period;
if (_failuresSinceSuccessfulBatch <= 1) return _bufferingTimeLimit;

// Second failure, start ramping up the interval - first 2x, then 4x, ...
var backoffFactor = Math.Pow(2, _failuresSinceSuccessfulBatch - 1);

// If the period is ridiculously short, give it a boost so we get some
// visible backoff.
var backoffPeriod = Math.Max(_period.Ticks, MinimumBackoffPeriod.Ticks);
var backoffPeriod = Math.Max(_bufferingTimeLimit.Ticks, MinimumBackoffPeriod.Ticks);

// The "ideal" interval
var backedOff = (long) (backoffPeriod * backoffFactor);
Expand All @@ -76,13 +104,9 @@ public TimeSpan NextInterval
var cappedBackoff = Math.Min(MaximumBackoffInterval.Ticks, backedOff);

// Unless that's shorter than the period, in which case we'll just apply the period
var actual = Math.Max(_period.Ticks, cappedBackoff);
var actual = Math.Max(_bufferingTimeLimit.Ticks, cappedBackoff);

return TimeSpan.FromTicks(actual);
}
}

public bool ShouldDropBatch => _failuresSinceSuccessfulBatch >= FailuresBeforeDroppingBatch;

public bool ShouldDropQueue => _failuresSinceSuccessfulBatch >= FailuresBeforeDroppingQueue;
}
48 changes: 48 additions & 0 deletions src/Serilog/Util/TimeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright © Serilog Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#if !NET8_0_OR_GREATER

using System.Diagnostics;

namespace System;

/// <summary>
/// A super-simple, cut-down subset of `System.TimeProvider` which we use internally to avoid a package dependency
/// on platforms without it.
/// </summary>
abstract class TimeProvider
{
public static TimeProvider System { get; } = new SystemTimeProvider();

public DateTimeOffset GetLocalNow() => DateTimeOffset.Now;

public virtual DateTimeOffset GetUtcNow() => DateTimeOffset.UtcNow;

public virtual long TimestampFrequency => Stopwatch.Frequency;

public virtual long GetTimestamp() => Stopwatch.GetTimestamp();

public TimeSpan GetElapsedTime(long startingTimestamp, long endingTimestamp)
{
// Assumes Stopwatch.Frequency is never zero, safe for our internal usage.
return new TimeSpan((long)((endingTimestamp - startingTimestamp) * ((double)TimeSpan.TicksPerSecond / TimestampFrequency)));
}

public TimeSpan GetElapsedTime(long startingTimestamp) => GetElapsedTime(startingTimestamp, GetTimestamp());

sealed class SystemTimeProvider : TimeProvider;
}

#endif
1 change: 1 addition & 0 deletions test/Serilog.ApprovalTests/Serilog.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Serilog.Configuration
public System.TimeSpan BufferingTimeLimit { get; set; }
public bool EagerlyEmitFirstEvent { get; set; }
public int? QueueLimit { get; set; }
public System.TimeSpan RetryTimeLimit { get; set; }
}
public interface ILoggerSettings
{
Expand Down
99 changes: 43 additions & 56 deletions test/Serilog.Tests/Core/FailureAwareBatchSchedulerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,101 +4,88 @@ namespace Serilog.Core.Tests;

public class FailureAwareBatchSchedulerTests
{
static TimeSpan Period => TimeSpan.FromSeconds(2);
FailureAwareBatchScheduler Scheduler { get; } = new(Period);
TimeSpan _bufferingTimeLimit = TimeSpan.FromSeconds(2), _retryTimeLimit = TimeSpan.FromMinutes(10);
TestTimeProvider _timeProvider = new();
FailureAwareBatchScheduler _scheduler { get; }

public FailureAwareBatchSchedulerTests()
{
_scheduler = new(_bufferingTimeLimit, _retryTimeLimit, _timeProvider);
}

[Fact]
public void WhenNoFailuresHaveOccurredTheInitialIntervalIsUsed()
{
Assert.Equal(Period, Scheduler.NextInterval);
Assert.Equal(_bufferingTimeLimit, _scheduler.NextInterval);
}

[Fact]
public void WhenOneFailureHasOccurredTheInitialIntervalIsUsed()
{
Scheduler.MarkFailure();
Assert.Equal(Period, Scheduler.NextInterval);
_scheduler.MarkFailure(out _, out _);
Assert.Equal(_bufferingTimeLimit, _scheduler.NextInterval);
}

[Fact]
public void WhenTwoFailuresHaveOccurredTheIntervalBacksOff()
{
Scheduler.MarkFailure();
Scheduler.MarkFailure();
Assert.Equal(TimeSpan.FromSeconds(10), Scheduler.NextInterval);
_scheduler.MarkFailure(out _, out _);
_scheduler.MarkFailure(out _, out _);
Assert.Equal(TimeSpan.FromSeconds(10), _scheduler.NextInterval);
}

[Fact]
public void WhenABatchSucceedsTheStatusResets()
{
Scheduler.MarkFailure();
Scheduler.MarkFailure();
Scheduler.MarkSuccess();
Assert.Equal(Period, Scheduler.NextInterval);
_scheduler.MarkFailure(out _, out _);
_scheduler.MarkFailure(out _, out _);
_scheduler.MarkSuccess();
Assert.Equal(_bufferingTimeLimit, _scheduler.NextInterval);
}

[Fact]
public void WhenThreeFailuresHaveOccurredTheIntervalBacksOff()
{
Scheduler.MarkFailure();
Scheduler.MarkFailure();
Scheduler.MarkFailure();
Assert.Equal(TimeSpan.FromSeconds(20), Scheduler.NextInterval);
Assert.False(Scheduler.ShouldDropBatch);
_scheduler.MarkFailure(out _, out _);
_scheduler.MarkFailure(out _, out _);
_scheduler.MarkFailure(out var shouldDropBatch, out _);
Assert.Equal(TimeSpan.FromSeconds(20), _scheduler.NextInterval);
Assert.False(shouldDropBatch);
}

[Fact]
public void WhenEightFailuresHaveOccurredTheIntervalBacksOffAndBatchIsDropped()
public void WhenRetryTimeLimitHasElapsedTheBatchIsDropped()
{
for (var i = 0; i < 8; ++i)
{
Assert.False(Scheduler.ShouldDropBatch);
Scheduler.MarkFailure();
}
Assert.Equal(TimeSpan.FromMinutes(10), Scheduler.NextInterval);
Assert.True(Scheduler.ShouldDropBatch);
Assert.False(Scheduler.ShouldDropQueue);
}

[Fact]
public void WhenTenFailuresHaveOccurredTheQueueIsDropped()
{
for (var i = 0; i < 10; ++i)
{
Assert.False(Scheduler.ShouldDropQueue);
Scheduler.MarkFailure();
_scheduler.MarkFailure(out var shouldDropBatch, out var shouldDropQueue);
Assert.False(shouldDropBatch);
Assert.False(shouldDropQueue);
}
Assert.True(Scheduler.ShouldDropQueue);
}

[Fact]
public void AtTheDefaultIntervalRetriesForTenMinutesBeforeDroppingBatch()
{
var cumulative = TimeSpan.Zero;
do
{
Scheduler.MarkFailure();

if (!Scheduler.ShouldDropBatch)
cumulative += Scheduler.NextInterval;
} while (!Scheduler.ShouldDropBatch);
_timeProvider.Advance(_retryTimeLimit);
_scheduler.MarkFailure(out var shouldDropBatch_, out var shouldDropQueue_);

Assert.False(Scheduler.ShouldDropQueue);
Assert.Equal(TimeSpan.Parse("00:10:32", CultureInfo.InvariantCulture), cumulative);
Assert.True(shouldDropBatch_);
Assert.False(shouldDropQueue_);
}

[Fact]
public void AtTheDefaultIntervalRetriesForThirtyMinutesBeforeDroppingQueue()
public void WhenTenConsecutiveBatchesAreDroppedTheQueueIsDropped()
{
var cumulative = TimeSpan.Zero;
do
_scheduler.MarkFailure(out var shouldDropBatch, out var shouldDropQueue);
_timeProvider.Advance(_retryTimeLimit);

for (var i = 0; i < 9; ++i)
{
Scheduler.MarkFailure();
_scheduler.MarkFailure(out shouldDropBatch, out shouldDropQueue);
Assert.True(shouldDropBatch);
Assert.False(shouldDropQueue);
}

if (!Scheduler.ShouldDropQueue)
cumulative += Scheduler.NextInterval;
} while (!Scheduler.ShouldDropQueue);
_scheduler.MarkFailure(out shouldDropBatch, out shouldDropQueue);

Assert.Equal(TimeSpan.Parse("00:30:32", CultureInfo.InvariantCulture), cumulative);
Assert.True(shouldDropBatch);
Assert.True(shouldDropQueue);
}
}
23 changes: 23 additions & 0 deletions test/Serilog.Tests/Support/TestTimeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace Serilog.Tests.Support;

class TestTimeProvider: TimeProvider
{
DateTimeOffset _utcNow = DateTimeOffset.UtcNow;

public override long GetTimestamp()
{
return _utcNow.Ticks;
}

public override DateTimeOffset GetUtcNow()
{
return _utcNow;
}

public void Advance(TimeSpan distance)
{
_utcNow = _utcNow.Add(distance);
}

public override long TimestampFrequency => TimeSpan.TicksPerSecond;
}

0 comments on commit fe40d19

Please sign in to comment.