import jobs import numpy as np from partitions import ( latitude_partitions_def, location_partitions_def, longitude_partitions_def, ) from utils import coordinate_to_str, latitude_to_str, longitude_to_str import dagster as dg lat_range = np.arange(51.0, 53.01, 0.25, dtype=float) lon_range = np.arange(3.0, 7.01, 0.25, dtype=float) @dg.sensor(job=jobs.raw_weather_job) def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult: """Sensor that emits RunRequests for new 0.25-degree grid locations not yet seen as partitions.""" existing_latitudes = set( context.instance.get_dynamic_partitions(location_partitions_def.name) ) existing_longitudes = set( context.instance.get_dynamic_partitions(location_partitions_def.name) ) existing_coordinates = set( context.instance.get_dynamic_partitions(location_partitions_def.name) ) latitudes = [latitude_to_str(lat) for lat in lat_range] longitudes = [longitude_to_str(lon) for lon in lon_range] locations = [coordinate_to_str(lat, lon) for lat in lat_range for lon in lon_range] new_latitudes = [lat for lat in latitudes if lat not in existing_latitudes] new_longitudes = [lon for lon in longitudes if lon not in existing_longitudes] new_locations = [ location for location in locations if location not in existing_coordinates ] if new_locations: context.log.info(f"Discovered {len(new_locations)} new locations.") # Limit to 3 new locations selected = new_locations[:3] return dg.SensorResult( run_requests=[], # dg.RunRequest(partition_key=location) for location in locations], dynamic_partitions_requests=[ location_partitions_def.build_add_request(selected), latitude_partitions_def.build_add_request(new_latitudes), longitude_partitions_def.build_add_request(new_longitudes), ], ) @dg.sensor(job=jobs.raw_weather_batch_latitude_job) def list_latitudes(context: dg.SensorEvaluationContext) -> dg.SensorResult: existing_latitudes = set( context.instance.get_dynamic_partitions(location_partitions_def.name) ) latitudes = [latitude_to_str(lat) for lat in lat_range] new_latitudes = [lat for lat in latitudes if lat not in existing_latitudes] return dg.SensorResult( run_requests=[ dg.RunRequest(partition_key=partition_key) for partition_key in new_latitudes ], dynamic_partitions_requests=[ latitude_partitions_def.build_add_request(new_latitudes) ], ) @dg.sensor(job=jobs.raw_weather_batch_longitude_job) def list_longitudes(context: dg.SensorEvaluationContext) -> dg.SensorResult: existing_longitudes = set( context.instance.get_dynamic_partitions(location_partitions_def.name) ) longitudes = [longitude_to_str(lon) for lon in lon_range] new_longitudes = [lon for lon in longitudes if lon not in existing_longitudes] return dg.SensorResult( run_requests=[ dg.RunRequest(partition_key=partition_key) for partition_key in new_longitudes ], dynamic_partitions_requests=[ longitude_partitions_def.build_add_request(new_longitudes) ], ) @dg.sensor(job=jobs.raw_weather_batch_latitude_job, minimum_interval_seconds=60 * 60) def retrieve_weather(context: dg.SensorEvaluationContext) -> dg.SensorResult: """ Retrieve weather sensor function. This function monitors and retrieves weather data by evaluating the current dynamic partitions and triggering run requests for each key in the partitions. The function is executed as a sensor with a defined minimum interval. Args: - context (dg.SensorEvaluationContext): The context provided by the sensor framework, allowing access to the instance for retrieving dynamic partitions. Returns: The result of the sensor's evaluation, containing run requests for each existing partition key. """ existing_keys = set( context.instance.get_dynamic_partitions(latitude_partitions_def.name) ) return dg.SensorResult( run_requests=[ dg.RunRequest(partition_key=partition_key) for partition_key in existing_keys ] )