Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send logs from workers to master and improve log viewer tab in the Web UI #2750

Merged
merged 9 commits into from
Jun 7, 2024
Prev Previous commit
Next Next commit
Truncate master and worker logs
  • Loading branch information
andrewbaldwin44 committed Jun 7, 2024
commit 52fd1a3ce14fab627db5d3121f3ce8eda0e2acbb
7 changes: 4 additions & 3 deletions locust/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging.config
import re
import socket
from collections import deque

HOSTNAME = re.sub(r"\..*", "", socket.gethostname())

Expand All @@ -13,7 +14,7 @@
class LogReader(logging.Handler):
def __init__(self):
super().__init__()
self.logs = []
self.logs = deque(maxlen=500)

def emit(self, record):
self.logs.append(self.format(record))
Expand Down Expand Up @@ -76,10 +77,10 @@ def setup_logging(loglevel, logfile=None):


def get_logs():
log_reader_handler = [handler for handler in logging.getLogger("root").handlers if handler.name == "log_reader"][0]
log_reader_handler = [handler for handler in logging.getLogger("root").handlers if handler.name == "log_reader"]

if log_reader_handler:
return log_reader_handler.logs
return list(log_reader_handler[0].logs)

return []

Expand Down
23 changes: 15 additions & 8 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -1422,9 +1422,20 @@ def stats_reporter(self) -> NoReturn:
logger.error(f"Temporary connection lost to master server: {e}, will retry later.")
gevent.sleep(WORKER_REPORT_INTERVAL)

def logs_reporter(self) -> NoReturn:
def logs_reporter(self) -> None:
while True:
self._send_logs()
current_logs = get_logs()

if (len(current_logs) - len(self.logs)) > 10:
logger.warning(
"The worker attempted to send more than 10 log lines in one interval. Further log sending was disabled for this worker."
)
self._send_logs(get_logs())
break
if len(current_logs) > len(self.logs):
self._send_logs(current_logs)

self.logs = current_logs
gevent.sleep(WORKER_LOG_REPORT_INTERVAL)

def send_message(self, msg_type: str, data: dict[str, Any] | None = None, client_id: str | None = None) -> None:
Expand All @@ -1443,12 +1454,8 @@ def _send_stats(self) -> None:
self.environment.events.report_to_master.fire(client_id=self.client_id, data=data)
self.client.send(Message("stats", data, self.client_id))

def _send_logs(self) -> None:
current_logs = get_logs()

if len(current_logs) > len(self.logs):
self.logs = current_logs
self.send_message("logs", {"worker_id": self.client_id, "logs": current_logs})
def _send_logs(self, current_logs) -> None:
self.send_message("logs", {"worker_id": self.client_id, "logs": current_logs})

def connect_to_master(self):
self.retry += 1
Expand Down
34 changes: 34 additions & 0 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -3992,6 +3992,40 @@ def my_task(self):
self.assertEqual(worker.client_id, client.outbox[3].data.get("worker_id"))
worker.quit()

def test_quit_worker_logs(self):
class MyUser(User):
wait_time = constant(1)

@task
def my_task(self):
pass

with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:
short_time = 0.05

log_handler = LogReader()
log_handler.name = "log_reader"
log_handler.setLevel(logging.INFO)
logger = logging.getLogger("root")
logger.addHandler(log_handler)
log_line = "spamming log"

for _ in range(11):
logger.info(log_line)

worker = self.get_runner(environment=Environment(), user_classes=[MyUser], client=client)

gevent.sleep(short_time)

self.assertEqual("logs", client.outbox[3].type)
self.assertEqual(
"The worker attempted to send more than 10 log lines in one interval. Further log sending was disabled for this worker.",
client.outbox[3].data.get("logs", [])[0],
)
self.assertEqual(worker.client_id, client.outbox[3].data.get("worker_id"))
worker.quit()
logger.removeHandler(log_handler)


class TestMessageSerializing(unittest.TestCase):
def test_message_serialize(self):
Expand Down