diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py
index d684dc1..d74720e 100644
--- a/apps/vinyl/src/assets.py
+++ b/apps/vinyl/src/assets.py
@@ -1,6 +1,7 @@
from collections.abc import Iterator
from datetime import datetime
from glob import glob
+from types import SimpleNamespace
import polars as pl
import structlog
@@ -11,6 +12,7 @@ from plato.parse import parse as parse_plato
from shared.utils import get_partition_keys, load_partitions, parse_partition_keys
from sounds.fetch import fetch_deals
from sounds.parse import parse as parse_sounds
+from utils.email import EmailService
import dagster as dg
@@ -220,7 +222,7 @@ def new_deals(
automation_condition=dg.AutomationCondition.eager(),
)
def good_deals(
- context: dg.AssetExecutionContext, df: pl.LazyFrame
+ context: dg.AssetExecutionContext, email_service: EmailService, df: pl.LazyFrame
) -> Iterator[dg.Output[Deal.DataFrame]]:
filtered_df = df.filter(pl.col("price") <= 25).collect()
num_rows = filtered_df.height
@@ -230,3 +232,15 @@ def good_deals(
context.log.info(f"Good deals found ({num_rows}x)!")
yield dg.Output(Deal.DataFrame(filtered_df))
+
+ lines = []
+ for data in filtered_df.head(10).iter_rows(named=True):
+ row = SimpleNamespace(**data)
+ lines.append(f'NEW
')
+ lines.append("
")
+ lines.append(f"- [artist] {row.artist}
")
+ lines.append(f"- [title] {row.title}
")
+ lines.append(f"- [price] {row.price}
")
+ lines.append(f"- [release] {row.release}
")
+ lines.append("
")
+ email_service.send_email("\n".join(lines))
diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py
index 5e4fe12..09380e0 100644
--- a/apps/vinyl/src/definitions.py
+++ b/apps/vinyl/src/definitions.py
@@ -3,6 +3,7 @@ from dagster_polars import PolarsParquetIOManager
from icecream import install
from jobs import check_partitions_job, deals_job
from schedules import deals_schedule
+from utils.email import EmailService
import dagster as dg
@@ -17,6 +18,14 @@ definitions = dg.Definitions(
],
resources={
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage/vinyl"),
+ "email_service": EmailService(
+ smtp_server=dg.EnvVar("SMTP_SERVER"),
+ smtp_port=dg.EnvVar.int("SMTP_PORT"),
+ smtp_username=dg.EnvVar("SMTP_USERNAME"),
+ smtp_password=dg.EnvVar("SMTP_PASSWORD"),
+ sender_email=dg.EnvVar("SENDER_EMAIL"),
+ receiver_email=dg.EnvVar("RECEIVER_EMAIL"),
+ ),
},
jobs=[deals_job, check_partitions_job],
schedules=[deals_schedule],
diff --git a/apps/vinyl/src/sounds/parse.py b/apps/vinyl/src/sounds/parse.py
index 6fe3131..3c72904 100644
--- a/apps/vinyl/src/sounds/parse.py
+++ b/apps/vinyl/src/sounds/parse.py
@@ -1,5 +1,5 @@
import polars as pl
-from utils import parse_date
+from utils.parse import parse_date
def parse(df: pl.LazyFrame) -> pl.LazyFrame:
diff --git a/apps/vinyl/src/test.py b/apps/vinyl/src/test.py
index 3e8eb2e..dd03580 100644
--- a/apps/vinyl/src/test.py
+++ b/apps/vinyl/src/test.py
@@ -3,7 +3,6 @@ from datetime import datetime
from typing import Any
from assets import cleaned_deals, deals, good_deals, new_deals, works
-from dagster_polars import PolarsParquetIOManager
from definitions import definitions
from jobs import check_partitions_job
@@ -33,11 +32,7 @@ def test_deals(resources: dict[str, Any], source="sounds", date: str = None):
if __name__ == "__main__":
run = 6
- resources = {
- "polars_parquet_io_manager": PolarsParquetIOManager(
- base_dir="/opt/dagster/storage/vinyl"
- )
- }
+ resources = definitions.resources
source = "plato"
match run:
diff --git a/apps/vinyl/src/utils/__init__.py b/apps/vinyl/src/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/apps/vinyl/src/utils/email.py b/apps/vinyl/src/utils/email.py
new file mode 100644
index 0000000..4ea7c99
--- /dev/null
+++ b/apps/vinyl/src/utils/email.py
@@ -0,0 +1,57 @@
+import smtplib
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+
+import dagster as dg
+
+
+class EmailService(dg.ConfigurableResource):
+ """
+ Service for sending emails using SMTP.
+
+ Attributes:
+ smtp_server (str): SMTP server address.
+ smtp_port (int): SMTP server port.
+ smtp_username (str): SMTP username for authentication.
+ smtp_password (str): SMTP password for authentication.
+ sender_email (str): Email address from which the email will be sent.
+ receiver_email (str): Email address to which the email will be sent.
+
+ """
+
+ smtp_server: str = "email-smtp.eu-west-1.amazonaws.com"
+ smtp_port: int = 587
+ smtp_username: str
+ smtp_password: str
+ sender_email: str
+ receiver_email: str
+
+ def send_email(self, body: str, subject="Aanbieding op plato (new)!") -> None:
+ msg = MIMEMultipart()
+ msg["Subject"] = subject
+ msg["From"] = self.sender_email
+ msg["To"] = self.receiver_email
+
+ # Add HTML content
+ msg.attach(
+ MIMEText(
+ f"""
+
+
+
+ {body}
+
+
+ """,
+ "html",
+ )
+ )
+
+ # Send
+ try:
+ with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
+ server.starttls()
+ server.login(self.smtp_username, self.smtp_password)
+ server.send_message(msg)
+ except Exception as e:
+ print("Failed to send email:", e)
diff --git a/apps/vinyl/src/utils.py b/apps/vinyl/src/utils/parse.py
similarity index 100%
rename from apps/vinyl/src/utils.py
rename to apps/vinyl/src/utils/parse.py
diff --git a/compose.code.yaml b/compose.code.yaml
index 189f6b1..949d10e 100644
--- a/compose.code.yaml
+++ b/compose.code.yaml
@@ -1,9 +1,17 @@
x-dagster-env: &dagster_env
+ TZ: Europe/Amsterdam
DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST}
DAGSTER_POSTGRES_PORT: ${POSTGRES_PORT}
DAGSTER_POSTGRES_USER: ${POSTGRES_USER}
DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
DAGSTER_POSTGRES_DB: ${POSTGRES_DB}
+x-email-env: &email_env
+ SMTP_SERVER: ${SMTP_SERVER}
+ SMTP_PORT: ${SMTP_PORT}
+ SMTP_USERNAME: ${SMTP_USERNAME}
+ SMTP_PASSWORD: ${SMTP_PASSWORD}
+ SENDER_EMAIL: ${SENDER_EMAIL}
+ RECEIVER_EMAIL: ${RECEIVER_EMAIL}
services:
# This service runs the gRPC server that loads your user code, in both dagit
@@ -21,7 +29,7 @@ services:
image: user_code_vinyl
restart: always
environment:
- <<: *dagster_env
+ <<: [ *dagster_env, *email_env ]
DAGSTER_CURRENT_IMAGE: user_code_vinyl
volumes:
- /opt/dagster/apps/:/apps/:ro
@@ -43,7 +51,7 @@ services:
image: user_code_other
restart: always
environment:
- <<: *dagster_env
+ <<: [ *dagster_env ]
DAGSTER_CURRENT_IMAGE: user_code_other
volumes:
- /opt/dagster/apps/:/apps:ro
diff --git a/compose.env.yaml b/compose.env.yaml
new file mode 100644
index 0000000..c8c7b6d
--- /dev/null
+++ b/compose.env.yaml
@@ -0,0 +1,3 @@
+x-aws-env: &aws_env
+ AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
+ AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
diff --git a/compose.system.yaml b/compose.system.yaml
index 7c973a9..8d829b0 100644
--- a/compose.system.yaml
+++ b/compose.system.yaml
@@ -4,9 +4,6 @@ x-postgres-env: &postgres_env
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: ${POSTGRES_DB}
-x-aws-env: &aws_env
- AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
x-dagster-env: &dagster_env
TZ: Europe/Amsterdam
DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST}
@@ -76,7 +73,7 @@ services:
container_name: daemon
restart: on-failure
environment:
- <<: [ *dagster_env, *aws_env ]
+ <<: *dagster_env
<<: *volumes
networks:
- dagster
diff --git a/dagster.yaml b/dagster.yaml
index 3cd086f..c750ab3 100644
--- a/dagster.yaml
+++ b/dagster.yaml
@@ -18,6 +18,12 @@ run_launcher:
- DAGSTER_POSTGRES_USER
- DAGSTER_POSTGRES_PASSWORD
- DAGSTER_POSTGRES_DB
+ - SMTP_SERVER
+ - SMTP_PORT
+ - SMTP_USERNAME
+ - SMTP_PASSWORD
+ - SENDER_EMAIL
+ - RECEIVER_EMAIL
network: dagster
container_kwargs:
volumes: