Basic software-defined assets are computed using a single op. If generating an asset involves multiple discrete computations, you can use graph-backed assets by separating each computation into an op and building a graph to combine your computations. This allows you to launch re-executions of runs at the op boundaries but doesn't require you to link each intermediate value to an asset in persistent storage.
Graph-backed assets are useful if you have an existing graph that produces and consumes assets. Wrapping your graph inside a software-defined asset gives you all the benefits of software-defined assets — like cross-job lineage — without requiring you to change the code inside your graph.
Additionally, graph-backed assets allow you to reuse an existing op across other graph-backed assets and jobs, or within the same graph.
Name | Description |
---|---|
AssetsDefinition.from_graph | Constructs an asset given a graph definition. |
To define a graph-backed asset, use the from_graph
attribute on the AssetsDefinition
object. The value returned from the graph that becomes the graph-backed asset will be stored in persistent storage as the asset:
import pandas as pd from dagster import AssetsDefinition, asset, graph, op @op(required_resource_keys={"slack"}) def fetch_files_from_slack(context) -> pd.DataFrame: files = context.resources.slack.files_list(channel="#random") return pd.DataFrame( [ { "id": file.get("id"), "created": file.get("created"), "title": file.get("title"), "permalink": file.get("permalink"), } for file in files ] ) @op def store_files(files): return files.to_sql(name="slack_files", con=create_db_connection()) @graph def store_slack_files_in_sql(): return store_files(fetch_files_from_slack()) graph_asset = AssetsDefinition.from_graph(store_slack_files_in_sql)
The from_graph
attribute on the AssetsDefinition
object infers upstream and downstream asset dependencies from the graph definition provided. In the most simple case when the graph returns a singular output, Dagster infers the name of the graph to be the outputted asset key.
In the example below, Dagster creates an asset with key middle_asset
from the middle_asset
graph. Just like assets defined via @asset
, each argument to the decorated graph function is an upstream asset name. middle_asset
depends on upstream_asset
, and downstream_asset
depends on middle_asset
:
from dagster import AssetsDefinition, asset, graph @asset def upstream_asset(): return 1 @graph def middle_asset(upstream_asset): return add_one(upstream_asset) middle_asset = AssetsDefinition.from_graph(middle_asset) @asset def downstream_asset(middle_asset): return middle_asset + 1
When your graph returns multiple outputs, Dagster infers each output name to be the outputted asset key. In the below example, two_assets_graph
accepts upstream_asset
and outputs two assets, first_asset
and second_asset
:
from dagster import AssetsDefinition, GraphOut, graph @graph(out={"first_asset": GraphOut(), "second_asset": GraphOut()}) def two_assets_graph(upstream_asset): one, two = two_outputs(upstream_asset) return {"first_asset": one, "second_asset": two} two_assets = AssetsDefinition.from_graph(two_assets_graph)
You can also define dependencies for graph-backed assets explicitly via the asset_keys_by_input_name
and asset_keys_by_output_name
arguments to from_graph
:
from dagster import AssetsDefinition, GraphOut, graph @graph(out={"one": GraphOut(), "two": GraphOut()}) def return_one_and_two(zero): one, two = two_outputs(zero) return {"one": one, "two": two} explicit_deps_asset = AssetsDefinition.from_graph( return_one_and_two, keys_by_input_name={"zero": AssetKey("upstream_asset")}, keys_by_output_name={ "one": AssetKey("asset_one"), "two": AssetKey("asset_two"), }, )
By default, when executing a graph-backed asset, every asset produced by the graph must be materialized. This means that attempting to selectively execute a subset of assets defined in the graph-backed asset will result in an error.
If the underlying computation is sufficiently flexible to selectively output a subset of assets, a graph-backed asset can be subsetted. For example, let’s say we wanted to define a graph-backed asset with the structure depicted in the image below. In this case, we want to independently materialize foo_asset
and baz_asset
.
In order to selectively output an asset from a graph-backed asset, Dagster will run each op that is a dependency of the outputted asset. In the example, if we wanted to selectively materialize foo_asset
, Dagster would run foo
and bar
. If we wanted to selectively materialize baz_asset
, Dagster would run all three ops (foo
, bar
, and baz
).
Because the foo
op yields an asset output (foo_asset
) and is an upstream dependency of another asset generated from the graph (baz_asset
), we need to structure foo
to selectively return outputs depending on the asset subset selected for execution. We can do this by defining foo
to have optional outputs that are yielded conditionally. Dagster provides a context.selected_output_names
object on the op context that will return the outputs necessary to generate the asset subset.
During execution, if we select just baz_asset
for materialization, the below implementation of foo
will return {"foo_2"}
for context.selected_output_names
, preventing foo_asset
from being materialized.
@op(out={"foo_1": Out(is_required=False), "foo_2": Out(is_required=False)}) def foo(context, bar_1): # Selectively returns outputs based on selected assets if "foo_1" in context.selected_output_names: yield Output(bar_1 + 1, output_name="foo_1") if "foo_2" in context.selected_output_names: yield Output(bar_1 + 2, output_name="foo_2")
Because Dagster flattens each graph into a flat input/output mapping between ops under the hood, any op that produces an output of the graph must be structured to yield its outputs optionally, enabling the outputs to be returned independently.
In the example, foo
and baz
produce outputs of my_graph
. Subsequently, their outputs need to be yielded optionally. Because foo
yields multiple outputs, we must structure our code to conditionally yield its outputs like in the code snippet above.
However, because baz
only yields a singular output, Dagster will only run baz
when its asset output baz_asset
is selected. So, we don’t have to structure baz
to return an optional output. Because bar
does not yield any outputs that are returned from my_graph
, its outputs do not have to be selectively returned.
We could define the asset using the code below. Notice that can_subset
must be set to True
in the asset definition to signify that the graph-backed asset can be subsetted.
@op(out={"bar_1": Out(), "bar_2": Out()}) def bar(): return 1, 2 @op def baz(foo_2, bar_2): return foo_2 + bar_2 @graph(out={"foo_asset": GraphOut(), "baz_asset": GraphOut()}) def my_graph(): bar_1, bar_2 = bar() foo_1, foo_2 = foo(bar_1) return {"foo_asset": foo_1, "baz_asset": baz(foo_2, bar_2)} @repository def my_repo(): return [ define_asset_job("graph_asset"), AssetsDefinition.from_graph(my_graph, can_subset=True), ]
Depending on how outputs are returned from the ops within a graph-backed asset, there could be unexpected materializations. For example, the following foo
implementation would unexpectedly materialize foo_asset
if baz_asset
was the only asset selected for execution.
@op(out={"foo_1": Out(), "foo_2": Out()}) def foo(): return 1, 2 # Will unexpectedly materialize foo_asset my_repo.get_job("graph_asset").execute_in_process( asset_selection=[AssetKey("baz_asset")] )
This is because the foo
op is an upstream dependency of baz_asset
, and this implementation of foo
returns both the foo_1
and foo_2
outputs. The foo_1
output is returned as the foo_asset
output of the graph, causing an unexpected materialization of foo_asset
.