This commit is contained in:
2025-07-21 10:56:46 +02:00
parent 119583c730
commit 1a3c288001
11 changed files with 36 additions and 61 deletions

View File

View File

@@ -1,3 +0,0 @@
from icecream import install
install()

View File

@@ -1,22 +1,25 @@
import assets
from dagster_polars import PolarsParquetIOManager
from icecream import install
from dagster import Definitions, load_assets_from_modules
from dagster import Definitions, define_asset_job, load_assets_from_modules
install()
# Define a job that includes both assets
# daily_job = define_asset_job("daily_job", selection=[dummy_asset, asset_multi_1, asset_multi_2])
daily_job = define_asset_job(
"daily_job",
selection=[assets.dummy_asset, assets.asset_multi_1, assets.asset_multi_2],
)
definitions = Definitions(
assets=load_assets_from_modules([assets]),
# [dummy_asset], # , asset_single_1, asset_multi_1, asset_single_2, asset_multi_2],
assets=[
asset.with_attributes(
group_names_by_key={asset.key: "other"},
tags_by_key={asset.key: {"app": "other"}},
)
for asset in load_assets_from_modules([assets])
],
resources={"polars_parquet_io_manager": PolarsParquetIOManager()},
# jobs=[daily_job],
# executor=docker_executor.configured({"container_kwargs": {
# "volumes": [
# "/opt/dagster/apps/other/src/:/opt/dagster/home/app/",
# "/opt/dagster/storage/:/storage/"
# ]
# }
# }
# )
jobs=[daily_job],
)

View File

View File

@@ -1,3 +0,0 @@
from icecream import install
install()

View File

@@ -55,11 +55,6 @@ partition_mapping = MultiPartitionMapping(
"partition_by": ["date", "source"],
},
config_schema={"import_dir": Field(str, default_value="/storage/import")},
# tags={
# "dagster/executor": "vinyl_executor",
# "app": "vinyl"
# },
# group_name="vinylllll"
)
def deals(context):
ic()

View File

@@ -1,15 +1,16 @@
from collections.abc import Sequence
from assets import deals, new_deals, works
import assets
from dagster_duckdb import DuckDBIOManager
from dagster_duckdb.io_manager import DbTypeHandler
from dagster_duckdb_pandas import DuckDBPandasTypeHandler
from dagster_polars import PolarsParquetIOManager
from icecream import install
from jobs import check_partititions_job, deals_job, musicbrainz_lookup_job
from schedules import deals_schedule
from sensors import musicbrainz_lookup_sensor
from dagster import Definitions
from dagster import Definitions, load_assets_from_modules
class PandasDuckDBIOManager(DuckDBIOManager):
@@ -18,25 +19,15 @@ class PandasDuckDBIOManager(DuckDBIOManager):
return [DuckDBPandasTypeHandler()]
deals.with_attributes()
assets = []
for asset in [deals, new_deals, works]:
print(asset.tags_by_key)
# for k, v in {
# "dagster/executor": "vinyl_executor",
# "app": "vinyl"
# }.items():
# pass
# asset._specs_by_key[asset.key][k] = v
assets.append(
asset.with_attributes(
tags_by_key={
asset.key: {"dagster/executor": "vinyl_executor", "app": "vinyl"}
}
)
)
install()
definitions = Definitions(
assets=assets,
assets=[
asset.with_attributes(
group_names_by_key={asset.key: "vinyl"},
tags_by_key={asset.key: {"app": "vinyl"}},
)
for asset in load_assets_from_modules([assets])
],
resources={
"polars_parquet_io_manager": PolarsParquetIOManager(),
"duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"),
@@ -45,3 +36,5 @@ definitions = Definitions(
schedules=[deals_schedule],
sensors=[musicbrainz_lookup_sensor],
)
ic("jo")

View File

@@ -28,7 +28,7 @@ def test_deals(source="sounds", date: str = None):
resources=resources,
run_config={
"loggers": {"console": {"config": {"log_level": "ERROR"}}},
"ops": {"deals": {"config": {"import_dir": "/opt/dagster/storage/import"}}},
"ops": {"deals": {"config": {"import_dir": "/storage/import"}}},
},
)
assert result.success

View File

@@ -27,7 +27,6 @@ services:
- /opt/dagster/apps/:/apps/:ro
- /opt/dagster/storage/import/:/storage/import/:ro
- /opt/dagster/storage/deals/:/storage/deals/:rw
# - /opt/dagster/apps/vinyl/src/:/opt/dagster/home/app/
networks:
- dagster
@@ -45,7 +44,6 @@ services:
<<: *dagster_env
DAGSTER_CURRENT_IMAGE: user_code_other
volumes:
# - /opt/dagster/apps/other/src/:/opt/dagster/home/app/
- /opt/dagster/apps/:/apps:ro
networks:
- dagster

View File

@@ -16,15 +16,10 @@ x-dagster-env: &dagster_env
x-volumes: &volumes
volumes:
- /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml
- /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml
- /var/run/docker.sock:/var/run/docker.sock
- /opt/dagster/storage/import/:/storage/import/
- /opt/dagster/storage/deals/:/storage/deals/
- /opt/dagster/src/app/:/opt/dagster/home/app/
- /opt/dagster/src/repo.py:/opt/dagster/home/repo.py
- /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml:ro
- /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml:ro
- /opt/dagster/storage/:/storage/:rw
- /var/run/docker.sock:/var/run/docker.sock:rw
services:
# This service runs the postgres DB used by dagster for run storage, schedule storage,

View File

@@ -18,14 +18,11 @@ run_launcher:
- DAGSTER_POSTGRES_USER
- DAGSTER_POSTGRES_PASSWORD
- DAGSTER_POSTGRES_DB
- PYTHONPATH=app
network: dagster
container_kwargs:
volumes:
- /opt/dagster/storage/import/:/opt/dagster/home/storage/import/
- /opt/dagster/storage/deals/:/opt/dagster/home/storage/deals/
- /opt/dagster/apps/vinyl/src/:/opt/dagster/home/app/
- /opt/dagster/storage/:/opt/dagster/home/storage/
- /opt/dagster/apps/:/apps:ro
- /opt/dagster/storage/:/storage/:rw
run_storage:
module: dagster_postgres.run_storage