mount all apps in container
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import pandas as pd
|
||||
import polars as pl
|
||||
|
||||
from dagster import (
|
||||
@@ -22,6 +23,16 @@ partitions_def_multi = MultiPartitionsDefinition(
|
||||
)
|
||||
|
||||
|
||||
@asset(
|
||||
# tags={
|
||||
# "dagster/executor": "other_executor"
|
||||
# },
|
||||
)
|
||||
def dummy_asset():
|
||||
"""A dummy asset to ensure the module is recognized by Dagster."""
|
||||
return pd.DataFrame({"dummy": [1, 2, 3]})
|
||||
|
||||
|
||||
@asset(
|
||||
io_manager_key="polars_parquet_io_manager",
|
||||
partitions_def=partitions_def_single,
|
||||
|
||||
@@ -1,13 +1,22 @@
|
||||
from assets import asset_multi_1, asset_multi_2, asset_single_1, asset_single_2
|
||||
import assets
|
||||
from dagster_polars import PolarsParquetIOManager
|
||||
|
||||
from dagster import Definitions, define_asset_job
|
||||
from dagster import Definitions, load_assets_from_modules
|
||||
|
||||
# Define a job that includes both assets
|
||||
daily_job = define_asset_job("daily_job", selection=[asset_multi_1, asset_multi_2])
|
||||
# daily_job = define_asset_job("daily_job", selection=[dummy_asset, asset_multi_1, asset_multi_2])
|
||||
|
||||
definitions = Definitions(
|
||||
assets=[asset_single_1, asset_multi_1, asset_single_2, asset_multi_2],
|
||||
assets=load_assets_from_modules([assets]),
|
||||
# [dummy_asset], # , asset_single_1, asset_multi_1, asset_single_2, asset_multi_2],
|
||||
resources={"polars_parquet_io_manager": PolarsParquetIOManager()},
|
||||
jobs=[daily_job],
|
||||
# jobs=[daily_job],
|
||||
# executor=docker_executor.configured({"container_kwargs": {
|
||||
# "volumes": [
|
||||
# "/opt/dagster/apps/other/src/:/opt/dagster/home/app/",
|
||||
# "/opt/dagster/storage/:/storage/"
|
||||
# ]
|
||||
# }
|
||||
# }
|
||||
# )
|
||||
)
|
||||
|
||||
@@ -54,9 +54,12 @@ partition_mapping = MultiPartitionMapping(
|
||||
metadata={
|
||||
"partition_by": ["date", "source"],
|
||||
},
|
||||
config_schema={
|
||||
"import_dir": Field(str, default_value="/opt/dagster/home/storage/import")
|
||||
},
|
||||
config_schema={"import_dir": Field(str, default_value="/storage/import")},
|
||||
# tags={
|
||||
# "dagster/executor": "vinyl_executor",
|
||||
# "app": "vinyl"
|
||||
# },
|
||||
# group_name="vinylllll"
|
||||
)
|
||||
def deals(context):
|
||||
ic()
|
||||
|
||||
@@ -18,8 +18,25 @@ class PandasDuckDBIOManager(DuckDBIOManager):
|
||||
return [DuckDBPandasTypeHandler()]
|
||||
|
||||
|
||||
deals.with_attributes()
|
||||
assets = []
|
||||
for asset in [deals, new_deals, works]:
|
||||
print(asset.tags_by_key)
|
||||
# for k, v in {
|
||||
# "dagster/executor": "vinyl_executor",
|
||||
# "app": "vinyl"
|
||||
# }.items():
|
||||
# pass
|
||||
# asset._specs_by_key[asset.key][k] = v
|
||||
assets.append(
|
||||
asset.with_attributes(
|
||||
tags_by_key={
|
||||
asset.key: {"dagster/executor": "vinyl_executor", "app": "vinyl"}
|
||||
}
|
||||
)
|
||||
)
|
||||
definitions = Definitions(
|
||||
assets=[deals, new_deals, works],
|
||||
assets=assets,
|
||||
resources={
|
||||
"polars_parquet_io_manager": PolarsParquetIOManager(),
|
||||
"duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"),
|
||||
|
||||
Reference in New Issue
Block a user