tweaks on shuttle

(cherry picked from commit 38f8830521)
This commit is contained in:
2025-08-06 21:29:36 +02:00
parent 030424e124
commit 968d5c34de
3 changed files with 9 additions and 7 deletions

View File

@@ -155,7 +155,7 @@ def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame]
"date": dg.DimensionPartitionMapping(
dimension_name="date",
partition_mapping=dg.TimeWindowPartitionMapping(
start_offset=-10,
start_offset=-3,
end_offset=0,
allow_nonexistent_upstream_partitions=True,
),
@@ -170,9 +170,9 @@ def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame]
},
output_required=False,
dagster_type=patito_model_to_dagster_type(Deal),
automation_condition=dg.AutomationCondition.on_missing().without(
dg.AutomationCondition.in_latest_time_window()
),
automation_condition=dg.AutomationCondition.on_missing().ignore(
dg.AssetSelection.assets(cleaned_deals.key)
)
)
def new_deals(
context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None]
@@ -259,7 +259,7 @@ def good_deals(
]
# Render HTML from Jinja template
env = Environment(loader=FileSystemLoader(f"/apps/{APP}"))
env = Environment(loader=FileSystemLoader(f"/code/apps/{APP}"))
template = env.get_template("email.html")
html_content = template.render(deals=deals)

View File

@@ -22,9 +22,10 @@ definitions = dg.Definitions(
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
},
sensors=[
# sensors.list_locations,
sensors.list_locations,
sensors.list_latitudes,
# sensors.list_longitudes,
sensors.list_longitudes,
sensors.retrieve_weather,
sensors.retrieve_weather,
],
)

View File

@@ -56,6 +56,7 @@ services:
- /opt/dagster/apps/:/code/apps/:ro
- /opt/dagster/shared/:/code/shared/:ro
- /opt/dagster/logs/:/logs:rw
- /tmp/cache:/cache:rw
networks:
- dagster