Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose periods of environment thread in waagent.conf #1891

Merged
merged 7 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Expose periods of environment thread in waagent.conf
  • Loading branch information
narrieta committed May 20, 2020
commit d3631af2d28470760dd190b6bba273e11969681b
33 changes: 33 additions & 0 deletions azurelinuxagent/common/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,14 @@ def load_conf_from_file(conf_file_path, conf=__conf__):


__INTEGER_OPTIONS__ = {
"Extensions.GoalStatePeriod": 6,
"Extensions.GoalStateHistoryCleanupPeriod": 86400,
"OS.EnableFirewallPeriod": 30,
"OS.RemovePersistentNetRulesPeriod": 30,
"OS.RootDeviceScsiTimeoutPeriod": 30,
"OS.MonitorDhcpClientRestartPeriod": 30,
"OS.SshClientAliveInterval": 180,
"Provisioning.MonitorHostNamePeriod": 30,
"Provisioning.PasswordCryptSaltLength": 10,
"HttpProxy.Port": None,
"ResourceDisk.SwapSizeMB": 0,
Expand All @@ -164,6 +171,18 @@ def enable_firewall(conf=__conf__):
return conf.get_switch("OS.EnableFirewall", False)


def get_enable_firewall_period(conf=__conf__):
return conf.get_int("OS.EnableFirewallPeriod", 30)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of the new flags dont have an entry in the README, any particular reason for that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the only one that is not in README. EnableFirewall is not there either so I am following the same

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there is a yml list and then there is a description for each flag in the conf file. I found a couple that have a description in the README but have not been added to the yml list in the README -

OS.RemovePersistentNetRulesPeriod
OS.RootDeviceScsiTimeoutPeriod
Provisioning.MonitorHostNamePeriod

Missing completely from README -

OS.MonitorDhcpClientRestartPeriod

Also I personally feel we should add the Firewall rules to the readme too, it makes more sense to be as transparent as possible especially because its an open source project.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add MonitorDhcpClientRestartPeriod to README. The other list is not in sync with README, I think it is just meant as a sample, not a comprehensive list of options. I'm not planning on listing all the missing options there.



def get_remove_persistent_net_rules_period(conf=__conf__):
return conf.get_int("OS.RemovePersistentNetRulesPeriod", 30)


def get_monitor_dhcp_client_restart_period(conf=__conf__):
return conf.get_int("OS.MonitorDhcpClientRestartPeriod", 30)


def enable_rdma(conf=__conf__):
return conf.get_switch("OS.EnableRDMA", False) or \
conf.get_switch("OS.UpdateRdmaDriver", False) or \
Expand Down Expand Up @@ -256,6 +275,10 @@ def get_root_device_scsi_timeout(conf=__conf__):
return conf.get("OS.RootDeviceScsiTimeout", None)


def get_root_device_scsi_timeout_period(conf=__conf__):
return conf.get_int("OS.RootDeviceScsiTimeoutPeriod", 30)


def get_ssh_host_keypair_type(conf=__conf__):
keypair_type = conf.get("Provisioning.SshHostKeyPairType", "rsa")
if keypair_type == "auto":
Expand All @@ -275,6 +298,14 @@ def get_extensions_enabled(conf=__conf__):
return conf.get_switch("Extensions.Enabled", True)


def get_goal_state_period(conf=__conf__):
return conf.get_int("Extensions.GoalStatePeriod", 6)


def get_goal_state_history_cleanup_period(conf=__conf__):
return conf.get_int("Extensions.GoalStateHistoryCleanupPeriod", 86400)


def get_allow_reset_sys_user(conf=__conf__):
return conf.get_switch("Provisioning.AllowResetSysUser", False)

Expand Down Expand Up @@ -321,6 +352,8 @@ def get_password_crypt_salt_len(conf=__conf__):
def get_monitor_hostname(conf=__conf__):
return conf.get_switch("Provisioning.MonitorHostName", False)

def get_monitor_hostname_period(conf=__conf__):
return conf.get_int("Provisioning.MonitorHostNamePeriod", 30)

def get_httpproxy_host(conf=__conf__):
return conf.get("HttpProxy.Host", None)
Expand Down
104 changes: 52 additions & 52 deletions azurelinuxagent/ga/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import re
import os
import socket
import time
import threading
import datetime

import azurelinuxagent.common.conf as conf
import azurelinuxagent.common.logger as logger
Expand All @@ -34,6 +32,7 @@
from azurelinuxagent.common.protocol.util import get_protocol_util
from azurelinuxagent.common.utils.archive import StateArchiver
from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION
from azurelinuxagent.ga.periodic_operation import PeriodicOperation

CACHE_PATTERNS = [
re.compile("^(.*)\.(\d+)\.(agentsManifest)$", re.IGNORECASE),
Expand All @@ -43,9 +42,6 @@

MAXIMUM_CACHED_FILES = 50

ARCHIVE_INTERVAL = datetime.timedelta(hours=24)


def get_env_handler():
return EnvHandler()

Expand All @@ -62,13 +58,26 @@ def __init__(self):
self.osutil = get_osutil()
self.dhcp_handler = get_dhcp_handler()
self.protocol_util = None
self._protocol = None
self.stopped = True
self.hostname = None
self.dhcp_id_list = []
self.server_thread = None
self.dhcp_warning_enabled = True
self.last_archive = None
self.archiver = StateArchiver(conf.get_lib_dir())
self._reset_firewall_rules = False

self._periodic_operations = [
PeriodicOperation("_remove_persistent_net_rules", self._remove_persistent_net_rules_period, conf.get_remove_persistent_net_rules_period()),
PeriodicOperation("_monitor_dhcp_client_restart", self._monitor_dhcp_client_restart, conf.get_monitor_dhcp_client_restart_period()),
PeriodicOperation("_cleanup_goal_state_history", self._cleanup_goal_state_history, conf.get_goal_state_history_cleanup_period())
]
if conf.enable_firewall():
self._periodic_operations.append(PeriodicOperation("_enable_firewall", self._enable_firewall, conf.get_enable_firewall_period()))
if conf.get_root_device_scsi_timeout() is not None:
self._periodic_operations.append(PeriodicOperation("_set_root_device_scsi_timeout", self._set_root_device_scsi_timeout, conf.get_root_device_scsi_timeout_period()))
if conf.get_monitor_hostname():
self._periodic_operations.append(PeriodicOperation("_monitor_hostname", self._monitor_hostname_changes, conf.get_monitor_hostname_period()))

def run(self):
if not self.stopped:
Expand Down Expand Up @@ -103,45 +112,39 @@ def monitor(self):
# than initializing it in the ExtHandler thread. This is done to avoid any concurrency issues as each
# thread would now have its own ProtocolUtil object as per the SingletonPerThread model.
self.protocol_util = get_protocol_util()
protocol = self.protocol_util.get_protocol()
reset_firewall_fules = False
self._protocol = self.protocol_util.get_protocol()
while not self.stopped:
self.osutil.remove_rules_files()

if conf.enable_firewall():
# If the rules ever change we must reset all rules and start over again.
#
# There was a rule change at 2.2.26, which started dropping non-root traffic
# to WireServer. The previous rules allowed traffic. Having both rules in
# place negated the fix in 2.2.26.
if not reset_firewall_fules:
self.osutil.remove_firewall(dst_ip=protocol.get_endpoint(), uid=os.getuid())
reset_firewall_fules = True

success = self.osutil.enable_firewall(dst_ip=protocol.get_endpoint(), uid=os.getuid())

add_periodic(
logger.EVERY_HOUR,
AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.Firewall,
is_success=success,
log_event=False)

timeout = conf.get_root_device_scsi_timeout()
if timeout is not None:
self.osutil.set_scsi_disks_timeout(timeout)

if conf.get_monitor_hostname():
self.handle_hostname_update()

self.handle_dhclient_restart()

self.archive_history()

time.sleep(5)

def handle_hostname_update(self):
for op in self._periodic_operations:
op.run()
PeriodicOperation.sleep_until_next_operation(self._periodic_operations)

def _remove_persistent_net_rules_period(self):
self.osutil.remove_rules_files()

def _enable_firewall(self):
# If the rules ever change we must reset all rules and start over again.
#
# There was a rule change at 2.2.26, which started dropping non-root traffic
# to WireServer. The previous rules allowed traffic. Having both rules in
# place negated the fix in 2.2.26.
if not self._reset_firewall_rules:
self.osutil.remove_firewall(dst_ip=self._protocol.get_endpoint(), uid=os.getuid())
self._reset_firewall_rules = True

success = self.osutil.enable_firewall(dst_ip=self._protocol.get_endpoint(), uid=os.getuid())

add_periodic(
logger.EVERY_HOUR,
AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.Firewall,
is_success=success,
log_event=False)

def _set_root_device_scsi_timeout(self):
self.osutil.set_scsi_disks_timeout(conf.get_root_device_scsi_timeout())

def _monitor_hostname_changes(self):
curr_hostname = socket.gethostname()
if curr_hostname != self.hostname:
logger.info("EnvMonitor: Detected hostname change: {0} -> {1}",
Expand Down Expand Up @@ -169,6 +172,9 @@ def get_dhcp_client_pid(self):

return pid

def _monitor_dhcp_client_restart(self):
self.handle_dhclient_restart()

def handle_dhclient_restart(self):
if len(self.dhcp_id_list) == 0:
self.dhcp_id_list = self.get_dhcp_client_pid()
Expand All @@ -183,16 +189,10 @@ def handle_dhclient_restart(self):
self.dhcp_handler.conf_routes()
self.dhcp_id_list = new_pid

def archive_history(self):
def _cleanup_goal_state_history(self):
"""
Purge history if we have exceed the maximum count.
Create a .zip of the history that has been preserved.
Purge history and create a .zip of the history that has been preserved.
"""
if self.last_archive is not None \
and datetime.datetime.utcnow() < \
self.last_archive + ARCHIVE_INTERVAL:
return

self.archiver.purge()
self.archiver.archive()

Expand Down
35 changes: 15 additions & 20 deletions azurelinuxagent/ga/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ def __init__(self):
self.imds_client = None

self.event_thread = None
self._reset_loggers_op = PeriodicOperation("reset_loggers", self.reset_loggers, self.RESET_LOGGERS_PERIOD)
self._collect_and_send_events_op = PeriodicOperation("collect_and_send_events", self.collect_and_send_events, self.EVENT_COLLECTION_PERIOD)
self._send_telemetry_heartbeat_op = PeriodicOperation("send_telemetry_heartbeat", self.send_telemetry_heartbeat, self.TELEMETRY_HEARTBEAT_PERIOD)
self._poll_telemetry_metrics_op = PeriodicOperation("poll_telemetry_metrics usage", self.poll_telemetry_metrics, self.CGROUP_TELEMETRY_POLLING_PERIOD)
self._send_host_plugin_heartbeat_op = PeriodicOperation("send_host_plugin_heartbeat", self.send_host_plugin_heartbeat, self.HOST_PLUGIN_HEARTBEAT_PERIOD)
self._send_imds_heartbeat_op = PeriodicOperation("send_imds_heartbeat", self.send_imds_heartbeat, self.IMDS_HEARTBEAT_PERIOD)
self._log_altered_network_configuration_op = PeriodicOperation("log_altered_network_configuration", self.log_altered_network_configuration, self.LOG_NETWORK_CONFIGURATION_PERIOD)
self._periodic_operations = [
PeriodicOperation("reset_loggers", self.reset_loggers, self.RESET_LOGGERS_PERIOD),
PeriodicOperation("collect_and_send_events", self.collect_and_send_events, self.EVENT_COLLECTION_PERIOD),
PeriodicOperation("send_telemetry_heartbeat", self.send_telemetry_heartbeat, self.TELEMETRY_HEARTBEAT_PERIOD),
PeriodicOperation("poll_telemetry_metrics usage", self.poll_telemetry_metrics, self.CGROUP_TELEMETRY_POLLING_PERIOD),
PeriodicOperation("send_host_plugin_heartbeat", self.send_host_plugin_heartbeat, self.HOST_PLUGIN_HEARTBEAT_PERIOD),
PeriodicOperation("send_imds_heartbeat", self.send_imds_heartbeat, self.IMDS_HEARTBEAT_PERIOD),
PeriodicOperation("log_altered_network_configuration", self.log_altered_network_configuration, self.LOG_NETWORK_CONFIGURATION_PERIOD),
]
self.protocol = None
self.protocol_util = None
self.health_service = None
Expand Down Expand Up @@ -142,24 +144,17 @@ def daemon(self, init_data=False):
self.init_protocols()
self.init_imds_client()

min_delta = min(MonitorHandler.TELEMETRY_HEARTBEAT_PERIOD,
MonitorHandler.CGROUP_TELEMETRY_POLLING_PERIOD,
MonitorHandler.EVENT_COLLECTION_PERIOD,
MonitorHandler.HOST_PLUGIN_HEARTBEAT_PERIOD,
MonitorHandler.IMDS_HEARTBEAT_PERIOD).seconds
while not self.stopped():
try:
self.protocol.update_host_plugin_from_goal_state()
self._send_telemetry_heartbeat_op.run()
self._poll_telemetry_metrics_op.run()
self._collect_and_send_events_op.run()
self._send_host_plugin_heartbeat_op.run()
self._send_imds_heartbeat_op.run()
self._log_altered_network_configuration_op.run()
self._reset_loggers_op.run()

for op in self._periodic_operations:
op.run()

except Exception as e:
logger.warn("An error occurred in the monitor thread main loop; will skip the current iteration.\n{0}", ustr(e))
time.sleep(min_delta)

PeriodicOperation.sleep_until_next_operation(self._periodic_operations)

def reset_loggers(self):
"""
Expand Down
24 changes: 20 additions & 4 deletions azurelinuxagent/ga/periodic_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

import datetime
import time

from azurelinuxagent.common import logger
from azurelinuxagent.common.future import ustr
Expand All @@ -35,22 +36,37 @@ class PeriodicOperation(object):
def __init__(self, name, operation, period):
self._name = name
self._operation = operation
self._period = period
self._last_run = None
self._period = period if isinstance(period, datetime.timedelta) else datetime.timedelta(seconds=period)
self._next_run_time = datetime.datetime.utcnow()
self._last_warning = None
self._last_warning_time = None

def run(self):
try:
if self._last_run is None or datetime.datetime.utcnow() >= self._last_run + self._period:
if self._next_run_time <= datetime.datetime.utcnow():
try:
self._operation()
finally:
self._last_run = datetime.datetime.utcnow()
self._next_run_time = datetime.datetime.utcnow() + self._period
except Exception as e:
warning = "Failed to {0}: {1} --- [NOTE: Will not log the same error for the next hour]".format(self._name, ustr(e))
if warning != self._last_warning or self._last_warning_time is None or datetime.datetime.utcnow() >= self._last_warning_time + self._LOG_WARNING_PERIOD:
logger.warn(warning)
self._last_warning_time = datetime.datetime.utcnow()
self._last_warning = warning

def next_run_time(self):
return self._next_run_time

@staticmethod
def sleep_until_next_operation(operations):
"""
Takes a list of operations, finds the operation that should be executed next (that with the closest next_run_time)
and sleeps until it is time to execute that operation.
"""
next_operation_time = min([op.next_run_time() for op in operations])

sleep_time = (next_operation_time - datetime.datetime.utcnow()).total_seconds()
if sleep_time > 0:
time.sleep(sleep_time)

9 changes: 4 additions & 5 deletions azurelinuxagent/ga/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.protocol.util import get_protocol_util
from azurelinuxagent.common.protocol.hostplugin import HostPluginProtocol
from azurelinuxagent.common.protocol.wire import WireProtocol
from azurelinuxagent.common.utils.flexible_version import FlexibleVersion
from azurelinuxagent.common.version import AGENT_NAME, AGENT_VERSION, AGENT_DIR_PATTERN, CURRENT_AGENT,\
CURRENT_VERSION, DISTRO_NAME, DISTRO_VERSION, is_current_agent_installed, PY_VERSION_MAJOR, PY_VERSION_MINOR, \
Expand All @@ -67,9 +66,9 @@

MAX_FAILURE = 3 # Max failure allowed for agent before blacklisted

GOAL_STATE_INTERVAL = 3
GOAL_STATE_INTERVAL_DISABLED = 5 * 60

ORPHAN_POLL_INTERVAL = 3
ORPHAN_WAIT_INTERVAL = 15 * 60

AGENT_SENTINEL_FILE = "current_version"
Expand Down Expand Up @@ -290,7 +289,7 @@ def run(self, debug=False):
self._ensure_readonly_files()
self._ensure_cgroups_initialized()

goal_state_interval = GOAL_STATE_INTERVAL if conf.get_extensions_enabled() else GOAL_STATE_INTERVAL_DISABLED
goal_state_interval = conf.get_goal_state_period() if conf.get_extensions_enabled() else GOAL_STATE_INTERVAL_DISABLED

while self.running:
#
Expand Down Expand Up @@ -430,7 +429,7 @@ def _ensure_no_orphans(self, orphan_wait_interval=ORPHAN_WAIT_INTERVAL):
wait_interval = orphan_wait_interval

while self.osutil.check_pid_alive(pid):
wait_interval -= GOAL_STATE_INTERVAL
wait_interval -= ORPHAN_POLL_INTERVAL
if wait_interval <= 0:
logger.warn(
u"{0} forcibly terminated orphan process {1}",
Expand All @@ -443,7 +442,7 @@ def _ensure_no_orphans(self, orphan_wait_interval=ORPHAN_WAIT_INTERVAL):
u"{0} waiting for orphan process {1} to terminate",
CURRENT_AGENT,
pid)
time.sleep(GOAL_STATE_INTERVAL)
time.sleep(ORPHAN_POLL_INTERVAL)

os.remove(pid_file)

Expand Down
Loading