Skip to content

Commit

Permalink
Add throttling message
Browse files Browse the repository at this point in the history
  • Loading branch information
Smjert committed Dec 2, 2021
1 parent 913ab28 commit d1e59cc
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 5 deletions.
51 changes: 47 additions & 4 deletions osquery/events/linux/auditdnetlink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ namespace {

const std::string kAppArmorRecordMarker{"apparmor="};
constexpr std::uint64_t kUnprocessedRecordsThreshold{4096};
// How often in seconds a message should be displayed if throttling happened
constexpr std::uint64_t kThrottlingMessageInterval{60};
// How much to wait for each throttling loop in millseconds
constexpr std::uint64_t kThrottlingDuration{100};

bool IsSELinuxRecord(const audit_reply& reply) noexcept {
static const auto& selinux_event_set = kSELinuxEventList;
Expand Down Expand Up @@ -332,11 +336,28 @@ bool AuditdNetlinkReader::acquireMessages() noexcept {
}

/* Throttle reading if the processing thread cannot keep up,
we don't want to use too much memory. */
we don't want to use too much memory */
while (auditd_context_->unprocessed_records_amount >
kUnprocessedRecordsThreshold &&
!interrupted()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
++auditd_context_->netlink_throttling_count;
std::this_thread::sleep_for(std::chrono::milliseconds(kThrottlingDuration));
}

/* We want to warn about throttling happening at most every
kThrottlingMessageInterval seconds */
if (auditd_context_->netlink_throttling_count > 0) {
auto now = getUnixTime();
if (auditd_context_->last_netlink_throttling_message_time +
kThrottlingMessageInterval <=
now) {
LOG(WARNING) << "The Audit publisher has throttled reading records from "
"Netlink for "
<< (auditd_context_->netlink_throttling_count / 10.0f)
<< " seconds. Some events may have been lost.";
auditd_context_->netlink_throttling_count = 0;
auditd_context_->last_netlink_throttling_message_time = now;
}
}

if (reset_handle) {
Expand Down Expand Up @@ -795,11 +816,33 @@ void AuditdNetlinkParser::start() {
audit_event_record_queue.clear();

/* Throttling the record processing if the consumer (the publisher)
cannot keep up. */
cannot keep up */
while (auditd_context_->processed_records_backlog >
kUnprocessedRecordsThreshold &&
!interrupted()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
++auditd_context_->processing_throttling_count;
std::this_thread::sleep_for(
std::chrono::milliseconds(kThrottlingDuration));
}

/* We want to warn about throttling happening at most every
kThrottlingMessageInterval seconds */
if (auditd_context_->processing_throttling_count > 0) {
auto now = getUnixTime();
if (auditd_context_->last_processing_throttling_message_time +
kThrottlingMessageInterval <=
now) {
/* NOTE: this is meant as a debugging message since throttling here
doesn't mean that events will be lost. It might cause throttling on
the reading side, but if that happens a warning
will be given there */
VLOG(1) << "The Audit publisher has throttled record processing for "
<< (auditd_context_->processing_throttling_count / 10.0f)
<< " seconds. This may cause further throttling and loss of "
"events.";
auditd_context_->processing_throttling_count = 0;
auditd_context_->last_processing_throttling_message_time = now;
}
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion osquery/events/linux/auditdnetlink.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,20 @@ struct AuditdContext final {

/// Amount of records that have been parsed but that still need to be consumed
/// by the publisher. Used for throttling the thread processing records if the
/// publisher cannot empty the backlog fast enough.
/// publisher cannot empty the backlog fast enough
std::atomic<std::size_t> processed_records_backlog{};

/// Timestamp of the last Netlink records reading throttling message
std::uint64_t last_netlink_throttling_message_time{};

/// Timestamp of the last records processing throttling message
std::uint64_t last_processing_throttling_message_time{};

/// Count of loops done during Netlink records reading throttling
std::uint32_t netlink_throttling_count{};

/// Count of loops done during records processing throttling
std::uint32_t processing_throttling_count{};
};

using AuditdContextRef = std::shared_ptr<AuditdContext>;
Expand Down

0 comments on commit d1e59cc

Please sign in to comment.