feat: improve ingestion incremental strategy
This commit is contained in:
@@ -46,13 +46,13 @@ class FundaSearchConfig(Config):
|
|||||||
class FundaDetailsConfig(Config):
|
class FundaDetailsConfig(Config):
|
||||||
"""Config for listing details fetch."""
|
"""Config for listing details fetch."""
|
||||||
|
|
||||||
fetch_all: bool = True
|
fetch_all: bool = False
|
||||||
|
|
||||||
|
|
||||||
class FundaPriceHistoryConfig(Config):
|
class FundaPriceHistoryConfig(Config):
|
||||||
"""Config for price history fetch."""
|
"""Config for price history fetch."""
|
||||||
|
|
||||||
fetch_all: bool = True
|
fetch_all: bool = False
|
||||||
|
|
||||||
|
|
||||||
@asset(
|
@asset(
|
||||||
@@ -152,13 +152,28 @@ def funda_search_results(
|
|||||||
render_sql(_SQL_DIR, "dml/insert_search_results.sql", schema=_SCHEMA), rows
|
render_sql(_SQL_DIR, "dml/insert_search_results.sql", schema=_SCHEMA), rows
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Mark listings not seen in the last 7 days as inactive.
|
||||||
|
engine = postgres.get_engine()
|
||||||
|
with engine.begin() as conn:
|
||||||
|
result = conn.execute(
|
||||||
|
text(
|
||||||
|
f"UPDATE {_SCHEMA}.search_results"
|
||||||
|
f" SET is_active = FALSE"
|
||||||
|
f" WHERE last_seen_at < now() - INTERVAL '7 days'"
|
||||||
|
f" RETURNING global_id"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
newly_inactive = result.rowcount
|
||||||
|
|
||||||
context.log.info(
|
context.log.info(
|
||||||
f"Inserted {len(rows)} search results into {_SCHEMA}.search_results"
|
f"Inserted {len(rows)} search results into {_SCHEMA}.search_results"
|
||||||
|
f" ({newly_inactive} listings marked inactive)"
|
||||||
)
|
)
|
||||||
|
|
||||||
return MaterializeResult(
|
return MaterializeResult(
|
||||||
metadata={
|
metadata={
|
||||||
"count": len(rows),
|
"count": len(rows),
|
||||||
|
"newly_inactive": newly_inactive,
|
||||||
"location": MetadataValue.text(config.location),
|
"location": MetadataValue.text(config.location),
|
||||||
"offering_type": MetadataValue.text(config.offering_type),
|
"offering_type": MetadataValue.text(config.offering_type),
|
||||||
"preview": MetadataValue.md(
|
"preview": MetadataValue.md(
|
||||||
@@ -207,9 +222,19 @@ def funda_listing_details(
|
|||||||
)
|
)
|
||||||
|
|
||||||
with engine.connect() as conn:
|
with engine.connect() as conn:
|
||||||
result = conn.execute(
|
if config.fetch_all:
|
||||||
text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.search_results")
|
query = text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.search_results")
|
||||||
)
|
else:
|
||||||
|
query = text(
|
||||||
|
f"""
|
||||||
|
SELECT DISTINCT s.global_id
|
||||||
|
FROM {_SCHEMA}.search_results s
|
||||||
|
LEFT JOIN {_SCHEMA}.listing_details d ON s.global_id = d.global_id
|
||||||
|
WHERE s.is_active = TRUE
|
||||||
|
AND (d.global_id IS NULL OR d.is_stale = TRUE)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
result = conn.execute(query)
|
||||||
ids = [row[0] for row in result if row[0]]
|
ids = [row[0] for row in result if row[0]]
|
||||||
|
|
||||||
if not ids:
|
if not ids:
|
||||||
@@ -278,6 +303,21 @@ def funda_listing_details(
|
|||||||
render_sql(_SQL_DIR, "dml/insert_listing_details.sql", schema=_SCHEMA), rows
|
render_sql(_SQL_DIR, "dml/insert_listing_details.sql", schema=_SCHEMA), rows
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Mark details as stale where the parent search listing is no longer active.
|
||||||
|
with engine.begin() as conn:
|
||||||
|
conn.execute(
|
||||||
|
text(
|
||||||
|
f"""
|
||||||
|
UPDATE {_SCHEMA}.listing_details d
|
||||||
|
SET is_stale = TRUE
|
||||||
|
FROM {_SCHEMA}.search_results s
|
||||||
|
WHERE d.global_id = s.global_id
|
||||||
|
AND s.is_active = FALSE
|
||||||
|
AND d.is_stale = FALSE
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
context.log.info(
|
context.log.info(
|
||||||
f"Inserted {len(rows)} listing details ({errors} errors) into {_SCHEMA}.listing_details"
|
f"Inserted {len(rows)} listing details ({errors} errors) into {_SCHEMA}.listing_details"
|
||||||
)
|
)
|
||||||
@@ -332,9 +372,23 @@ def funda_price_history(
|
|||||||
)
|
)
|
||||||
|
|
||||||
with engine.connect() as conn:
|
with engine.connect() as conn:
|
||||||
result = conn.execute(
|
if config.fetch_all:
|
||||||
text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.listing_details")
|
query = text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.listing_details")
|
||||||
)
|
else:
|
||||||
|
query = text(
|
||||||
|
f"""
|
||||||
|
SELECT DISTINCT d.global_id
|
||||||
|
FROM {_SCHEMA}.listing_details d
|
||||||
|
JOIN {_SCHEMA}.search_results s ON d.global_id = s.global_id
|
||||||
|
WHERE s.is_active = TRUE
|
||||||
|
UNION
|
||||||
|
SELECT DISTINCT d.global_id
|
||||||
|
FROM {_SCHEMA}.listing_details d
|
||||||
|
LEFT JOIN {_SCHEMA}.price_history p ON d.global_id = p.global_id
|
||||||
|
WHERE p.global_id IS NULL
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
result = conn.execute(query)
|
||||||
ids = [row[0] for row in result if row[0]]
|
ids = [row[0] for row in result if row[0]]
|
||||||
|
|
||||||
if not ids:
|
if not ids:
|
||||||
|
|||||||
@@ -37,5 +37,7 @@ CREATE TABLE IF NOT EXISTS {{ schema }}.listing_details (
|
|||||||
saves INT,
|
saves INT,
|
||||||
raw_json JSONB,
|
raw_json JSONB,
|
||||||
ingested_at TIMESTAMPTZ DEFAULT now(),
|
ingested_at TIMESTAMPTZ DEFAULT now(),
|
||||||
|
last_fetched_at TIMESTAMPTZ DEFAULT now(),
|
||||||
|
is_stale BOOLEAN DEFAULT FALSE,
|
||||||
UNIQUE (global_id, status)
|
UNIQUE (global_id, status)
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -19,5 +19,7 @@ CREATE TABLE IF NOT EXISTS {{ schema }}.search_results (
|
|||||||
broker_name TEXT,
|
broker_name TEXT,
|
||||||
raw_json JSONB,
|
raw_json JSONB,
|
||||||
ingested_at TIMESTAMPTZ DEFAULT now(),
|
ingested_at TIMESTAMPTZ DEFAULT now(),
|
||||||
|
last_seen_at TIMESTAMPTZ DEFAULT now(),
|
||||||
|
is_active BOOLEAN DEFAULT TRUE,
|
||||||
UNIQUE (global_id)
|
UNIQUE (global_id)
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -56,4 +56,6 @@ ON CONFLICT (global_id, status) DO UPDATE SET
|
|||||||
views = excluded.views,
|
views = excluded.views,
|
||||||
saves = excluded.saves,
|
saves = excluded.saves,
|
||||||
raw_json = excluded.raw_json,
|
raw_json = excluded.raw_json,
|
||||||
ingested_at = now()
|
ingested_at = now(),
|
||||||
|
last_fetched_at = now(),
|
||||||
|
is_stale = FALSE
|
||||||
|
|||||||
@@ -29,4 +29,6 @@ ON CONFLICT (global_id) DO UPDATE SET
|
|||||||
broker_id = excluded.broker_id,
|
broker_id = excluded.broker_id,
|
||||||
broker_name = excluded.broker_name,
|
broker_name = excluded.broker_name,
|
||||||
raw_json = excluded.raw_json,
|
raw_json = excluded.raw_json,
|
||||||
ingested_at = now()
|
ingested_at = now(),
|
||||||
|
last_seen_at = now(),
|
||||||
|
is_active = TRUE
|
||||||
|
|||||||
Reference in New Issue
Block a user