Sensors allow you to instigate runs based on any external state change.
Name | Description |
---|---|
`@sensor` | The decorator used to define a sensor. The decorated function is called the sensor's evaluation function. The decorator returns a `SensorDefinition`. |
`RunRequest` | The sensor evaluation function can yield one or more run requests. Each run request creates a job run. |
`SkipReason` | If a sensor evaluation doesn't yield any run requests, it can instead yield a skip reason to log why the evaluation was skipped or why there were no events to be processed. |
`SensorDefinition` | Class for sensors. You almost never want to initialize this class directly. Instead, you should use the `@sensor` decorator, which returns a `SensorDefinition`. |
`SensorEvaluationContext` | The context object passed to a sensor evaluation function. |
`build_sensor_context` | A function that constructs an instance of `SensorEvaluationContext`. This is intended to be used to test a sensor. |
`@asset_sensor` | The decorator used to define an asset sensor. The decorated function is an evaluation function that takes in a `SensorEvaluationContext` and an asset materialization event. The decorator returns an `AssetSensorDefinition`. |
`AssetSensorDefinition` | A special sensor definition class for asset sensors. You almost never want to initialize this class directly. Instead, you should use the `@asset_sensor` decorator, which returns an `AssetSensorDefinition`. |
`@multi_asset_sensor` | The decorator used to define an asset sensor that can monitor multiple assets at a time. The decorated function is an evaluation function that takes in a `MultiAssetSensorEvaluationContext`, which has methods to fetch materialization events for the monitored assets. The decorator returns a `MultiAssetSensorDefinition`. |
`MultiAssetSensorDefinition` | A special sensor definition class for multi-asset sensors. You almost never want to initialize this class directly. Instead, you should use the `@multi_asset_sensor` decorator, which returns a `MultiAssetSensorDefinition`. |
`MultiAssetSensorEvaluationContext` | The context object passed to a multi-asset sensor evaluation function. Has methods for fetching materialization events for assets. |
`build_multi_asset_sensor_context` | A function that constructs an instance of `MultiAssetSensorEvaluationContext`. This is intended to be used to test a multi-asset sensor. |
`@run_status_sensor` | The decorator used to define a run status sensor. The decorator returns a `RunStatusSensorDefinition`. |
`@run_failure_sensor` | The decorator used to define a run failure sensor. The run failure sensor is a special case of a run status sensor specifically to detect run failures. |
`RunStatusSensorDefinition` | Class for run status sensors. You almost never want to initialize this class directly. Instead, you should use the `@run_status_sensor` or `@run_failure_sensor` decorators. |
`RunStatusSensorContext` | The context object passed to a run status sensor evaluation. |
`build_run_status_sensor_context` | A function that constructs an instance of `RunStatusSensorContext`. This is intended to be used to test a run status sensor. |
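Sensors are definitions in Dagster that allow you to instigate runs based on some external state change. For example, you can launch a run whenever a new file appears in a directory or an s3 bucket, whenever a specific asset is materialized, or whenever another job run succeeds or fails.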
A sensor defines an evaluation function that returns either:

- One or more `RunRequest` objects. Each run request launches a run.
- An optional `SkipReason`, which specifies a message describing why no runs were requested.

The Dagster Daemon runs each sensor evaluation function on a tight loop. If you are using sensors, make sure to follow the instructions on the Dagster Daemon page to run your sensors.
To define a sensor, use the `@sensor` decorator. The decorated function can optionally take a `context` as its first argument. The context is a `SensorEvaluationContext`.
Let's say you have a job that logs a filename specified in the op configuration of the `process_file` op:
```python
from dagster import job, op


@op(config_schema={"filename": str})
def process_file(context):
    filename = context.op_config["filename"]
    context.log.info(filename)


@job
def log_file_job():
    process_file()
```
You can write a sensor that watches for new files in a specific directory and yields a `RunRequest` for each new file in the directory. By default, this sensor runs every 30 seconds.
```python
import os

from dagster import RunRequest, sensor

# MY_DIRECTORY is the path of the directory to watch, defined elsewhere


@sensor(job=log_file_job)
def my_directory_sensor():
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={
                    "ops": {"process_file": {"config": {"filename": filename}}}
                },
            )
```
This sensor iterates through all the files in `MY_DIRECTORY` and yields a `RunRequest` for each file. Note that despite the `yield` syntax, the function runs to completion before any runs are submitted.
To write a sensor that materializes assets, you can build a job that materializes assets:
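```python
from dagster import RunRequest, define_asset_job, sensor

# "*" selects every asset in the repository
asset_job = define_asset_job("asset_job", "*")


@sensor(job=asset_job)
def materializes_asset_sensor():
    # add a run_key, run_config, or other arguments as your use case requires
    yield RunRequest(run_key=None)
```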
Once a sensor is added to a repository with the job it yields a `RunRequest` for, it can be started and will start creating runs. You can start or stop sensors in Dagit, or set the default status to `DefaultSensorStatus.RUNNING` in code:
```python
from dagster import DefaultSensorStatus, sensor


@sensor(job=asset_job, default_status=DefaultSensorStatus.RUNNING)
def my_running_sensor():
    ...
```
If you manually start or stop a sensor in Dagit, that will override any default status that is set in code.
Once your sensor is started, if you're running the dagster-daemon process as part of your deployment, the sensor will begin executing immediately, without needing to restart the dagster-daemon process.
A useful pattern is to create a sensor that checks for new `AssetMaterialization` events for a particular asset key. This can be used to kick off a job that computes downstream assets or notifies appropriate stakeholders.
One benefit of this pattern is that it enables cross-job and even cross-repository dependencies. Each job run instigated by an asset sensor is agnostic to the job that caused it.
Dagster provides a special asset sensor definition format for sensors that fire a single `RunRequest` based on a single asset materialization. Here is an example of a sensor that generates a `RunRequest` for every materialization of the asset key `my_table`:
```python
from dagster import (
    AssetKey,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    asset_sensor,
)


@asset_sensor(asset_key=AssetKey("my_table"), job=my_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "read_materialization": {
                    "config": {
                        "asset_key": asset_event.dagster_event.asset_key.path,
                    }
                }
            }
        },
    )
```
Multi-asset sensors, which can trigger job executions based on multiple asset materialization event streams, can be defined using the `@multi_asset_sensor` decorator.
In the body of the sensor, you have access to the materialization event records for each asset via the context:

- `MultiAssetSensorEvaluationContext.latest_materialization_records_by_key` returns a dictionary mapping the `AssetKey` of each monitored asset to its most recent materialization record. If an asset has no materialization event, the mapped value is `None`.
- `MultiAssetSensorEvaluationContext.materialization_records_for_key` returns a list of materialization event records for a specified `AssetKey`.

Both methods only return materializations after the latest cursor.
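```python
from dagster import AssetKey, RunRequest, multi_asset_sensor


@multi_asset_sensor(
    asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
    job=my_job,
)
def asset_a_and_b_sensor(context):
    asset_events = context.latest_materialization_records_by_key()
    # request a run only once both assets have a new materialization
    if all(asset_events.values()):
        context.advance_all_cursors()
        return RunRequest()
```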
Note the `context.advance_all_cursors` call near the end of the sensor. The cursor keeps track of which materialization events the sensor has processed, so that the next time the sensor runs, only newer events are fetched. Because multi-asset sensors give you the flexibility to decide which conditions should result in a `RunRequest`, the sensor must manually update the cursor when it returns a `RunRequest`.
You can also return a `SkipReason` to document why the sensor didn't launch a run.
```python
from dagster import AssetKey, RunRequest, SkipReason, multi_asset_sensor


@multi_asset_sensor(
    asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
    job=my_job,
)
def asset_a_and_b_sensor_with_skip_reason(context):
    asset_events = context.latest_materialization_records_by_key()
    if all(asset_events.values()):
        context.advance_all_cursors()
        return RunRequest()
    elif any(asset_events.values()):
        materialized_asset_key_strs = [
            key.to_user_string() for key, value in asset_events.items() if value
        ]
        not_materialized_asset_key_strs = [
            key.to_user_string() for key, value in asset_events.items() if not value
        ]
        return SkipReason(
            f"Observed materializations for {materialized_asset_key_strs}, "
            f"but not for {not_materialized_asset_key_strs}"
        )
```
The `MultiAssetSensorEvaluationContext.latest_materialization_records_by_partition` method fetches the latest materialization records by partition for a given asset. Using this method, you can trigger downstream partitioned runs when the corresponding partitions of upstream assets are materialized.
The following example monitors two upstream daily-partitioned assets and kicks off materializations of the corresponding partition in a downstream daily-partitioned asset. Whenever a partition is replaced in `upstream_daily_1` and the same partition has previously been materialized in `upstream_daily_2` (or vice versa), the sensor requests a run for the downstream asset.
```python
from dagster import AssetKey, multi_asset_sensor


@multi_asset_sensor(
    asset_keys=[AssetKey("upstream_daily_1"), AssetKey("upstream_daily_2")],
    job=downstream_daily_job,
)
def trigger_daily_asset_if_all_upstream_partitions_materialized(context):
    run_requests_by_partition = {}
    for (
        partition,
        materializations_by_asset,
    ) in context.latest_materialization_records_by_partition_and_asset().items():
        for asset_key, materialization in materializations_by_asset.items():
            # request a run only if every other monitored asset already has this partition
            if all(
                [
                    context.all_partitions_materialized(other_key, [partition])
                    for other_key in context.asset_keys
                    if other_key != asset_key
                ]
            ):
                if partition not in run_requests_by_partition:
                    run_requests_by_partition[
                        partition
                    ] = downstream_daily_job.run_request_for_partition(partition)
                context.advance_cursor({asset_key: materialization})
    return list(run_requests_by_partition.values())
```
If the `PartitionsDefinition` of the monitored assets differs from that of the triggered asset, you can use the `MultiAssetSensorEvaluationContext.get_downstream_partition_keys` method to map a partition key from one asset to another. This method accepts a partition key from the upstream asset and uses the existing `PartitionMapping` object on the downstream asset to fetch the corresponding partitions in the downstream asset.
If a partition mapping is not defined, Dagster uses the default partition mapping: `TimeWindowPartitionMapping` for time window partitions definitions and `IdentityPartitionMapping` for other partitions definitions. The `TimeWindowPartitionMapping` maps an upstream partition to the downstream partitions that overlap with it.
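To make the overlap rule concrete, the sketch below models a daily-to-weekly mapping in plain Python: each day overlaps exactly one week, and each week overlaps seven days. It does not call Dagster's `PartitionMapping` classes; the helper names and the Sunday week start are assumptions chosen for illustration.

```python
from datetime import date, timedelta
from typing import List

# Assumption for this sketch: weeks start on Sunday.
# Python's date.weekday() numbers Monday as 0 and Sunday as 6.
WEEK_START_WEEKDAY = 6


def weekly_partition_for_day(day: date) -> date:
    """Return the start date of the one week that overlaps `day`."""
    days_since_week_start = (day.weekday() - WEEK_START_WEEKDAY) % 7
    return day - timedelta(days=days_since_week_start)


def daily_partitions_in_week(week_start: date) -> List[date]:
    """Return the seven days that overlap the week starting on `week_start`."""
    return [week_start + timedelta(days=offset) for offset in range(7)]


week = weekly_partition_for_day(date(2023, 1, 4))  # a Wednesday
print(week)  # 2023-01-01
print(daily_partitions_in_week(week)[-1])  # 2023-01-07
```

Mapping a day forward to its week and then back to that week's days is the same round trip the weekly example in this guide performs with `get_downstream_partition_keys`.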
Looking for more? The examples section features another example of updating a weekly asset when upstream daily assets are materialized.
When instigating runs based on external events, you usually want to run exactly one job run for each event. There are two ways to define your sensors to avoid creating duplicate runs for your events: using `run_key` and using a cursor.
In the example sensor above, the `RunRequest` is constructed with a `run_key`:
```python
yield RunRequest(
    run_key=filename,
    run_config={"ops": {"process_file": {"config": {"filename": filename}}}},
)
```
Dagster guarantees that for a given sensor, at most one run is created for each `RunRequest` with a unique `run_key`. If a sensor yields a new run request with a previously used `run_key`, Dagster skips processing the new run request.
In the example, a `RunRequest` is requested for each file during every sensor evaluation. Therefore, for a given sensor evaluation, a `RunRequest` with a matching `run_key` already exists for any file that was present during the previous sensor evaluation. Dagster skips these duplicate run requests, so it launches runs only for files added since the last sensor evaluation. The result is exactly one run per file.
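The run key guarantee can be modeled in a few lines of plain Python. The `RunKeyLedger` class below is a hypothetical toy, not Dagster's implementation: it remembers every run key it has seen and reports only the keys that would launch a new run, which reproduces the one-run-per-file behavior described above.

```python
from typing import Iterable, List, Set


class RunKeyLedger:
    """Toy model of the guarantee: at most one run per unique run_key."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()

    def submit(self, run_keys: Iterable[str]) -> List[str]:
        """Return, in order, the run keys that would launch a new run."""
        launched: List[str] = []
        for run_key in run_keys:
            if run_key in self._seen:
                continue  # previously used run_key: the request is skipped
            self._seen.add(run_key)
            launched.append(run_key)
        return launched


ledger = RunKeyLedger()
# First evaluation: the directory holds two files.
print(ledger.submit(["a.csv", "b.csv"]))  # ['a.csv', 'b.csv']
# Second evaluation: one new file has appeared.
print(ledger.submit(["a.csv", "b.csv", "c.csv"]))  # ['c.csv']
```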
Run keys allow you to write sensor evaluation functions that declaratively describe what job runs should exist, and they help you avoid more complex state-management logic. However, when dealing with high-volume external events, some state-tracking optimizations might be necessary.
When writing a sensor that deals with high-volume events, it might not be feasible to yield a `RunRequest` during every sensor evaluation. For example, you may have an s3 storage bucket that contains thousands of files.
When writing a sensor for such event sources, you can maintain a cursor that limits the number of run requests yielded for previously processed events. The sensor context, provided to every sensor evaluation function, has a `cursor` property and an `update_cursor` method that sensors can use to track state across evaluations:
- `cursor`: A cursor field on `SensorEvaluationContext` that returns the last persisted cursor value from a previous evaluation.
- `update_cursor`: A method on `SensorEvaluationContext` that takes a string to persist and make available to future evaluations.

Here is a somewhat contrived example of our directory file sensor using a cursor to detect updated files:
```python
import os

from dagster import RunRequest, sensor


@sensor(job=log_file_job)
def my_directory_sensor_cursor(context):
    last_mtime = float(context.cursor) if context.cursor else 0

    max_mtime = last_mtime
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            fstats = os.stat(filepath)
            file_mtime = fstats.st_mtime
            if file_mtime <= last_mtime:
                continue

            # the run key should include mtime if we want to kick off new runs based on file modifications
            run_key = f"{filename}:{str(file_mtime)}"
            run_config = {"ops": {"process_file": {"config": {"filename": filename}}}}
            yield RunRequest(run_key=run_key, run_config=run_config)
            max_mtime = max(max_mtime, file_mtime)

    context.update_cursor(str(max_mtime))
```
For sensors that consume multiple event streams, you may need to serialize and deserialize a more complex data structure in and out of the cursor string to keep track of the sensor's progress over the multiple streams.
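One common approach, sketched here under the assumption that each stream's progress fits in a single integer position, is to store a JSON object in the cursor string. The helper names `encode_cursor` and `decode_cursor` and the stream names are hypothetical.

```python
import json
from typing import Dict, Optional


def decode_cursor(cursor: Optional[str]) -> Dict[str, int]:
    """Turn the persisted cursor string back into per-stream positions."""
    if not cursor:
        return {}  # first evaluation: nothing has been processed yet
    return json.loads(cursor)


def encode_cursor(positions: Dict[str, int]) -> str:
    """Serialize per-stream positions into a single cursor string."""
    # sort_keys keeps the string stable for the same positions
    return json.dumps(positions, sort_keys=True)


positions = decode_cursor(None)
positions["orders"] = 42
positions["customers"] = 7
print(encode_cursor(positions))  # {"customers": 7, "orders": 42}
```

Inside a sensor, you would read the state with `decode_cursor(context.cursor)` and persist it with `context.update_cursor(encode_cursor(positions))`.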
By default, the Dagster Daemon runs a sensor 30 seconds after that sensor's previous evaluation finishes executing. You can configure the interval using the `minimum_interval_seconds` argument on the `@sensor` decorator.
Note that this interval is a minimum interval between evaluations of the sensor, not the exact frequency at which the sensor runs. If you have a sensor that takes 30 seconds to complete but `minimum_interval_seconds` is 5 seconds, the fastest the Dagster Daemon will run the sensor is once every 35 seconds. `minimum_interval_seconds` only guarantees that the sensor is not evaluated more frequently than the given interval.
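The arithmetic above can be written down directly. This hypothetical helper assumes, as described earlier, that the interval is measured from the end of the previous evaluation, so the evaluation time and the minimum interval add up:

```python
def fastest_evaluation_period(evaluation_seconds: float, minimum_interval_seconds: float) -> float:
    """Shortest possible time between the starts of two consecutive evaluations.

    Assumes the minimum interval starts counting when the previous evaluation finishes.
    """
    return evaluation_seconds + minimum_interval_seconds


print(fastest_evaluation_period(30, 5))  # 35
print(fastest_evaluation_period(0.5, 30))  # 30.5
```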
For example, here are two sensors that specify two different minimum intervals:
```python
from dagster import RunRequest, sensor


@sensor(job=my_job, minimum_interval_seconds=30)
def sensor_A():
    yield RunRequest(run_key=None, run_config={})


@sensor(job=my_job, minimum_interval_seconds=45)
def sensor_B():
    yield RunRequest(run_key=None, run_config={})
```
These sensor definitions are short, so they run in less than a second. Therefore, you can expect these sensors to run consistently around every 30 and 45 seconds, respectively.
If a sensor evaluation function takes more than 60 seconds to return its results, the sensor evaluation will time out and the Dagster Daemon will move on to the next sensor without submitting any runs. This 60-second timeout only applies to the time it takes to run the sensor function, not to the execution time of the runs submitted by the sensor. To avoid timeouts, slower sensors can break up their work into chunks, using cursors to let subsequent sensor calls pick up where the previous call left off.
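A minimal sketch of that chunking idea in plain Python: given a sorted list of keys and the last persisted cursor, return at most `batch_size` unprocessed keys plus the cursor to persist next. The `next_batch` helper and the assumption that new keys always sort after old ones (as with timestamped file names) are illustrative, not part of Dagster.

```python
from typing import List, Optional, Sequence, Tuple


def next_batch(
    keys: Sequence[str], cursor: Optional[str], batch_size: int
) -> Tuple[List[str], Optional[str]]:
    """Return up to `batch_size` keys after `cursor`, plus the cursor to persist.

    Assumes `keys` is sorted and that new keys always sort after old ones.
    """
    pending = [key for key in keys if cursor is None or key > cursor]
    batch = pending[:batch_size]
    # keep the old cursor when there is no new work
    new_cursor = batch[-1] if batch else cursor
    return batch, new_cursor


keys = ["2023-01-01.csv", "2023-01-02.csv", "2023-01-03.csv"]
batch, cursor = next_batch(keys, None, batch_size=2)
print(batch, cursor)  # ['2023-01-01.csv', '2023-01-02.csv'] 2023-01-02.csv
batch, cursor = next_batch(keys, cursor, batch_size=2)
print(batch, cursor)  # ['2023-01-03.csv'] 2023-01-03.csv
```

In a sensor, you could pass `sorted(os.listdir(MY_DIRECTORY))` and `context.cursor`, yield a `RunRequest` for each key in the batch, and then call `context.update_cursor` with the new cursor whenever it is not `None`. Each evaluation then does a bounded amount of work, and the next one resumes where the previous one stopped.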
For debugging purposes, it is often useful to describe why a sensor might not yield any runs for a given evaluation. The sensor evaluation function can yield a `SkipReason` with a string description that will be displayed in Dagit.
For example, here is our directory sensor that now provides a SkipReason when no files are encountered:
```python
import os

from dagster import RunRequest, SkipReason, sensor


@sensor(job=log_file_job)
def my_directory_sensor_with_skip_reasons():
    has_files = False
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={
                    "ops": {"process_file": {"config": {"filename": filename}}}
                },
            )
            has_files = True
    if not has_files:
        yield SkipReason(f"No files found in {MY_DIRECTORY}.")
```
To quickly preview what an existing sensor will generate when evaluated, you can run the CLI command `dagster sensor preview my_sensor_name`.
To unit test a sensor, you can invoke it directly. This returns all the run requests yielded by the sensor. The run config in each request can be validated using the `validate_run_config` function.
```python
from dagster import RunRequest, sensor, validate_run_config


@sensor(job=log_file_job)
def sensor_to_test():
    yield RunRequest(
        run_key="foo",
        run_config={"ops": {"process_file": {"config": {"filename": "foo"}}}},
    )


def test_sensor():
    for run_request in sensor_to_test():
        assert validate_run_config(log_file_job, run_request.run_config)
```
Notice that since we did not use the `context` argument in our sensor, we don't have to provide a context object. However, if the sensor does need a context object, we can provide one via `build_sensor_context`. Consider again the `my_directory_sensor_cursor` example from earlier.
This sensor uses the `context` argument, so to invoke it, we need to provide one:
```python
from dagster import build_sensor_context, validate_run_config


def test_my_directory_sensor_cursor():
    context = build_sensor_context(cursor="0")
    for run_request in my_directory_sensor_cursor(context):
        assert validate_run_config(log_file_job, run_request.run_config)
```
You can monitor and operate sensors in Dagit. There are multiple views that help with observing sensor evaluations, skip reasons, and errors.
To view the sensors page, you can navigate to the "Sensors" tab from the Repository page. Here you can start and stop sensors using the toggle.
If you click on any sensor, you can monitor all sensor evaluations on a timeline and view a table of runs launched by the sensor.
If you want to act on the status of a job run, Dagster provides a way to create a sensor that reacts to run statuses. You can use `run_status_sensor` with a specified `DagsterRunStatus` to decorate a function that will run when the given status occurs. This can be used to launch runs of other jobs, send alerts to a monitoring service on run failure, or report a run success.
Here is an example of a run status sensor that launches a run of `status_reporting_job` if a run is successful:
```python
from dagster import DagsterRunStatus, RunRequest, SkipReason, run_status_sensor


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=status_reporting_job,
)
def report_status_sensor(context):
    # this condition prevents the sensor from triggering status_reporting_job again after it succeeds
    if context.dagster_run.job_name != status_reporting_job.name:
        run_config = {
            "ops": {
                "status_report": {"config": {"job_name": context.dagster_run.job_name}}
            }
        }
        return RunRequest(run_key=None, run_config=run_config)
    else:
        return SkipReason("Don't report status of status_reporting_job")
```
`request_job` is the job that will be run when the `RunRequest` is returned.
Note that in `report_status_sensor` we conditionally return a `RunRequest`. This ensures that when `report_status_sensor` runs `status_reporting_job`, it doesn't enter an infinite loop where the success of `status_reporting_job` triggers another run of `status_reporting_job`, which triggers another run, and so on.
Here is an example of a sensor that reports job success in a Slack message:
```python
import os

from dagster import DagsterRunStatus, RunStatusSensorContext, run_status_sensor
from slack_sdk import WebClient


@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def my_slack_on_run_success(context: RunStatusSensorContext):
    slack_client = WebClient(token=os.environ["SLACK_DAGSTER_ETL_BOT_TOKEN"])

    # slack_sdk's chat_postMessage takes the message body as `text`
    slack_client.chat_postMessage(
        channel="#alert-channel",
        text=f'Job "{context.dagster_run.job_name}" succeeded.',
    )
```
When a run status sensor is triggered by a job run but doesn't return anything, Dagster will report an event back to the run to indicate that the sensor ran.
Once you have written your sensor, you can add the sensor to a repository so it can be enabled and used the same as other sensors:
```python
from dagster import repository


@repository
def my_repository():
    # my_jobs is your existing list of job definitions
    return my_jobs + [my_slack_on_run_success]
```
Dagster provides a set of special run status sensor decorators for defining sensors that monitor run failure events. You can use `run_failure_sensor` to decorate a function that will run when a run fails.
```python
from dagster import RunRequest, run_failure_sensor


@run_failure_sensor(request_job=status_reporting_job)
def report_failure_sensor(context):
    run_config = {
        "ops": {"status_report": {"config": {"job_name": context.dagster_run.job_name}}}
    }
    return RunRequest(run_key=None, run_config=run_config)
```
This run failure sensor sends a Slack message when it runs:
```python
import os

from dagster import RunFailureSensorContext, run_failure_sensor
from slack_sdk import WebClient


@run_failure_sensor
def my_slack_on_run_failure(context: RunFailureSensorContext):
    slack_client = WebClient(token=os.environ["SLACK_DAGSTER_ETL_BOT_TOKEN"])

    # slack_sdk's chat_postMessage takes the message body as `text`
    slack_client.chat_postMessage(
        channel="#alert-channel",
        text=f'Job "{context.dagster_run.job_name}" failed. Error: {context.failure_event.message}',
    )
```
Dagster also provides the following out-of-the-box run failure sensors:

- `make_slack_on_run_failure_sensor` helps you create a run failure sensor that will message a given Slack channel:

  ```python
  import os

  from dagster_slack import make_slack_on_run_failure_sensor

  slack_on_run_failure = make_slack_on_run_failure_sensor(
      "#my_channel", os.getenv("MY_SLACK_TOKEN")
  )
  ```

- `make_email_on_run_failure_sensor` helps you create a run failure sensor that will send emails via the SMTP protocol:

  ```python
  import os

  from dagster import make_email_on_run_failure_sensor

  email_on_run_failure = make_email_on_run_failure_sensor(
      email_from="no-reply@example.com",
      email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
      email_to=["xxx@example.com", "xyz@example.com"],
  )
  ```
If you would like to set up success or failure handling policies on ops instead, you can find more information on the Op Hooks page.
Sometimes, you may want to monitor jobs in a repository other than the one where the sensor is defined. You can use the special identifiers `RepositorySelector` and `JobSelector` to tell a run status sensor to monitor jobs in another repository:
```python
from dagster import (
    DagsterRunStatus,
    JobSelector,
    RepositorySelector,
    run_failure_sensor,
    run_status_sensor,
)

# send_slack_alert is your own alerting helper, defined elsewhere


@run_status_sensor(
    monitored_jobs=[
        RepositorySelector(
            location_name="repository.location", repository_name="team_a_repository"
        )
    ],
    run_status=DagsterRunStatus.SUCCESS,
)
def team_a_repo_sensor():
    # when any job in team_a_repository succeeds, this sensor will trigger
    send_slack_alert()


@run_failure_sensor(
    monitored_jobs=[
        JobSelector(
            location_name="repository.location",
            repository_name="team_a_repository",
            job_name="data_update",
        )
    ],
)
def team_a_data_update_failure_sensor():
    # when the data_update job in team_a_repository fails, this sensor will trigger
    send_slack_alert()
```
You can also monitor every job in your Dagster instance by specifying `monitor_all_repositories=True` on the sensor decorator. Note that `monitor_all_repositories` cannot be used together with jobs specified via `monitored_jobs`.
```python
from dagster import DagsterRunStatus, run_status_sensor


@run_status_sensor(
    monitor_all_repositories=True,
    run_status=DagsterRunStatus.SUCCESS,
)
def instance_sensor():
    # when any job in the Dagster instance succeeds, this sensor will trigger
    send_slack_alert()
```
As with other sensors, you can directly invoke run status sensors. However, the `context` provided via `run_status_sensor` and `run_failure_sensor` contains objects that are typically only available during run time. The code snippets below demonstrate how to build the context so that you can directly invoke your function in unit tests. Feel free to copy them into your unit tests.
If you had written a status sensor like this (assuming you implemented the function `email_alert` elsewhere):
```python
from dagster import DagsterRunStatus, RunStatusSensorContext, run_status_sensor


@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def my_email_sensor(context: RunStatusSensorContext):
    message = f'Job "{context.dagster_run.job_name}" succeeded.'
    email_alert(message)
```
We can first write a simple job that will succeed:
```python
from dagster import job, op


@op
def succeeds():
    return 1


@job
def my_job_succeeds():
    succeeds()
```
Then we can execute this job and pull the attributes we need to build the `context`. We provide a function, `build_run_status_sensor_context`, that returns the correct context object:
```python
from dagster import DagsterInstance, build_run_status_sensor_context

# execute the job
instance = DagsterInstance.ephemeral()
result = my_job_succeeds.execute_in_process(instance=instance)

# retrieve the DagsterRun
dagster_run = result.dagster_run

# retrieve a success event from the completed execution
dagster_event = result.get_job_success_event()

# create the context
run_status_sensor_context = build_run_status_sensor_context(
    sensor_name="my_email_sensor",
    dagster_instance=instance,
    dagster_run=dagster_run,
    dagster_event=dagster_event,
)

# run the sensor
my_email_sensor(run_status_sensor_context)
```
We have provided the convenience functions `ExecuteInProcessResult.get_job_success_event` and `ExecuteInProcessResult.get_job_failure_event` for retrieving `DagsterRunStatus.SUCCESS` and `DagsterRunStatus.FAILURE` events, respectively. If you have a run status sensor triggered on another status, you can retrieve all events from `result` and filter them based on your event type.
We can use the same pattern to build the context for `run_failure_sensor`. Suppose we wanted to test this run failure sensor:
```python
from dagster import RunFailureSensorContext, run_failure_sensor


@run_failure_sensor
def my_email_failure_sensor(context: RunFailureSensorContext):
    message = f'Job "{context.dagster_run.job_name}" failed. Error: {context.failure_event.message}'
    email_alert(message)
```
We first need to make a simple job that will fail:
```python
from dagster import job, op


@op
def fails():
    raise Exception("failure!")


@job
def my_job_fails():
    fails()
```
Then we can execute the job and create our context:
```python
from dagster import DagsterInstance, build_run_status_sensor_context

# execute the job
instance = DagsterInstance.ephemeral()
result = my_job_fails.execute_in_process(instance=instance, raise_on_error=False)

# retrieve the DagsterRun
dagster_run = result.dagster_run

# retrieve a failure event from the completed job execution
dagster_event = result.get_job_failure_event()

# create the context
run_failure_sensor_context = build_run_status_sensor_context(
    sensor_name="my_email_failure_sensor",
    dagster_instance=instance,
    dagster_run=dagster_run,
    dagster_event=dagster_event,
).for_run_failure()

# run the sensor
my_email_failure_sensor(run_failure_sensor_context)
```
Note the additional call to `RunStatusSensorContext.for_run_failure` after creating the `context`. The `context` provided by `run_failure_sensor` is a subclass of the context provided by `run_status_sensor` and can be built using this additional call.
For jobs that should initiate new runs for new paths in an s3 bucket, the `dagster-aws` package provides the helper function `get_s3_keys`.
Here is an example of a sensor that listens to a particular s3 bucket, `my_s3_bucket`:
```python
from dagster import RunRequest, SkipReason, sensor
from dagster_aws.s3.sensor import get_s3_keys


@sensor(job=my_job)
def my_s3_sensor(context):
    since_key = context.cursor or None
    new_s3_keys = get_s3_keys("my_s3_bucket", since_key=since_key)
    if not new_s3_keys:
        return SkipReason("No new s3 files found for bucket my_s3_bucket.")
    last_key = new_s3_keys[-1]
    run_requests = [RunRequest(run_key=s3_key, run_config={}) for s3_key in new_s3_keys]
    context.update_cursor(last_key)
    return run_requests
```
If you want to use resources within your sensor, you can use the `build_resources` API to perform the initialization:
```python
from dagster import build_resources, resource, sensor


@resource
def the_credentials():
    ...


@resource(required_resource_keys={"credentials"})
def the_db_connection(init_context):
    # return the connection so it is available as resources.db_connection
    return get_the_db_connection(init_context.resources.credentials)


@sensor(job=the_job)
def uses_db_connection():
    with build_resources(
        {"db_connection": the_db_connection, "credentials": the_credentials}
    ) as resources:
        conn = resources.db_connection
        ...
```
If a resource you want to initialize depends on other resources, include those resources in the dictionary passed to `build_resources`. For more in-depth usage, check out the Initializing Resources Outside of Execution section.
The following code example monitors an upstream daily-partitioned asset, kicking off materializations of a downstream weekly-partitioned asset whenever a daily partition is materialized and all daily partitions in the week have existing materializations. Every time a daily partition is replaced, the weekly partition will materialize.
`MultiAssetSensorEvaluationContext.all_partitions_materialized` is a utility method that accepts a list of partitions and checks whether every provided partition has been materialized. This method ignores the cursor, so it searches through all existing materializations.

```python
from dagster import AssetKey, multi_asset_sensor


@multi_asset_sensor(asset_keys=[AssetKey("upstream_daily_asset")], job=weekly_asset_job)
def trigger_weekly_asset_from_daily_asset(context):
    run_requests = []
    materializations_by_partition = context.latest_materialization_records_by_partition(
        AssetKey("upstream_daily_asset")
    )

    # Get all corresponding weekly partitions for any materialized daily partitions
    for partition, materialization in materializations_by_partition.items():
        weekly_partitions = context.get_downstream_partition_keys(
            partition,
            from_asset_key=AssetKey("upstream_daily_asset"),
            to_asset_key=AssetKey("downstream_weekly_asset"),
        )

        if weekly_partitions:  # Check that a downstream weekly partition exists
            # Upstream daily partition can only map to at most one downstream weekly partition
            daily_partitions_in_week = context.get_downstream_partition_keys(
                weekly_partitions[0],
                from_asset_key=AssetKey("downstream_weekly_asset"),
                to_asset_key=AssetKey("upstream_daily_asset"),
            )
            if context.all_partitions_materialized(
                AssetKey("upstream_daily_asset"), daily_partitions_in_week
            ):
                run_requests.append(
                    weekly_asset_job.run_request_for_partition(weekly_partitions[0])
                )
                # Advance the cursor so we only check event log records past the cursor
                context.advance_cursor(
                    {AssetKey("upstream_daily_asset"): materialization}
                )
    return run_requests
```
For more examples of sensors, check out our Hacker News example.