Description
Overview
We have a somewhat scattered collection of changes that we're interested in making to pipelines. This issue describes each of these separately.
YAML
Pipeline specs should be in YAML instead of JSON. Because YAML supports comments, we'll probably need to start storing users' raw pipeline specs as text, in addition to storing their pipelines as parsed protobuf messages, so that when they call `inspect-pipeline` we can show them the comments they originally wrote.
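As a rough illustration, here's what a spec might look like in YAML using the existing spec fields (the pipeline name, image, and comments are made up); the comments are exactly the kind of thing we'd lose if we only kept the parsed protobuf:

```yaml
# Hypothetical YAML rendering of a pipeline spec. The comments below are the
# sort of annotation we'd need to preserve by storing the raw spec text
# alongside the parsed protobuf.
pipeline:
  name: edges
transform:
  image: pachyderm/opencv        # pinned; bump this when the model changes
  cmd: ["python3", "/edges.py"]
input:
  pfs:
    repo: images
    glob: "/*"                   # one datum per top-level file
```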
Custom Resource Definitions
Currently, when a user wants to create a pipeline, they send a `CreatePipeline` RPC to pachyderm, which writes the new pipeline to etcd (which we currently regard as the source of truth) and responds immediately. Then, a listener in PPS, which is watching etcd, observes the write and creates a new RC in kubernetes. Instead, we'd like the original `CreatePipeline` RPC to create a "pachyderm pipeline" custom resource in kubernetes, and respond based on the success of that operation. This would let us remove the pipeline watcher and return any errors to users immediately, instead of requiring that users watch incipient pipelines for failures.
This change might also let us require fewer kubernetes permissions in `pachd` (since pachd would only tell kubernetes what to create, rather than creating anything itself). We could give `pachd` a much more limited service account, which is better for a lot of deployments. To enable this, `pachctl` would convert the pipeline spec into a kubernetes manifest (instead of `pachd` doing it), and then send the manifest to kubernetes. This would mean that the `pachctl` user is creating the pipeline using their own kubernetes credentials, instead of `pachd`'s.
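For illustration only, the manifest `pachctl` might generate could look something like the sketch below; the API group/version, kind, and spec layout are invented here, not a settled schema:

```yaml
# Hypothetical "pachyderm pipeline" custom resource that pachctl could submit
# directly with the user's own kubernetes credentials. The group, version,
# kind, and spec layout are placeholders.
apiVersion: pachyderm.io/v1alpha1
kind: Pipeline
metadata:
  name: edges
spec:
  transform:
    image: pachyderm/opencv
    cmd: ["python3", "/edges.py"]
  input:
    pfs:
      repo: images
      glob: "/*"
```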
Argo does use CRDs, but that's not really why we're interested in this.
Loops and Conditionals
An invariant that we've found important to maintain in Pachyderm is that every input commit generates exactly one output commit in every downstream branch. This invariant means that users always know what data their commits will be run with (vs, say, my new data getting processed with your old data because your slow cleaning pipeline takes too long to generate an output commit). It likewise means that a user always knows exactly what needs to happen before a given downstream job can run.
However, this invariant inherently precludes loops and conditional processing—a loop might generate multiple outputs (one for each iteration), and conditional processing might generate none. Our solution addresses this conflict by expanding our notion of a single pipeline: pipelines will be able to contain multiple steps that may themselves be arranged in loops and contain conditions. However, each pipeline will still only have one output commit that it can write to, and downstream pipelines will block until this one output commit is finished. If any step in a multi-step pipeline is modified, we'll treat that as an atomic change to the entire pipeline (i.e. a commit to the `__spec__` repo), and the new step won't be incorporated into any existing jobs.
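Purely to make the atomicity point concrete, a multi-step spec might look roughly like this; the `steps` field and everything inside it is invented for the sketch:

```yaml
# Hypothetical multi-step pipeline: several steps, but still exactly one
# output commit per input commit, which downstream pipelines block on.
pipeline:
  name: train
input:
  pfs:
    repo: features
    glob: "/"
steps:
  - name: fit
    transform: {image: example/train, cmd: ["/fit.sh"]}
  - name: score
    transform: {image: example/train, cmd: ["/score.sh"]}
# Editing either step would be a commit to the __spec__ repo: the whole
# pipeline is updated atomically, and running jobs keep the spec they
# started with.
```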
We probably couldn't support our current notion of incrementality for multi-stage pipelines; we'd have to process every input commit in its entirety, rather than skipping datums as we do now. The connection between a given input datum and a given output file could be infeasible to track [1].
The new shuffle code that Bryce is working on (where individual workers collect each output file's entire contents as their jobs run) gives us a good approach to implementing stateless intermediate steps.
How Would Conditions Work?
We considered having conditions correspond to an output directory or file (e.g. if a step writes output to `/dir`, then the next step runs; otherwise it doesn't). The problem with this approach is bootstrapping: if the presence of a file determines whether a pipeline runs, how do you create the first version of that file? For example, if you have:
A ────⟶ B/go OR B/stop (stops the loop)
⬑───────↲
how do you create the initial commit in the `B/go` directory?
In contrast, Argo has a small DSL in its pipeline specification language that matches data contents. For example, you could have a score file (an "artifact" in Argo), and a conditional that says "while `out/score` has a value < 100, re-run the pipeline", which saves the user that bit of pipeline logic.
Another example where the DSL approach works well (again in the context of model convergence): suppose you want the exit condition for training to be "the training score is ≥ 100 OR the pipeline has run 1000 times" (because sometimes the model doesn't converge, the score never exceeds 100, and we don't want the pipeline to run forever in that case). If pipelines have to determine how many times they've run, e.g. by passing files to themselves, that adds extra pipeline logic as well as an extra input and output. If you can put "run at most 1000 times" in the pipeline spec, then you can skip that extra logic, input, and output.
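None of this syntax exists yet, but an Argo-style condition could look roughly like the sketch below; the `loop`, `while`, and `maxIterations` names and the expression syntax are all placeholders:

```yaml
# Hypothetical loop/exit condition: re-run the training step while the score
# it writes is below 100, but give up after 1000 iterations so a model that
# never converges doesn't loop forever.
steps:
  - name: train
    transform: {image: example/train, cmd: ["/train.sh"]}
    loop:
      while: "output('/out/score') < 100"
      maxIterations: 1000
```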
Argo also allows templates in the pipeline spec conditionals. Not sure how we feel about that
Pipeline Templates
This was one of the most contentious parts of the discussion, so the details may well change. However, templates were the closest thing to "subroutines" or "reusable pipeline components" that we came up with during the discussion. We envisioned at least the following properties (a rough sketch of what the syntax might look like follows the list):
- If some steps within a pipeline were generated with a template, they should appear as a single step by default in the dash, and probably other views (with the idea that you could e.g. click a template step to expand it into its constituent steps). Therefore Pachyderm must track which parts of a pipeline were generated by a template, and which parts were entered manually—we could not simply tell users to use an external template tool.
- If a template is used more than once in different parts of the pipeline, it should be clear that both parts were generated by the same template (so Pachyderm must have an identity associated with each template, and pass that identity all the way through to e.g. the dash)
- Templates can be nested (which would result in multiple expansions)
- None of these would affect the overall atomicity guarantees of pipelines described above (in Loops and Conditionals). If a pipeline uses templates, and the template is changed or the parameters to the template are changed, then the entire parent pipeline is updated atomically, generating new output commits.
- If a template is used by multiple pipelines, and the template is changed, it's unclear whether we should wait until the dependent pipelines are individually updated to use the new template, or if we should update all dependent pipelines atomically in that operation.
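Here's one hypothetical shape the syntax could take; the `templates`, `use`, and `with` fields and the `{{...}}` parameter syntax are invented for the sketch, not a design we agreed on:

```yaml
# Hypothetical template definition and two instantiations. Pachyderm would
# record that both expansions came from the same template identity, so the
# dash can collapse each one into a single step by default.
templates:
  clean:
    parameters: [source]
    steps:
      - name: normalize
        transform: {image: example/cleaner, cmd: ["/normalize.sh", "{{source}}"]}
      - name: dedupe
        transform: {image: example/cleaner, cmd: ["/dedupe.sh"]}
steps:
  - use: clean
    with: {source: users}
  - use: clean
    with: {source: orders}
```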
Services
One idea we've had is that if you have several pipeline steps that need to talk to a service, you could restrict access to the service to steps in the pipeline. Right now, we have no way to track the dependency relationship between services and pipelines. Using the example of Sean's Postgres/TrueCar project [2], we foresaw the following problems arising:
- If you update a service, Pachyderm has no way to know that pipelines reading from that service need to be re-run
- If many teams at TrueCar decided to use Sean's setup, they'd be running a lot of instances of Postgres, each with its own globally-visible service. This could get overwhelming: it could be hard to ensure that a given pipeline doesn't connect to the wrong Postgres node, and hard to debug a broken job when there's no natural way to connect the job to a particular Postgres cluster.
The solution that we came up with was that pipelines could include services in addition to steps. Only steps within the pipeline would be able to communicate with the service, and changes to the service would amount to alterations to the pipeline. Pipeline jobs would also include information about, and logs from, the service spun up within the job.
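As a sketch only (the `services` field is invented; the Postgres example is borrowed from Sean's POC in footnote [2]), a pipeline-scoped service might be declared like this:

```yaml
# Hypothetical pipeline-scoped service: only this pipeline's steps can reach
# the Postgres instance, changing the service re-versions the pipeline, and
# jobs would capture the service's logs alongside the steps' logs.
pipeline:
  name: sql-report
services:
  - name: postgres
    image: postgres:11
input:
  pfs:
    repo: queries
    glob: "/*"
steps:
  - name: run-queries
    transform: {image: example/psql-client, cmd: ["/run_queries.sh"]}
```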
Discoverability
One problem that we considered solving with pipelines, but eventually moved towards solving with the auth system or other constructs, is discoverability. For example, if the infra team has a set of pipelines that might clutter the view of a data scientist on the marketing team (who simply doesn't care about them), then those infra pipelines should be hidden from the marketing person by default.
To solve this problem, we ended up focusing on repos rather than pipelines, and would organize repos in a hierarchy that is highly analogous to directories in a file system. A mature Pachyderm cluster might have an `/infra` directory that the marketing person would see but generally wouldn't explore. They might spend their time in the `/marketing/projections` directory.
A related feature (borrowed from Go packages) is that each directory that contains repos would have some that are "exported"—publicly visible and meant to be consumed—and some that are "private". Our marketing person might pull data from `/infra/users/anon_user_ids` but would not, by default, see or be able to analyze `/infra/users/pii`. However, when our marketing person looks in `/marketing/projections`, they would see the private repo `/marketing/projections/secret_financial` as well as pipelines that consume that repo.
Open Issues/Questions
- If pipelines are one RC or one CRD, and different steps in a pipeline are different containers, then we need all steps of a pipeline to start concurrently, which might mean there aren't enough resources in the cluster for two fat steps to coexist.
- OTOH, if we're using pipeline CRDs, we could have >1 RC per pipeline (by having all RCs be children of the parent pipeline resource).
- The Argo pipeline spec language is Turing complete. Do we want that?
[1] If pipeline step one maps datums `A -> C` and `B -> C`, and step two maps `C -> D`, then tracking which parts of `D` came from `A` and which came from `B` (via block refs) would be impossible, as step two sees all of `C` together. This precludes our currently-planned 1.9 approach to incrementality (reprocessing `A` and merging the new output into files affected by the previous output from `A`). If we decided to preserve datum hashtrees or output hashtrees for certain intermediate steps, we might be able to skip some processing within a single step, though we'll have to figure out how to constrain the amount we persist from a job, given that a cyclic job could generate unbounded output.
[2] In 2018 (I think), Sean Jezewski worked on a proof-of-concept that included a large SQL query. Sean's approach to the POC was to deploy Postgres adjacent to a Pachyderm pipeline, and have the pipeline send queries to the Postgres nodes and store the results in the pipeline's output commit.