Files
dagster/apps/vinyl/src/test.py

76 lines
2.2 KiB
Python

import logging
from datetime import datetime
from typing import Any
from assets import cleaned_deals, deals, good_deals, new_deals, works
from dotenv import find_dotenv, load_dotenv
from jobs import check_partitions_job
import dagster as dg
logging.getLogger().setLevel(logging.INFO)
def today_str():
"""Returns today's date as a string in the format YYYY-MM-DD."""
return datetime.today().strftime("%Y-%m-%d")
def test_deals(resources: dict[str, Any], source="sounds", date: str = None):
result = dg.materialize(
assets=definitions.assets,
selection=[deals.key],
partition_key=f"{date or today_str()}|{source}",
resources=resources,
run_config={
"loggers": {"console": {"config": {"log_level": "ERROR"}}},
"ops": {"deals": {"config": {"import_dir": "/storage/import"}}},
},
)
assert result.success
if __name__ == "__main__":
load_dotenv(find_dotenv())
from definitions import definitions
run = 6
resources = definitions.resources
source = "plato"
match run:
case 1:
check_partitions_job.execute_in_process(resources=resources)
case 2:
test_deals(resources, source=source)
case 3:
dg.materialize(
assets=definitions.assets,
selection=[cleaned_deals.key],
partition_key=f"{today_str()}|{source}",
resources=resources,
)
case 4:
dg.materialize(
assets=definitions.assets,
selection=[works.key],
resources=resources,
)
case 5:
dg.materialize(
assets=definitions.assets,
selection=[new_deals.key],
partition_key=f"{today_str()}|{source}",
resources=resources,
)
case 6:
dg.materialize(
assets=definitions.assets,
selection=[good_deals.key],
partition_key=f"{today_str()}",
resources=resources,
)
case _:
raise ValueError(f"Invalid run number: {run}!")