pipeline-manager: logs endpoint #2500
The desired behavior for the Logs tab would be based on the pipeline status of the currently open pipeline:
Can we keep the old behavior at least in the local form factor where logs from all pipelines are also written to the terminal?
/// is sufficient space, the line is added to the buffer.
pub fn append(&mut self, line: String) {
    if line.len() > self.size_limit_byte {
        self.num_discarded_lines += self.buffer.len() + 1;
Would it be better to truncate such a line to a reasonable length? In fact, this might be a good strategy for all lines, even if they fit in the log.
I see it more as a terminal experience, in which truncating lines does not happen either. A line of more than a megabyte by itself would be quite exceptional -- the log lines are the output of the pipeline executable, so such long log lines would not occur unless the adapters crate or the SQL compiler prints them.
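To make the buffer behavior under discussion concrete, here is a minimal sketch of a circular line buffer limited by both total bytes and number of lines. Only `append`, `size_limit_byte`, `num_discarded_lines` and `buffer` appear in the diff; the struct name `LogsBuffer`, the fields `size_byte` and `size_limit_num_lines`, the accessors, and the choice to clear the buffer when a single line exceeds the byte limit are assumptions made for illustration.

```rust
use std::collections::VecDeque;

/// Hypothetical circular line buffer, bounded by total bytes and line count.
pub struct LogsBuffer {
    buffer: VecDeque<String>,
    size_byte: usize,            // Sum of the byte lengths of all buffered lines
    size_limit_byte: usize,      // Maximum total bytes
    size_limit_num_lines: usize, // Maximum number of lines
    num_discarded_lines: usize,  // Lines dropped so far
}

impl LogsBuffer {
    /// `size_limit_num_lines` must be at least 1.
    pub fn new(size_limit_byte: usize, size_limit_num_lines: usize) -> Self {
        assert!(size_limit_num_lines > 0, "line limit must be at least 1");
        Self {
            buffer: VecDeque::new(),
            size_byte: 0,
            size_limit_byte,
            size_limit_num_lines,
            num_discarded_lines: 0,
        }
    }

    /// Adds a line, evicting the oldest lines until there is sufficient space.
    pub fn append(&mut self, line: String) {
        if line.len() > self.size_limit_byte {
            // The line can never fit: count it and everything buffered as
            // discarded (assumed behavior, matching the `len() + 1` in the diff).
            self.num_discarded_lines += self.buffer.len() + 1;
            self.buffer.clear();
            self.size_byte = 0;
            return;
        }
        while self.size_byte + line.len() > self.size_limit_byte
            || self.buffer.len() >= self.size_limit_num_lines
        {
            // An empty buffer always has room for a line that fits the byte
            // limit, so the buffer is non-empty whenever this loop body runs.
            let oldest = self.buffer.pop_front().expect("buffer is non-empty here");
            self.size_byte -= oldest.len();
            self.num_discarded_lines += 1;
        }
        self.size_byte += line.len();
        self.buffer.push_back(line);
    }

    /// Returns a copy of the buffered lines, oldest first.
    pub fn lines(&self) -> Vec<String> {
        self.buffer.iter().cloned().collect()
    }

    pub fn num_discarded_lines(&self) -> usize {
        self.num_discarded_lines
    }
}
```

With limits of 10 bytes and 2 lines, appending a third short line evicts the oldest one, and appending an 11-byte line empties the buffer. Truncating over-long lines, as suggested above, would replace the first branch of `append`.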
let mut response_logs = config.get("/v0/pipelines/test/logs").await;
assert_eq!(response_logs.status(), StatusCode::OK);
assert_eq!(
    "LOGS STREAM END: no logs currently available (likely, the pipeline has not yet started)\n",
- "LOGS STREAM END: no logs currently available (likely, the pipeline has not yet started)\n",
+ "LOG STREAM: no logs available (likely, the pipeline has not yet started)\n",
maybe this could just be empty instead of this string?
I changed it to "LOG STREAM UNAVAILABLE: the pipeline has likely not yet started".
I think it is useful for the user of the logs endpoint to get some feedback rather than an empty response, because the stream does not wait until the pipeline starts running but returns immediately.
    stderr: ChildStderr,
    mut log_follow_request_receiver: Receiver<Sender<LogMessage>>,
) -> (Sender<()>, JoinHandle<Receiver<Sender<LogMessage>>>) {
    let (terminate_sender, mut terminate_receiver) = mpsc::channel::<()>(10);
why 10
I've now replaced it with a oneshot channel, since it is only used for sending the termination message.
// New stdout line
line = lines_stdout.next_line() => {
    if let Ok(line) = line {
        if let Some(line) = line {
when would this stuff ever be None? do we need an else branch
The case is when stdout (or stderr) would return that there are no more lines: https://docs.rs/tokio/1.40.0/tokio/io/struct.Lines.html#method.poll_next_line -- Thanks! I'll add an error case for that as well 👍
Added variables stdout_finished and stderr_finished that track if either of them returned a None.
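As a rough illustration of tracking the end of each stream, the sketch below drains two line-oriented readers and uses `stdout_finished` and `stderr_finished` flags to stop polling a stream once it has no more lines. It is a synchronous analogue built on `std::io::BufRead`, not the PR's `tokio::select!` loop. The function name `drain_both`, the alternating read order, continuing until both streams end, and treating a read error like end-of-stream are all assumptions.

```rust
use std::io::BufRead;

/// Reads lines from two readers in alternation until both have ended.
/// Hypothetical helper; the PR polls tokio `Lines` inside a select loop.
pub fn drain_both<R1: BufRead, R2: BufRead>(stdout: R1, stderr: R2) -> Vec<String> {
    let mut lines_stdout = stdout.lines();
    let mut lines_stderr = stderr.lines();
    let mut stdout_finished = false;
    let mut stderr_finished = false;
    let mut collected = Vec::new();

    while !(stdout_finished && stderr_finished) {
        if !stdout_finished {
            match lines_stdout.next() {
                Some(Ok(line)) => collected.push(line),
                // `None` means there are no more lines; a read error is
                // treated the same way here (assumption).
                Some(Err(_)) | None => stdout_finished = true,
            }
        }
        if !stderr_finished {
            match lines_stderr.next() {
                Some(Ok(line)) => collected.push(line),
                Some(Err(_)) | None => stderr_finished = true,
            }
        }
    }
    collected
}
```

Without the flags, a loop that keeps polling a finished stream would receive "no more lines" on every iteration and spin.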
// browsers (in particular, Chrome) not yet displaying the content because
// they want more data to infer the content type (even though it was provided).
Ok(HttpResponse::Ok()
    .content_type("text/plain; charset=utf-8")
no extra method to set the charset?
charset is part of the content type. HttpResponseBuilder doesn't seem to have a dedicated method to set them separately: https://docs.rs/actix-web/latest/actix_web/struct.HttpResponseBuilder.html
/// Process a new log line by adding it to the lines buffer and
/// sending it out to all followers. Any followers that exhibit
/// a send error are removed.
async fn process_log_line_with_followers(
Can this stuff be simplified by using https://docs.rs/tokio/1.40.0/tokio/sync/broadcast/index.html
Unfortunately not, I think: at the start we need to catch up a new follower, which means sending the buffered lines to only that follower's channel and not to all the other existing followers.
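The distinction can be sketched with one channel per follower: catching up writes to a single sender, while a new line goes to every sender. This is a simplified, synchronous sketch using `std::sync::mpsc` instead of the PR's tokio channels. `LogMessage` is reduced to a `String` alias, the buffer is a plain `Vec`, and `catch_up_follower` is a hypothetical helper name; only `process_log_line_with_followers` is named in the diff, and its real signature is not shown there.

```rust
use std::sync::mpsc::Sender;

/// Stand-in for the PR's `LogMessage` type (assumption).
pub type LogMessage = String;

/// Sends the buffered lines to one new follower only.
/// Returns false if the follower has already disconnected.
pub fn catch_up_follower(buffer: &[LogMessage], follower: &Sender<LogMessage>) -> bool {
    buffer.iter().all(|line| follower.send(line.clone()).is_ok())
}

/// Adds a new line to the buffer and sends it to all followers.
/// Followers whose send fails (receiver dropped) are removed.
pub fn process_log_line_with_followers(
    buffer: &mut Vec<LogMessage>,
    followers: &mut Vec<Sender<LogMessage>>,
    line: LogMessage,
) {
    followers.retain(|follower| follower.send(line.clone()).is_ok());
    buffer.push(line);
}
```

A broadcast channel has a single send path shared by all receivers, so a second follower joining later could not be caught up without the first one seeing the old lines again.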
        stdout_finished = true;
    }
    Some(line) => {
        println!("{line}"); // Also print it to manager's stdout
I guess we're printing unconditionally because pipeline's output is already controlled by the pipeline's log level.
Yes, we can see as we go if we need to tune down the logging verbosity, or make this a runner configuration option
Addition of a new API endpoint to follow the logs of a pipeline supplied by the runner, which invokes and manages the instance (e.g., process) running the pipeline executable. The runner retrieves the logs from the running pipeline instance and stores them in an internal circular buffer constrained by byte size and number of lines.

Each runner has a receiver, for which the runner logs endpoint can look up the corresponding sender. Over this channel, the runner logs endpoint can request the runner to add it as a logs follower by sending a sender for which it holds the receiver. The runner then catches up the logs endpoint by sending the entire content of the internal buffer, and every time a new line is added it also sends that line to all the followers. If the logs endpoint disconnects, the sender given to the runner will error and be removed. It is also possible for the runner to lose connection to the pipeline instance, in which case it will send a last line and then drop all follow senders, causing the logs endpoint receivers to error and end the stream.

The API server logs endpoint uses the pipeline name to look up the pipeline identifier, which it uses to issue the request to the runner logs endpoint. The API server streams the result from the runner to the client.

- Local runner: listens to the stdout/stderr of the spawned local process and uses a line buffer to write complete lines to the internal circular buffer.
- UI: tab titled "Logs" which uses the new endpoint to display logs
- Runner main web server default port: 8089
- Runner main with web server and reconciliation loop
- Additional CLI arguments to reach runner web server
- Integration test

Co-authored-by: Karakatiza666 <bulakh.96@gmail.com>
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
Summary
Addition of a new API endpoint to follow the logs of a pipeline supplied by the runner, which invokes and manages the instance (e.g., process) running the pipeline executable. The runner retrieves the logs from the running pipeline instance and stores them in an internal circular buffer constrained by byte size and number of lines.
Each runner has a receiver, for which the runner logs endpoint can look up the corresponding sender. Over this channel, the runner logs endpoint can request the runner to add it as a logs follower by sending a sender for which it holds the receiver. The runner then catches up the logs endpoint by sending the entire content of the internal buffer, and every time a new line is added it also sends that line to all the followers. If the logs endpoint disconnects, the sender given to the runner will error and be removed. It is also possible for the runner to lose connection to the pipeline instance, in which case it will send a last line and then drop all follow senders, causing the logs endpoint receivers to error and end the stream.
The API server logs endpoint uses the pipeline name to look up the pipeline identifier, which it uses to issue the request to the runner logs endpoint. The API server streams the result from the runner to the client.
- Local runner: listens to the stdout/stderr of the spawned local process and uses a line buffer to write complete lines to the internal circular buffer.
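The follow lifecycle described in this summary (follow request, catch-up, fan-out, final line, stream end) can be sketched as a single loop. This is a simplified model under several assumptions: it uses `std::sync::mpsc` and a thread-friendly blocking loop rather than tokio, it merges log lines and follow requests into one hypothetical `RunnerEvent` channel (the PR uses a separate `Receiver<Sender<LogMessage>>` for follow requests), it keeps an unbounded `Vec` instead of the circular buffer, and the text of `FINAL_LINE` is invented for illustration.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical text of the last line sent before the stream ends.
pub const FINAL_LINE: &str = "LOG STREAM END: connection to the pipeline instance was lost";

/// Hypothetical event type merging the two inputs of the runner's logs task.
pub enum RunnerEvent {
    /// A complete log line read from the pipeline instance.
    Line(String),
    /// A follow request: the logs endpoint hands over a sender and keeps
    /// the matching receiver.
    Follow(Sender<String>),
}

/// Runs until every event sender is dropped, which models the runner
/// losing its connection to the pipeline instance.
pub fn run_logs_loop(events: Receiver<RunnerEvent>) {
    let mut buffer: Vec<String> = Vec::new();
    let mut followers: Vec<Sender<String>> = Vec::new();

    for event in events {
        match event {
            RunnerEvent::Follow(follower) => {
                // Catch up only this follower with everything buffered so far.
                if buffer.iter().all(|line| follower.send(line.clone()).is_ok()) {
                    followers.push(follower);
                }
            }
            RunnerEvent::Line(line) => {
                // Fan out to all followers; a send error means the logs
                // endpoint disconnected, so that follower is removed.
                followers.retain(|follower| follower.send(line.clone()).is_ok());
                buffer.push(line);
            }
        }
    }

    // Connection lost: send a last line to each follower.
    for follower in &followers {
        let _ = follower.send(FINAL_LINE.to_string());
    }
    // Returning drops all follow senders, so each follower's receiver
    // reports a disconnect and the endpoint can end its stream.
}
```

A caller would typically run `run_logs_loop` on its own thread, send `RunnerEvent::Follow` with a fresh sender for each request to the logs endpoint, and stream whatever arrives on the matching receiver until it disconnects.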
Example usage
Start pipeline manager:
RUST_BACKTRACE=1 cargo run --package=pipeline-manager --features pg-embed --bin pipeline-manager -- --dev-mode
Create and start a pipeline, for example named example, with program (below program is known to crash upon data ingestion):
Use either the Logs tab in the UI or curl (curl -s -X GET http://localhost:8080/v0/pipelines/example/logs) to follow the logs.
To generate some interesting logs, you can trigger an error when you run:
Remaining tasks
- try_send instead of send, potentially
Related PRs/issues
Screenshots