diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index b7c810cfe5164..8af963b98c232 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1032,18 +1032,6 @@ logging: type: boolean example: ~ default: "False" - enable_task_context_logger: - description: | - If enabled, Airflow may ship messages to task logs from outside the task run context, e.g. from - the scheduler, executor, or callback execution context. This can help in circumstances such as - when there's something blocking the execution of the task and ordinarily there may be no task - logs at all. - This is set to ``True`` by default. If you encounter issues with this feature - (e.g. scheduler performance issues) it can be disabled. - version_added: 2.8.0 - type: boolean - example: ~ - default: "True" color_log_error_keywords: description: | A comma separated list of keywords related to errors whose presence should display the line in red diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 24379d340a784..0192892603f20 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -19,7 +19,6 @@ from __future__ import annotations -import inspect import logging import os from contextlib import suppress @@ -238,10 +237,6 @@ def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> Non self.handler.setLevel(self.level) return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None - @cached_property - def supports_task_context_logging(self) -> bool: - return "identifier" in inspect.signature(self.set_context).parameters - @staticmethod def add_triggerer_suffix(full_path, job_id=None): """ diff --git a/airflow/utils/log/task_context_logger.py b/airflow/utils/log/task_context_logger.py deleted file mode 100644 index 1d2301b65be81..0000000000000 --- a/airflow/utils/log/task_context_logger.py +++ /dev/null @@ -1,188 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import logging -from contextlib import suppress -from copy import copy -from logging import Logger -from typing import TYPE_CHECKING - -from airflow.configuration import conf -from airflow.models.taskinstancekey import TaskInstanceKey -from airflow.utils.log.file_task_handler import _ensure_ti -from airflow.utils.session import create_session - -if TYPE_CHECKING: - from airflow.models.taskinstance import TaskInstance - from airflow.utils.log.file_task_handler import FileTaskHandler - -logger = logging.getLogger(__name__) - - -class TaskContextLogger: - """ - Class for sending messages to task instance logs from outside task execution context. - - This is intended to be used mainly in exceptional circumstances, to give visibility into - events related to task execution when otherwise there would be none. - - :meta private: - """ - - def __init__(self, component_name: str, call_site_logger: Logger | None = None): - """ - Initialize the task context logger with the component name. - - :param component_name: the name of the component that will be used to identify the log messages - :param call_site_logger: if provided, message will also be emitted through this logger - """ - self.component_name = component_name - self.task_handler = self._get_task_handler() - self.enabled = self._should_enable() - self.call_site_logger = call_site_logger - - def _should_enable(self) -> bool: - if not conf.getboolean("logging", "enable_task_context_logger"): - return False - if not self.task_handler: - logger.warning("Task handler does not support task context logging") - return False - logger.info("Task context logging is enabled") - return True - - @staticmethod - def _get_task_handler() -> FileTaskHandler | None: - """Return the task handler that supports task context logging.""" - handlers = [ - handler - for handler in logging.getLogger("airflow.task").handlers - if getattr(handler, "supports_task_context_logging", False) - ] - if not handlers: - return None - h = handlers[0] - if TYPE_CHECKING: - assert isinstance(h, FileTaskHandler) - return h - - def _log(self, level: int, msg: str, *args, ti: TaskInstance | TaskInstanceKey): - """ - Emit a log message to the task instance logs. - - :param level: the log level - :param msg: the message to relay to task context log - :param ti: the task instance or the task instance key - """ - if self.call_site_logger and self.call_site_logger.isEnabledFor(level=level): - with suppress(Exception): - self.call_site_logger.log(level, msg, *args) - - if not self.enabled: - return - - if not self.task_handler: - return - - task_handler = copy(self.task_handler) - try: - if isinstance(ti, TaskInstanceKey): - with create_session() as session: - ti = _ensure_ti(ti, session) - task_handler.set_context(ti, identifier=self.component_name) - if hasattr(task_handler, "mark_end_on_close"): - task_handler.mark_end_on_close = False - filename, lineno, func, stackinfo = logger.findCaller(stacklevel=3) - record = logging.LogRecord( - self.component_name, level, filename, lineno, msg, args, None, func=func - ) - task_handler.emit(record) - finally: - task_handler.close() - - def critical(self, msg: str, *args, ti: TaskInstance | TaskInstanceKey): - """ - Emit a log message with level CRITICAL to the task instance logs. - - :param msg: the message to relay to task context log - :param ti: the task instance - """ - self._log(logging.CRITICAL, msg, *args, ti=ti) - - def fatal(self, msg: str, *args, ti: TaskInstance | TaskInstanceKey): - """ - Emit a log message with level FATAL to the task instance logs. - - :param msg: the message to relay to task context log - :param ti: the task instance - """ - self._log(logging.FATAL, msg, *args, ti=ti) - - def error(self, msg: str, *args, ti: TaskInstance | TaskInstanceKey): - """ - Emit a log message with level ERROR to the task instance logs. - - :param msg: the message to relay to task context log - :param ti: the task instance - """ - self._log(logging.ERROR, msg, *args, ti=ti) - - def warn(self, msg: str, *args, ti: TaskInstance | TaskInstanceKey): - """ - Emit a log message with level WARN to the task instance logs. - - :param msg: the message to relay to task context log - :param ti: the task instance - """ - self._log(logging.WARNING, msg, *args, ti=ti) - - def warning(self, msg: str, *args, ti: TaskInstance | TaskInstanceKey): - """ - Emit a log message with level WARNING to the task instance logs. - - :param msg: the message to relay to task context log - :param ti: the task instance - """ - self._log(logging.WARNING, msg, *args, ti=ti) - - def info(self, msg: str, *args, ti: TaskInstance | TaskInstanceKey): - """ - Emit a log message with level INFO to the task instance logs. - - :param msg: the message to relay to task context log - :param ti: the task instance - """ - self._log(logging.INFO, msg, *args, ti=ti) - - def debug(self, msg: str, *args, ti: TaskInstance | TaskInstanceKey): - """ - Emit a log message with level DEBUG to the task instance logs. - - :param msg: the message to relay to task context log - :param ti: the task instance - """ - self._log(logging.DEBUG, msg, *args, ti=ti) - - def notset(self, msg: str, *args, ti: TaskInstance | TaskInstanceKey): - """ - Emit a log message with level NOTSET to the task instance logs. - - :param msg: the message to relay to task context log - :param ti: the task instance - """ - self._log(logging.NOTSET, msg, *args, ti=ti) diff --git a/newsfragments/43183.significant.rst b/newsfragments/43183.significant.rst new file mode 100644 index 0000000000000..e363824b6db5f --- /dev/null +++ b/newsfragments/43183.significant.rst @@ -0,0 +1,5 @@ +Remove TaskContextLogger + +We introduced this as a way to inject messages into task logs from places +other than the task execution context. We later realized that we were better off +just using the Log table. diff --git a/providers/src/airflow/providers/amazon/CHANGELOG.rst b/providers/src/airflow/providers/amazon/CHANGELOG.rst index 8099f3943aac9..0da5f22339229 100644 --- a/providers/src/airflow/providers/amazon/CHANGELOG.rst +++ b/providers/src/airflow/providers/amazon/CHANGELOG.rst @@ -788,7 +788,7 @@ Misc .. Below changes are excluded from the changelog. Move them to appropriate section above if needed. Do not delete the lines(!): * ``Use reproducible builds for provider packages (#35693)`` - * ``Update http to s3 system test (#35711)`` + * ``Update http to s3 system test (#35711)`` 8.11.0 ...... diff --git a/tests/utils/log/test_task_context_logger.py b/tests/utils/log/test_task_context_logger.py deleted file mode 100644 index 7c73648535a22..0000000000000 --- a/tests/utils/log/test_task_context_logger.py +++ /dev/null @@ -1,139 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import logging -from unittest import mock -from unittest.mock import Mock - -import pytest - -from airflow.models.taskinstancekey import TaskInstanceKey -from airflow.utils.log.task_context_logger import TaskContextLogger - -from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS -from tests_common.test_utils.config import conf_vars - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - -logger = logging.getLogger(__name__) - -pytestmark = pytest.mark.skip_if_database_isolation_mode - - -@pytest.fixture -def mock_handler(): - logger = logging.getLogger("airflow.task") - old = logger.handlers[:] - h = Mock() - logger.handlers[:] = [h] - yield h - logger.handlers[:] = old - - -@pytest.fixture -def ti(dag_maker): - with dag_maker() as dag: - - @dag.task() - def nothing(): - return None - - nothing() - - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr = dag.create_dagrun("running", run_id="abc", **triggered_by_kwargs) - ti = dr.get_task_instances()[0] - return ti - - -def test_task_context_logger_enabled_by_default(): - t = TaskContextLogger(component_name="test_component") - assert t.enabled is True - - -@pytest.mark.parametrize("supported", [True, False]) -def test_task_handler_not_supports_task_context_logging(mock_handler, supported): - mock_handler.supports_task_context_logging = supported - t = TaskContextLogger(component_name="test_component") - assert t.enabled is supported - - -@pytest.mark.skip_if_database_isolation_mode -@pytest.mark.db_test -@pytest.mark.parametrize("supported", [True, False]) -def test_task_context_log_with_correct_arguments(ti, mock_handler, supported): - mock_handler.supports_task_context_logging = supported - t = TaskContextLogger(component_name="test_component") - t.info("test message with args %s, %s", "a", "b", ti=ti) - if supported: - mock_handler.set_context.assert_called_once_with(ti, identifier="test_component") - mock_handler.emit.assert_called_once() - else: - mock_handler.set_context.assert_not_called() - mock_handler.emit.assert_not_called() - - -@pytest.mark.skip_if_database_isolation_mode -@pytest.mark.db_test -@mock.patch("airflow.utils.log.task_context_logger._ensure_ti") -@pytest.mark.parametrize("supported", [True, False]) -def test_task_context_log_with_task_instance_key(mock_ensure_ti, ti, mock_handler, supported): - mock_handler.supports_task_context_logging = supported - mock_ensure_ti.return_value = ti - task_instance_key = TaskInstanceKey(ti.dag_id, ti.task_id, ti.run_id, ti.try_number, ti.map_index) - t = TaskContextLogger(component_name="test_component") - t.info("test message with args %s, %s", "a", "b", ti=task_instance_key) - if supported: - mock_handler.set_context.assert_called_once_with(ti, identifier="test_component") - mock_handler.emit.assert_called_once() - else: - mock_handler.set_context.assert_not_called() - mock_handler.emit.assert_not_called() - - -@pytest.mark.skip_if_database_isolation_mode -@pytest.mark.db_test -def test_task_context_log_closes_task_handler(ti, mock_handler): - t = TaskContextLogger("blah") - t.info("test message", ti=ti) - mock_handler.close.assert_called_once() - - -@pytest.mark.skip_if_database_isolation_mode -@pytest.mark.skip_if_database_isolation_mode -@pytest.mark.db_test -def test_task_context_log_also_emits_to_call_site_logger(ti): - logger = logging.getLogger("abc123567") - logger.setLevel(logging.INFO) - logger.log = Mock() - t = TaskContextLogger("blah", call_site_logger=logger) - t.info("test message", ti=ti) - logger.log.assert_called_once_with(logging.INFO, "test message") - - -@pytest.mark.db_test -@pytest.mark.parametrize("val, expected", [("true", True), ("false", False)]) -def test_task_context_logger_config_works(ti, mock_handler, val, expected): - with conf_vars({("logging", "enable_task_context_logger"): val}): - t = TaskContextLogger("abc") - t.info("test message", ti=ti) - if expected: - mock_handler.emit.assert_called() - else: - mock_handler.emit.assert_not_called() diff --git a/tests_common/test_utils/mock_executor.py b/tests_common/test_utils/mock_executor.py index 6d4791e889150..83197298733ce 100644 --- a/tests_common/test_utils/mock_executor.py +++ b/tests_common/test_utils/mock_executor.py @@ -18,7 +18,7 @@ from __future__ import annotations from collections import defaultdict -from unittest.mock import MagicMock, Mock +from unittest.mock import MagicMock from airflow.executors.base_executor import BaseExecutor from airflow.executors.executor_utils import ExecutorName @@ -52,8 +52,6 @@ def __init__(self, do_update=True, *args, **kwargs): super().__init__(*args, **kwargs) - self.task_context_logger = Mock() - def success(self): return State.SUCCESS