diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index b76e5ee..dbc2d0c 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -22,7 +22,7 @@ SOURCES = ["plato", "sounds"] logger = structlog.get_logger() daily_partitions_def = dg.DailyPartitionsDefinition( - start_date="2024-09-01", end_offset=1, timezone=os.environ["TZ"] + start_date="2024-09-01", end_offset=1, timezone=os.environ.get("TZ", "UTC") ) multi_partitions_def = dg.MultiPartitionsDefinition( { @@ -235,13 +235,32 @@ def good_deals( yield dg.Output(Deal.DataFrame(filtered_df)) lines = [] + lines.append( + """ +
+

🎶 New Music Releases

+ """ + ) + + # Each item for data in filtered_df.head(10).iter_rows(named=True): row = SimpleNamespace(**data) - lines.append(f'

NEW

') - lines.append("") + lines.append( + f""" +
+ +

🆕 {row.artist} - {row.title}

+
+ +
+ """ + ) + + # Email footer + lines.append("
") email_service.send_email("\n".join(lines)) diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py index 09380e0..ba00d5e 100644 --- a/apps/vinyl/src/definitions.py +++ b/apps/vinyl/src/definitions.py @@ -1,3 +1,5 @@ +import os + import assets from dagster_polars import PolarsParquetIOManager from icecream import install @@ -17,7 +19,9 @@ definitions = dg.Definitions( for asset in dg.load_assets_from_modules([assets]) ], resources={ - "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage/vinyl"), + "polars_parquet_io_manager": PolarsParquetIOManager( + base_dir=os.environ.get("STORAGE_DIR", "/storage") + "/vinyl" + ), "email_service": EmailService( smtp_server=dg.EnvVar("SMTP_SERVER"), smtp_port=dg.EnvVar.int("SMTP_PORT"), diff --git a/apps/vinyl/src/test.py b/apps/vinyl/src/test.py index dd03580..8065ea7 100644 --- a/apps/vinyl/src/test.py +++ b/apps/vinyl/src/test.py @@ -3,7 +3,7 @@ from datetime import datetime from typing import Any from assets import cleaned_deals, deals, good_deals, new_deals, works -from definitions import definitions +from dotenv import find_dotenv, load_dotenv from jobs import check_partitions_job import dagster as dg @@ -31,6 +31,9 @@ def test_deals(resources: dict[str, Any], source="sounds", date: str = None): if __name__ == "__main__": + load_dotenv(find_dotenv()) + from definitions import definitions + run = 6 resources = definitions.resources source = "plato"