[flytepropeller][flyteadmin] Streaming Decks V2 #6053
base: master
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Co-authored-by: Yi Cheng <luyc58576@gmail.com>
Co-authored-by: pingsutw <pingsutw@apache.org>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #6053 +/- ##
=======================================
Coverage 37.01% 37.02%
=======================================
Files 1317 1317
Lines 132523 132580 +57
=======================================
+ Hits 49060 49082 +22
- Misses 79217 79244 +27
- Partials 4246 4254 +8
switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
	// This is to prevent the console from potentially checking a deck URI that does not exist in the final phase (PhaseSuccess).
	err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
Does this do a head call on the deck URI for every task that succeeds? Two thoughts here:
(1) Does the flyteadmin merge algorithm then remove the deckURI from the execution metadata?
(2) This incurs a 20-30ms performance degradation on every task execution.
Will take a look tomorrow, thank you!
> Does this do a head call on the deck URI for every task that succeeds?
Yes, it will do a head call through RemoteFileOutputReader:
flyte/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go
Lines 306 to 313 in b3330ba
func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) {
	md, err := r.store.Head(ctx, r.outPath.GetDeckPath())
	if err != nil {
		return false, err
	}
	return md.Exists(), nil
}
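To make the cost under discussion concrete, here is a minimal, self-contained sketch of how an existence check like DeckExists could be used to drop a deck URI that points at nothing. Everything in it is an assumption for illustration: headStore, memStore, taskInfo and removeNonexistentDeckURI are hypothetical names, not the types or the implementation from this PR. The fake store counts its calls to show that each checked task costs one HEAD round trip.

```go
package main

import (
	"context"
	"fmt"
)

// headStore is a hypothetical stand-in for the blob store behind the output reader.
type headStore interface {
	// Exists reports whether an object is present at path (one HEAD round trip).
	Exists(ctx context.Context, path string) (bool, error)
}

// memStore is an in-memory fake that also counts how many HEAD calls were made.
type memStore struct {
	objects map[string]bool
	heads   int
}

func (m *memStore) Exists(_ context.Context, path string) (bool, error) {
	m.heads++
	return m.objects[path], nil
}

// taskInfo is a hypothetical, trimmed-down version of the info attached to a task event.
type taskInfo struct {
	DeckURI *string
}

// removeNonexistentDeckURI clears DeckURI when no deck file exists at that location.
// It issues a HEAD call only when a URI is actually set.
func removeNonexistentDeckURI(ctx context.Context, s headStore, info *taskInfo) error {
	if info == nil || info.DeckURI == nil {
		return nil
	}
	exists, err := s.Exists(ctx, *info.DeckURI)
	if err != nil {
		return err
	}
	if !exists {
		info.DeckURI = nil
	}
	return nil
}

// demo runs the check for one task with a deck file and one without.
func demo() (kept bool, dropped bool, heads int) {
	ctx := context.Background()
	present, missing := "s3://bucket/a/deck.html", "s3://bucket/b/deck.html"
	store := &memStore{objects: map[string]bool{present: true}}
	a := &taskInfo{DeckURI: &present}
	b := &taskInfo{DeckURI: &missing}
	// The fake store never returns an error, so errors are ignored in this demo.
	_ = removeNonexistentDeckURI(ctx, store, a)
	_ = removeNonexistentDeckURI(ctx, store, b)
	return a.DeckURI != nil, b.DeckURI == nil, store.heads
}

func main() {
	fmt.Println(demo()) // prints: true true 2
}
```

Two tasks produce two HEAD calls, which is the per-task overhead the reviewer is asking about.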
How did you measure the performance degradation?
Did you use Grafana or another performance tool?
> does the flyteadmin merge algorithm then remove the deckURI from the execution metadata?
flyteadmin will set the deckURI in the execution metadata to nil if the propeller removes it.
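The merge behaviour described in this reply can be sketched roughly as follows. This is an illustration under assumptions, not flyteadmin's actual merge code: closure, event and mergeDeckURI are hypothetical names, and the only point shown is that the stored value follows the latest event, so an event without a deck URI resets the stored one to nil.

```go
package main

import "fmt"

// closure is a hypothetical, trimmed-down node execution closure stored by admin.
type closure struct {
	DeckURI *string
}

// event is a hypothetical, trimmed-down node execution event sent by propeller.
type event struct {
	DeckURI *string
}

// mergeDeckURI overwrites the stored deck URI with the value from the event.
// An event that carries no deck URI therefore resets the stored value to nil.
func mergeDeckURI(c *closure, e event) {
	c.DeckURI = e.DeckURI
}

// demo applies a running event that carries a URI, then a final event without one.
func demo() (setWhileRunning bool, clearedAtEnd bool) {
	uri := "s3://bucket/deck.html"
	c := &closure{}

	mergeDeckURI(c, event{DeckURI: &uri})
	setWhileRunning = c.DeckURI != nil

	mergeDeckURI(c, event{DeckURI: nil})
	clearedAtEnd = c.DeckURI == nil
	return setWhileRunning, clearedAtEnd
}

func main() {
	fmt.Println(demo()) // prints: true true
}
```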
cc @hamersaw
How to test it?
flytectl demo start --image futureoutlier/sandbox:deck-1205-1138 --force
cd flytekit
gh pr checkout 2779
from flytekit import ImageSpec, task, workflow
from flytekit.deck import Deck

flytekit_hash = "473ae1119af6f86c26c0790dee0affa3eb29be64"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"

# Define custom image for the task
custom_image = ImageSpec(
    packages=[flytekit],
    apt_packages=["git"],
    registry="localhost:30000",
    env={"FLYTE_SDK_LOGGING_LEVEL": 10},
)


@task(enable_deck=True, container_image=custom_image)
def t_deck():
    """
    1st deck only show timeline deck
    2nd will show
    """
    import time

    for i in range(5):
        # Publish the deck on every iteration so it can be refreshed while the task is still running.
        Deck.publish()
        # # raise Exception("This is an exception")
        time.sleep(3)


@workflow
def wf():
    t_deck()


if __name__ == "__main__":
    import os

    from click.testing import CliRunner
    from flytekit.clis.sdk_in_container import pyflyte

    runner = CliRunner()
    path = os.path.realpath(__file__)
    # Local execution (disabled):
    # result = runner.invoke(pyflyte.main, ["run", path, "wf"])
    # print("Local Execution: ", result.output)
    result = runner.invoke(pyflyte.main, ["run", "--remote", path, "wf"])
    print("Remote Execution: ", result.output)
Mind adding screenshots for the rendered deck and refresh to the PR description?
Yes, no problem.
It's provided!
Code Review Agent Run #f3ef5e
Actionable Suggestions - 1
Additional Suggestions - 1
	return nil
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) {
The method signature for CacheHit has been modified to remove the deckPath parameter, but it seems this parameter may still be needed based on the usage in the code. Consider whether this change could cause issues with deck path handling.
Code suggestion
Check the AI-generated fix before applying
- func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) {
+ func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) {
Code Review Run #f3ef5e
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
// - We relied on a HEAD request to check if the deck file exists, then added the URI to the event.
//
// After (new behavior):
// - If `FLYTE_ENABLE_DECK = true` is set in the task template config (requires Flytekit > 1.14.0),
This comment is no longer correct, right?
Yes, super nice catch.
@@ -380,6 +430,27 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics
	return t.taskMetricsMap[metricNameKey], nil
}

func GetDeckStatus(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) {
	// FLYTE_ENABLE_DECK is used when flytekit > 1.14.0
update this comment
	metadata := template.GetMetadata()
	if metadata == nil {
		return DeckUnknown, nil
Is this correct for older versions of flytekit? Didn't tasks in the past also have this field? That means this function will always return Disabled for older versions of flytekit, right? So the condition on line 567 won't get triggered, because the status will be disabled instead of unknown.
Oh yes, you are right. Thinking about a solution.
// GetDeckStatus determines whether a task generates a deck based on its execution context.
//
// This function ensures backward compatibility with older Flytekit versions using the following logic:
// 1. For Flytekit > 1.14.3, the task template's metadata includes the `generates_deck` flag:
//   - If `generates_deck` is set to true, it indicates that the task generates a deck, and DeckEnabled is returned.
// 2. If `generates_deck` is set to false or is not set (likely from older Flytekit versions):
//   - DeckUnknown is returned as a placeholder status.
//   - In terminal states, a HEAD request can be made to check if the deck file exists.
//
// In future implementations, a `DeckDisabled` status could be introduced for better performance optimization:
//   - This would eliminate the need for a HEAD request in the final phase.
//   - However, the tradeoff is that a new field would need to be added to FlyteIDL to support this behavior.

template, err := tCtx.tr.Read(ctx)
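The decision logic spelled out in the doc comment can be sketched as a small standalone program. This is a simplified illustration, not the function from this PR: taskMetadata, deckStatus and needsHead are hypothetical names, the real code reads the flag from the task template, and the rule that a DeckEnabled task skips the HEAD request is an assumption drawn from the comment rather than from the implementation.

```go
package main

import "fmt"

// DeckStatus mirrors the two statuses named in the doc comment.
type DeckStatus int

const (
	DeckUnknown DeckStatus = iota
	DeckEnabled
)

// taskMetadata is a hypothetical stand-in for the task template metadata.
// GeneratesDeck models the `generates_deck` flag.
type taskMetadata struct {
	GeneratesDeck bool
}

// deckStatus returns DeckEnabled only when the flag is explicitly true.
// Missing metadata or a false/unset flag maps to DeckUnknown, because an
// older flytekit never sets the flag even when the task writes a deck.
func deckStatus(md *taskMetadata) DeckStatus {
	if md != nil && md.GeneratesDeck {
		return DeckEnabled
	}
	return DeckUnknown
}

// needsHead reports whether a HEAD request is needed before reporting a deck URI.
// Assumption: only the unknown case in a terminal phase has to probe the store.
func needsHead(status DeckStatus, terminal bool) bool {
	return status == DeckUnknown && terminal
}

func main() {
	newKit := deckStatus(&taskMetadata{GeneratesDeck: true})
	oldKit := deckStatus(&taskMetadata{})

	fmt.Println(newKit == DeckEnabled, needsHead(newKit, true)) // prints: true false
	fmt.Println(oldKit == DeckUnknown, needsHead(oldKit, true)) // prints: true true
	fmt.Println(needsHead(oldKit, false))                       // prints: false
}
```

Mapping an unset flag to DeckUnknown rather than to a disabled status is what keeps older flytekit tasks working, at the price of one HEAD request in the terminal phase.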
Better comments!
cc @wild-endeavor
Tracking issue
#5574
Why are the changes needed?
To enhance user visibility into Flyte Decks at different stages of workflow execution (running, failing, and succeeding), enabling better debugging and analysis.
What changes were proposed in this pull request?
Concept:
- [...] NodeExecutionEvent, and send it to admin.
  flyte/flytepropeller/pkg/controller/nodes/executor.go
  Lines 1251 to 1261 in b3330ba
Life Cycle:
use new flytekit > 1.14.0
summary:
- [...] HEAD request to be called. (save resource)
details:
- [...] DeckURI when the task is running if FLYTE_ENABLE_DECK=true in the task template.
- [...] DeckURI to node info, and turn it to NodeExecutionEvent to flyte admin.
- [...] DeckURI to Closure
  flyte/flyteadmin/dataproxy/service.go
  Lines 175 to 189 in b3330ba
- [...] node Closure, it will not show the Flyte Deck button.
old flytekit <= 1.14.0
summary:
details:
- [...] HEAD request to know if the Deck URI exists or not. If it exists, then put it into the node info.
How was this patch tested?
python code:
Setup process
single binary.
flyte: this branch
flytekit: flyteorg/flytekit#2779
flyteconsole: flyteorg/flyteconsole#890
Screenshots
flytekit branch:
flyteorg/flytekit#2779
NEW FLYTEKIT, NO DECK, RUNNING With Deck, SUCCEED, and FAILED
OSS-STREAMING-DECK-small.mov
OLD FLYTEKIT, NO DECK, RUNNING With Deck, SUCCEED, and FAILED
OSS-STREAMING-DECK-OLD-FLYTEKIT-small.mov
Check all the applicable boxes
Related PRs
follow up questions
- [...] Abort phase for the streaming deck?
- should we support EPhaseAbort in this file?
  https://github.com/flyteorg/flyte/blob/b3330ba4430538f91ae9fc7d868a29a2e96db8bd/flytepropeller/pkg/controller/nodes/handler/transition_info.go
Summary by Bito
Implementation of enhanced Flyte Deck streaming functionality for improved workflow execution visibility. Introduces real-time deck URI handling in Flytepropeller and Flyteadmin, supporting deck information display across various execution states. Maintains backward compatibility with Flytekit <=1.14.0 while optimizing for newer versions through FLYTE_ENABLE_DECK configuration.
Unit tests added: False
Estimated effort to review (1-5, lower is better): 2