Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipeline-manager: logs endpoint #2500

Merged
merged 1 commit into from
Sep 25, 2024
Merged

pipeline-manager: logs endpoint #2500

merged 1 commit into from
Sep 25, 2024

Conversation

snkas
Copy link
Contributor

@snkas snkas commented Sep 16, 2024

Summary
Addition of a new API endpoint to follow the logs of a pipeline supplied by the runner, which invokes and manages the instance (e.g., process) running the pipeline executable. The runner retrieves the logs from the running pipeline instance and stores it in an internal circular buffer constrained by byte size and number of lines.

Each runner has a receiver, of which the runner logs endpoint can lookup the corresponding sender. Over this channel, the runner logs endpoint can request the runner to add it as a logs follower by sending a sender for which it has the receiver. The runner then catches up the logs endpoint by sending the entire content of the internal buffer, and every time a new line is added also sends it to all the followers. If the logs endpoint disconnects, the sender given to the runner will error and be removed. It is also possible for the runner to lose connection to the pipeline instance, in which case it will send a last line and then drop all follow senders, thus resulting in the logs endpoint receivers to error and end the stream.

The API server logs endpoint uses the pipeline name to lookup the pipeline identifier, which it uses to issue the request to the runner logs endpoint. The API server streams the result from the runner to the client.

  • Local runner: listens to the stdout/stderr of the spawned local
    process and uses a line buffer to write complete lines to the internal
    circular buffer.
  • UI: tab titled "Logs" which uses the new endpoint to display logs
  • Runner main web server default port: 8089
  • Runner main with web server and reconciliation loop
  • Additional CLI arguments to reach runner web server
  • Integration test

Example usage

  1. Start pipeline manager: RUST_BACKTRACE=1 cargo run --package=pipeline-manager --features pg-embed --bin pipeline-manager -- --dev-mode

  2. Create and start a pipeline, for example named example with program (below program is known to crash upon data ingestion):

    CREATE TABLE t1(c1 INTEGER); CREATE VIEW v1 AS SELECT ELEMENT(ARRAY [2, 3]) FROM t1;
    
  3. Use either the Logs tab in the UI or curl (curl -s -X GET http://localhost:8080/v0/pipelines/example/logs) to follow the logs.

  4. To generate some interesting logs, you can trigger an error when you run:

    curl -i -X POST http://localhost:8080/v0/pipelines/example/ingress/t1 -d '100'
    

Remaining tasks

  • Investigate behavior when one of the logs endpoint requests is slower than others
  • Decide on strategy when one of the log followers is unable to keep up -- use try_send instead of send potentially
  • Ability to set logging level on a per-pipeline basis -> Later PR
  • Use oneshot channels where appropriate

Related PRs/issues

Screenshots

logs_tab

@snkas
Copy link
Contributor Author

snkas commented Sep 16, 2024

The desired behavior for the Logs tab would be based on the pipeline status of the currently open pipeline:

  • If the Logs tab is open, and pipeline status is Initializing, Running, Paused, or Failed, the logs are retrieved
  • If the Logs tab is opened, an attempt should be made to retrieve the logs
  • If the Logs tab is open and the pipeline switches from a non-log status (Shutdown/Provisioning), to a log-potential status (Initializing, Running, Paused, Failed) it should try to retrieve the logs or at least show some button that asks the user if the user wants to retry to get logs as the status has changed
  • If the user switches away from the logs tab the logs MAY be kept following keeping the request open, but if simpler the request can also be closed and upon revisit a fresh request is started
  • If the logs request ends (possibly, due to timeout) a message should appear that "Logs follow request has returned early (pipeline is not yet shutdown). In order to see the latest, the logs need to be reloaded." and with a button to "Reload logs"

@Karakatiza666 Karakatiza666 force-pushed the logs-endpoint branch 4 times, most recently from 4721ff2 to 619846a Compare September 19, 2024 14:23
@snkas snkas marked this pull request as ready for review September 23, 2024 12:04
@snkas snkas requested review from ryzhyk and gz September 23, 2024 14:16
Copy link
Contributor

@ryzhyk ryzhyk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep the old behavior at least in the local form factor where logs from all pipelines are also written to the terminal?

/// is sufficient space, the line is added to the buffer.
pub fn append(&mut self, line: String) {
if line.len() > self.size_limit_byte {
self.num_discarded_lines += self.buffer.len() + 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to truncate such a line to a reasonable length. In fact, this might be a good strategy for all lines, even if they fit in the log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see it more as a terminal experience, in which truncating lines also does not happen. The line length would be quite exceptional if it is more than a megabyte by itself -- the log lines are the output of the pipeline executable, as such such long log lines would not occur unless the adapters crate or the sql compiler prints it.

crates/pipeline-manager/src/runner/main.rs Outdated Show resolved Hide resolved
crates/pipeline-manager/src/integration_test.rs Outdated Show resolved Hide resolved
crates/pipeline-manager/src/integration_test.rs Outdated Show resolved Hide resolved
let mut response_logs = config.get("/v0/pipelines/test/logs").await;
assert_eq!(response_logs.status(), StatusCode::OK);
assert_eq!(
"LOGS STREAM END: no logs currently available (likely, the pipeline has not yet started)\n",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"LOGS STREAM END: no logs currently available (likely, the pipeline has not yet started)\n",
"LOG STREAM: no logs available (likely, the pipeline has not yet started)\n",

maybe this could just be empty instead of this string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to LOG STREAM UNAVAILABLE: the pipeline has likely not yet started
I think it is useful for the user of the logs endpoint to get back some feedback, rather than just empty, as the stream will not await till the pipeline starts running but already return.

stderr: ChildStderr,
mut log_follow_request_receiver: Receiver<Sender<LogMessage>>,
) -> (Sender<()>, JoinHandle<Receiver<Sender<LogMessage>>>) {
let (terminate_sender, mut terminate_receiver) = mpsc::channel::<()>(10);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 10

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've now replaced it with a oneshot channel, it is only for sending the termination message.

// New stdout line
line = lines_stdout.next_line() => {
if let Ok(line) = line {
if let Some(line) = line {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when would this stuff ever be None? do we need an else branch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case is when stdout (or stderr) would return that there are no more lines: https://docs.rs/tokio/1.40.0/tokio/io/struct.Lines.html#method.poll_next_line -- Thanks! I'll add an error case for that as well 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added variables stdout_finished and stderr_finished that track if either of them returned a None

crates/pipeline-manager/src/runner/main.rs Outdated Show resolved Hide resolved
crates/pipeline-manager/src/runner/main.rs Outdated Show resolved Hide resolved
// browsers (in particular, Chrome) not yet displaying the content because
// they want more data to infer the content type (even though it was provided).
Ok(HttpResponse::Ok()
.content_type("text/plain; charset=utf-8")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no extra method to set the charset?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

charset is part of the content type. HttpResponseBuilder doesn't seem to have a dedicated method to set them separately: https://docs.rs/actix-web/latest/actix_web/struct.HttpResponseBuilder.html

/// Process a new log line by adding it to the lines buffer and
/// sending it out to all followers. Any followers that exhibit
/// a send error are removed.
async fn process_log_line_with_followers(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this stuff be simplified by using https://docs.rs/tokio/1.40.0/tokio/sync/broadcast/index.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately not I think, at the start we need to catch up a new follower and need to send to only that channel and not all the other existing followers.

stdout_finished = true;
}
Some(line) => {
println!("{line}"); // Also print it to manager's stdout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we're printing unconditionally because pipeline's output is already controlled by the pipeline's log level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can see as we go if we need to tune down the logging verbosity, or make this a runner configuration option

Addition of a new API endpoint to follow the logs of a pipeline supplied
by the runner, which invokes and manages the instance (e.g., process)
running the pipeline executable. The runner retrieves the logs from the
running pipeline instance and stores it in an internal circular buffer
constrained by byte size and number of lines.

Each runner has a receiver, of which the runner logs endpoint can lookup
the corresponding sender. Over this channel, the runner logs endpoint
can request the runner to add it as a logs follower by sending a sender
for which it has the receiver. The runner then catches up the logs
endpoint by sending the entire content of the internal buffer, and every
time a new line is added also sends it to all the followers. If the logs
endpoint disconnects, the sender given to the runner will error and be
removed. It is also possible for the runner to lose connection to the
pipeline instance, in which case it will send a last line and then drop
all follow senders, thus resulting in the logs endpoint receivers to
error and end the stream.

The API server logs endpoint uses the pipeline name to lookup the
pipeline identifier, which it uses to issue the request to the runner
logs endpoint. The API server streams the result from the runner to the
client.

- Local runner: listens to the stdout/stderr of the spawned local
  process and uses a line buffer to write complete lines to the internal
  circular buffer.
- UI: tab titled "Logs" which uses the new endpoint to display logs
- Runner main web server default port: 8089
- Runner main with web server and reconciliation loop
- Additional CLI arguments to reach runner web server
- Integration test

Co-authored-by: Karakatiza666 <bulakh.96@gmail.com>
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
@snkas snkas added this pull request to the merge queue Sep 25, 2024
Merged via the queue into main with commit f39a943 Sep 25, 2024
6 checks passed
@snkas snkas deleted the logs-endpoint branch September 25, 2024 18:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

don't log pipeline config on every startup Implement UI for pipeline logs Logs endpoint
4 participants