Monitor Dynamic Table freshness

Because the Dynamic Tables are modeled as virtual assets that Dagster never executes, their metadata timelines in the asset catalog never update on their own. Two complementary tools close this gap: a sensor that continuously records refresh state as observations, and asset checks that give a structured pass/fail signal when a table's scheduling state is unhealthy.

The freshness sensor

The sensor is evaluated at most once every 60 seconds (minimum_interval_seconds=60). On each tick it queries information_schema.dynamic_tables and emits one AssetObservation per table:

project_snowflake_dynamic_tables/defs/sensors.py
import dagster as dg
from dagster_snowflake import SnowflakeResource

# _TABLES holds (dagster_key, snowflake_name) pairs and is defined elsewhere in the project.


@dg.sensor(
    name="dynamic_table_freshness_sensor",
    minimum_interval_seconds=60,
)
def dynamic_table_freshness_sensor(
    context: dg.SensorEvaluationContext,
    snowflake: SnowflakeResource,
) -> dg.SensorResult | dg.SkipReason:
    # Build the quoted IN (...) list from the Snowflake table names.
    table_names_sql = ", ".join(f"'{sf_name}'" for _, sf_name in _TABLES)

    try:
        with snowflake.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""
                SELECT
                    name,
                    scheduling_state,
                    last_completed_refresh,
                    TIMESTAMPDIFF('second', last_completed_refresh, CURRENT_TIMESTAMP())
                        AS seconds_since_refresh
                FROM information_schema.dynamic_tables
                WHERE UPPER(name) IN ({table_names_sql})
                  AND schema_name = 'ANALYTICS'
                """
            )
            rows = cursor.fetchall()
    except Exception as exc:
        # Skip the tick instead of failing it when Snowflake is unreachable.
        return dg.SkipReason(f"Snowflake query failed: {exc}")

    if not rows:
        return dg.SkipReason("No dynamic tables found in information_schema")

    # Map Snowflake table names back to Dagster asset keys.
    name_to_key = {sf_name: dagster_key for dagster_key, sf_name in _TABLES}

    observations = [
        dg.AssetObservation(
            asset_key=dg.AssetKey(name_to_key[row[0].upper()]),
            metadata={
                "scheduling_state": dg.MetadataValue.text(str(row[1])),
                "last_completed_refresh": dg.MetadataValue.text(str(row[2])),
                # A NULL age (table never refreshed) is reported as 0.
                "seconds_since_refresh": dg.MetadataValue.int(int(row[3] or 0)),
            },
        )
        for row in rows
        if row[0].upper() in name_to_key
    ]

    return dg.SensorResult(asset_events=observations, skip_reason="Freshness metadata updated.")

The sensor returns SensorResult(asset_events=[...], skip_reason="...") rather than run requests. asset_events carries the AssetObservation objects that populate each virtual asset's metadata timeline in the Dagster UI. Because the result contains no run requests, the skip_reason string is what appears in the sensor tick history; here it is a fixed confirmation that the freshness metadata was updated.
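The row-to-metadata step of the sensor can be tried out without Snowflake or Dagster. The following is a minimal sketch in plain Python: rows_to_metadata and the TABLES list are hypothetical stand-ins (the project's real _TABLES is not shown on this page), and the sample rows are invented for illustration. It mirrors the sensor's behavior of ignoring untracked tables and reporting a NULL age as 0.

```python
from datetime import datetime, timezone

# Hypothetical stand-in for the project's _TABLES: (dagster_key, snowflake_name) pairs.
TABLES = [
    ("customer_lifetime_value", "CUSTOMER_LIFETIME_VALUE"),
    ("daily_revenue_rollup", "DAILY_REVENUE_ROLLUP"),
]


def rows_to_metadata(rows, tables=TABLES):
    """Map (name, state, last_refresh, seconds) rows to {dagster_key: metadata}."""
    name_to_key = {sf_name: dagster_key for dagster_key, sf_name in tables}
    result = {}
    for name, state, last_refresh, seconds in rows:
        key = name_to_key.get(name.upper())
        if key is None:
            continue  # Row belongs to a table the project does not track.
        result[key] = {
            "scheduling_state": str(state),
            "last_completed_refresh": str(last_refresh),
            # As in the sensor, a NULL (None) age is reported as 0.
            "seconds_since_refresh": int(seconds or 0),
        }
    return result


# Invented sample rows in the same column order as the sensor's SELECT.
sample_rows = [
    ("customer_lifetime_value", "RUNNING", datetime(2024, 1, 1, tzinfo=timezone.utc), 42),
    ("DAILY_REVENUE_ROLLUP", "SUSPENDED", None, None),
    ("UNTRACKED_TABLE", "RUNNING", None, 5),
]
metadata_by_key = rows_to_metadata(sample_rows)
```

One consequence worth noticing: a table that has never completed a refresh shows seconds_since_refresh of 0, which reads as "just refreshed". If that matters for your dashboards, the last_completed_refresh value ("None" in that case) is the field to cross-check.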

Freshness checks on the virtual assets

Observations record state — they don't pass or fail. For a structured health signal, asset checks co-located with the virtual specs query scheduling_state and return a boolean result:

project_snowflake_dynamic_tables/defs/assets/dynamic_tables.py
import dagster as dg
from dagster_snowflake import SnowflakeResource


@dg.asset_check(asset="customer_lifetime_value")
def customer_lifetime_value_is_fresh(snowflake: SnowflakeResource) -> dg.AssetCheckResult:
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT scheduling_state, last_completed_refresh
            FROM information_schema.dynamic_tables
            WHERE UPPER(name) = 'CUSTOMER_LIFETIME_VALUE' AND schema_name = 'ANALYTICS'
        """)
        row = cursor.fetchone()
    # A missing row means the table does not exist in the ANALYTICS schema.
    if not row:
        return dg.AssetCheckResult(passed=False, metadata={"error": "table not found"})
    state, last_refresh = row
    return dg.AssetCheckResult(
        passed=state in ("RUNNING", "SUSPENDED"),
        metadata={
            "scheduling_state": dg.MetadataValue.text(str(state)),
            "last_completed_refresh": dg.MetadataValue.text(str(last_refresh)),
        },
    )


@dg.asset_check(asset="daily_revenue_rollup")
def daily_revenue_rollup_is_fresh(snowflake: SnowflakeResource) -> dg.AssetCheckResult:
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT scheduling_state, last_completed_refresh
            FROM information_schema.dynamic_tables
            WHERE UPPER(name) = 'DAILY_REVENUE_ROLLUP' AND schema_name = 'ANALYTICS'
        """)
        row = cursor.fetchone()
    # A missing row means the table does not exist in the ANALYTICS schema.
    if not row:
        return dg.AssetCheckResult(passed=False, metadata={"error": "table not found"})
    state, last_refresh = row
    return dg.AssetCheckResult(
        passed=state in ("RUNNING", "SUSPENDED"),
        metadata={
            "scheduling_state": dg.MetadataValue.text(str(state)),
            "last_completed_refresh": dg.MetadataValue.text(str(last_refresh)),
        },
    )

The check passes when scheduling_state is RUNNING or SUSPENDED — both are valid healthy states for a Snowflake Dynamic Table. Any other state (e.g. FAILED) causes the check to fail, making the problem visible in the Dagster asset catalog as a failed check rather than requiring manual inspection of Snowflake.
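The pass/fail rule described above is small enough to isolate and test on its own. The sketch below is plain Python with no Dagster or Snowflake dependency; evaluate_scheduling_state and HEALTHY_STATES are hypothetical names introduced for illustration, not part of the project. It takes the same (scheduling_state, last_completed_refresh) row that the checks fetch and returns the passed flag plus the metadata the checks attach.

```python
# States the checks above treat as healthy, per the rule described in the text.
HEALTHY_STATES = frozenset({"RUNNING", "SUSPENDED"})


def evaluate_scheduling_state(row):
    """Return (passed, metadata) for a (scheduling_state, last_completed_refresh) row.

    `row` is None when the query found no matching dynamic table.
    """
    if not row:
        return False, {"error": "table not found"}
    state, last_refresh = row
    metadata = {
        "scheduling_state": str(state),
        "last_completed_refresh": str(last_refresh),
    }
    return str(state) in HEALTHY_STATES, metadata


# Invented example rows covering the three outcomes.
running = evaluate_scheduling_state(("RUNNING", "2024-01-01 00:00:00"))
failed = evaluate_scheduling_state(("FAILED", "2024-01-01 00:00:00"))
missing = evaluate_scheduling_state(None)
```

Keeping the rule in one function like this would also let both checks share it, so a change to the set of healthy states is made in a single place. Note that the rule looks only at scheduling_state; last_completed_refresh is recorded as metadata but does not affect the outcome.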

What you've built

With the sensor and checks in place, you have the complete pattern for incorporating Snowflake-managed objects into a Dagster data platform: Snowflake handles computation and refresh, while Dagster provides lineage, automation, and observability. Virtual assets are the bridge. They give Dagster enough information to route automation correctly and surface health signals without ever attempting to execute objects it doesn't control.

For more on virtual assets and the broader family of non-executable assets, see the virtual assets guide.