send email

This commit is contained in:
2025-07-26 21:15:59 +02:00
parent d55cb8fb17
commit cdf5055e4f
11 changed files with 103 additions and 14 deletions

View File

@@ -1,6 +1,7 @@
from collections.abc import Iterator from collections.abc import Iterator
from datetime import datetime from datetime import datetime
from glob import glob from glob import glob
from types import SimpleNamespace
import polars as pl import polars as pl
import structlog import structlog
@@ -11,6 +12,7 @@ from plato.parse import parse as parse_plato
from shared.utils import get_partition_keys, load_partitions, parse_partition_keys from shared.utils import get_partition_keys, load_partitions, parse_partition_keys
from sounds.fetch import fetch_deals from sounds.fetch import fetch_deals
from sounds.parse import parse as parse_sounds from sounds.parse import parse as parse_sounds
from utils.email import EmailService
import dagster as dg import dagster as dg
@@ -220,7 +222,7 @@ def new_deals(
automation_condition=dg.AutomationCondition.eager(), automation_condition=dg.AutomationCondition.eager(),
) )
def good_deals( def good_deals(
context: dg.AssetExecutionContext, df: pl.LazyFrame context: dg.AssetExecutionContext, email_service: EmailService, df: pl.LazyFrame
) -> Iterator[dg.Output[Deal.DataFrame]]: ) -> Iterator[dg.Output[Deal.DataFrame]]:
filtered_df = df.filter(pl.col("price") <= 25).collect() filtered_df = df.filter(pl.col("price") <= 25).collect()
num_rows = filtered_df.height num_rows = filtered_df.height
@@ -230,3 +232,15 @@ def good_deals(
context.log.info(f"Good deals found ({num_rows}x)!") context.log.info(f"Good deals found ({num_rows}x)!")
yield dg.Output(Deal.DataFrame(filtered_df)) yield dg.Output(Deal.DataFrame(filtered_df))
lines = []
for data in filtered_df.head(10).iter_rows(named=True):
row = SimpleNamespace(**data)
lines.append(f'<a href="https://www.platomania.nl{row.url}"><h1>NEW</h1></a>')
lines.append("<ul>")
lines.append(f"<li>[artist] {row.artist}</li>")
lines.append(f"<li>[title] {row.title}</li>")
lines.append(f"<li>[price] {row.price}</li>")
lines.append(f"<li>[release] {row.release}</li>")
lines.append("</ul>")
email_service.send_email("\n".join(lines))

View File

@@ -3,6 +3,7 @@ from dagster_polars import PolarsParquetIOManager
from icecream import install from icecream import install
from jobs import check_partitions_job, deals_job from jobs import check_partitions_job, deals_job
from schedules import deals_schedule from schedules import deals_schedule
from utils.email import EmailService
import dagster as dg import dagster as dg
@@ -17,6 +18,14 @@ definitions = dg.Definitions(
], ],
resources={ resources={
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage/vinyl"), "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage/vinyl"),
"email_service": EmailService(
smtp_server=dg.EnvVar("SMTP_SERVER"),
smtp_port=dg.EnvVar.int("SMTP_PORT"),
smtp_username=dg.EnvVar("SMTP_USERNAME"),
smtp_password=dg.EnvVar("SMTP_PASSWORD"),
sender_email=dg.EnvVar("SENDER_EMAIL"),
receiver_email=dg.EnvVar("RECEIVER_EMAIL"),
),
}, },
jobs=[deals_job, check_partitions_job], jobs=[deals_job, check_partitions_job],
schedules=[deals_schedule], schedules=[deals_schedule],

View File

@@ -1,5 +1,5 @@
import polars as pl import polars as pl
from utils import parse_date from utils.parse import parse_date
def parse(df: pl.LazyFrame) -> pl.LazyFrame: def parse(df: pl.LazyFrame) -> pl.LazyFrame:

View File

@@ -3,7 +3,6 @@ from datetime import datetime
from typing import Any from typing import Any
from assets import cleaned_deals, deals, good_deals, new_deals, works from assets import cleaned_deals, deals, good_deals, new_deals, works
from dagster_polars import PolarsParquetIOManager
from definitions import definitions from definitions import definitions
from jobs import check_partitions_job from jobs import check_partitions_job
@@ -33,11 +32,7 @@ def test_deals(resources: dict[str, Any], source="sounds", date: str = None):
if __name__ == "__main__": if __name__ == "__main__":
run = 6 run = 6
resources = { resources = definitions.resources
"polars_parquet_io_manager": PolarsParquetIOManager(
base_dir="/opt/dagster/storage/vinyl"
)
}
source = "plato" source = "plato"
match run: match run:

View File

View File

@@ -0,0 +1,57 @@
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import dagster as dg
class EmailService(dg.ConfigurableResource):
"""
Service for sending emails using SMTP.
Attributes:
smtp_server (str): SMTP server address.
smtp_port (int): SMTP server port.
smtp_username (str): SMTP username for authentication.
smtp_password (str): SMTP password for authentication.
sender_email (str): Email address from which the email will be sent.
receiver_email (str): Email address to which the email will be sent.
"""
smtp_server: str = "email-smtp.eu-west-1.amazonaws.com"
smtp_port: int = 587
smtp_username: str
smtp_password: str
sender_email: str
receiver_email: str
def send_email(self, body: str, subject="Aanbieding op plato (new)!") -> None:
msg = MIMEMultipart()
msg["Subject"] = subject
msg["From"] = self.sender_email
msg["To"] = self.receiver_email
# Add HTML content
msg.attach(
MIMEText(
f"""
<html>
<head></head>
<body>
{body}
</body>
</html>
""",
"html",
)
)
# Send
try:
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
server.starttls()
server.login(self.smtp_username, self.smtp_password)
server.send_message(msg)
except Exception as e:
print("Failed to send email:", e)

View File

@@ -1,9 +1,17 @@
x-dagster-env: &dagster_env x-dagster-env: &dagster_env
TZ: Europe/Amsterdam
DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST} DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST}
DAGSTER_POSTGRES_PORT: ${POSTGRES_PORT} DAGSTER_POSTGRES_PORT: ${POSTGRES_PORT}
DAGSTER_POSTGRES_USER: ${POSTGRES_USER} DAGSTER_POSTGRES_USER: ${POSTGRES_USER}
DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
DAGSTER_POSTGRES_DB: ${POSTGRES_DB} DAGSTER_POSTGRES_DB: ${POSTGRES_DB}
x-email-env: &email_env
SMTP_SERVER: ${SMTP_SERVER}
SMTP_PORT: ${SMTP_PORT}
SMTP_USERNAME: ${SMTP_USERNAME}
SMTP_PASSWORD: ${SMTP_PASSWORD}
SENDER_EMAIL: ${SENDER_EMAIL}
RECEIVER_EMAIL: ${RECEIVER_EMAIL}
services: services:
# This service runs the gRPC server that loads your user code, in both dagit # This service runs the gRPC server that loads your user code, in both dagit
@@ -21,7 +29,7 @@ services:
image: user_code_vinyl image: user_code_vinyl
restart: always restart: always
environment: environment:
<<: *dagster_env <<: [ *dagster_env, *email_env ]
DAGSTER_CURRENT_IMAGE: user_code_vinyl DAGSTER_CURRENT_IMAGE: user_code_vinyl
volumes: volumes:
- /opt/dagster/apps/:/apps/:ro - /opt/dagster/apps/:/apps/:ro
@@ -43,7 +51,7 @@ services:
image: user_code_other image: user_code_other
restart: always restart: always
environment: environment:
<<: *dagster_env <<: [ *dagster_env ]
DAGSTER_CURRENT_IMAGE: user_code_other DAGSTER_CURRENT_IMAGE: user_code_other
volumes: volumes:
- /opt/dagster/apps/:/apps:ro - /opt/dagster/apps/:/apps:ro

3
compose.env.yaml Normal file
View File

@@ -0,0 +1,3 @@
x-aws-env: &aws_env
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}

View File

@@ -4,9 +4,6 @@ x-postgres-env: &postgres_env
POSTGRES_USER: ${POSTGRES_USER} POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: ${POSTGRES_DB} POSTGRES_DB: ${POSTGRES_DB}
x-aws-env: &aws_env
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
x-dagster-env: &dagster_env x-dagster-env: &dagster_env
TZ: Europe/Amsterdam TZ: Europe/Amsterdam
DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST} DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST}
@@ -76,7 +73,7 @@ services:
container_name: daemon container_name: daemon
restart: on-failure restart: on-failure
environment: environment:
<<: [ *dagster_env, *aws_env ] <<: *dagster_env
<<: *volumes <<: *volumes
networks: networks:
- dagster - dagster

View File

@@ -18,6 +18,12 @@ run_launcher:
- DAGSTER_POSTGRES_USER - DAGSTER_POSTGRES_USER
- DAGSTER_POSTGRES_PASSWORD - DAGSTER_POSTGRES_PASSWORD
- DAGSTER_POSTGRES_DB - DAGSTER_POSTGRES_DB
- SMTP_SERVER
- SMTP_PORT
- SMTP_USERNAME
- SMTP_PASSWORD
- SENDER_EMAIL
- RECEIVER_EMAIL
network: dagster network: dagster
container_kwargs: container_kwargs:
volumes: volumes: