delete old files
This commit is contained in:
@@ -1,22 +0,0 @@
|
|||||||
from typing import Any, Optional
|
|
||||||
|
|
||||||
|
|
||||||
class MyIOManager(PolarsParquetIOManager):
|
|
||||||
def _load_partition_from_path(
|
|
||||||
self,
|
|
||||||
context: InputContext,
|
|
||||||
partition_key: str,
|
|
||||||
path: "UPath",
|
|
||||||
backcompat_path: Optional["UPath"] = None,
|
|
||||||
) -> Any:
|
|
||||||
try:
|
|
||||||
return super()._load_partition_from_path(
|
|
||||||
context, partition_key, path, backcompat_path
|
|
||||||
)
|
|
||||||
except FileNotFoundError:
|
|
||||||
# Handle the case where the partition file does not exist
|
|
||||||
context.log.warning(
|
|
||||||
f"Partition file not found for key {partition_key} at path {path}. "
|
|
||||||
"Returning an empty DataFrame."
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
@@ -1,51 +0,0 @@
|
|||||||
import os
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
from dotenv import find_dotenv, load_dotenv
|
|
||||||
from icecream import ic
|
|
||||||
|
|
||||||
from dagster import AssetKey, DagsterInstance
|
|
||||||
|
|
||||||
|
|
||||||
def delete_partition(instance, partition_def_name, partition_key):
|
|
||||||
try:
|
|
||||||
# This does not seem to work, perhaps because it is not a dynamic partition?
|
|
||||||
# All materializations can be deleted through the UI, but not one by one
|
|
||||||
instance.delete_dynamic_partition(partition_def_name, partition_key)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error deleting partition: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
def main(instance):
|
|
||||||
print(f"Partition '{partition_key}' deleted successfully.")
|
|
||||||
|
|
||||||
|
|
||||||
def detect_previous_partition(instance, name):
|
|
||||||
ic(name)
|
|
||||||
records = instance.get_latest_materialization_events(
|
|
||||||
(AssetKey(name),),
|
|
||||||
# event_type="ASSET_MATERIALIZATION",
|
|
||||||
# asset_key=(partition_key,),
|
|
||||||
# limit=100,
|
|
||||||
)
|
|
||||||
print(records)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
partition_def_name = "asset_single_1"
|
|
||||||
partition_key = "2025-07-20" # Example partition key
|
|
||||||
|
|
||||||
load_dotenv(find_dotenv())
|
|
||||||
os.environ["DAGSTER_HOME"] = str(Path(__file__).parent.parent.parent)
|
|
||||||
|
|
||||||
for k, v in os.environ.items():
|
|
||||||
if k.startswith("POSTGRES_"):
|
|
||||||
os.environ[f"DAGSTER_{k}"] = v
|
|
||||||
|
|
||||||
os.environ["DAGSTER_POSTGRES_HOST"] = "localhost"
|
|
||||||
instance = DagsterInstance.get()
|
|
||||||
|
|
||||||
# delete_partition(instance, partition_def_name, partition_key)
|
|
||||||
|
|
||||||
detect_previous_partition(instance, partition_def_name)
|
|
||||||
@@ -1,8 +0,0 @@
|
|||||||
#!/usr/bin/env bash
|
|
||||||
rsync -av /opt/dagster/src/app/vinyl/ \
|
|
||||||
/Volumes/dagster/src/app/vinyl/ \
|
|
||||||
--include='*.py' \
|
|
||||||
--include='*requirements.txt' \
|
|
||||||
--exclude='__pycache__/' \
|
|
||||||
-progress \
|
|
||||||
--delete $*
|
|
||||||
Reference in New Issue
Block a user