Skip to content

Commit 072dc2a

Browse files
Merge pull request #189 from digitalghost-dev/1.7.1
1.7.1
2 parents 91fc942 + 2338167 commit 072dc2a

26 files changed

+350
-117
lines changed

card_data/pipelines/defs/extract/extract_data.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,16 @@ def extract_series_data() -> pl.DataFrame:
4242
print(e)
4343
raise
4444

45-
filtered = [s.model_dump(mode="json") for s in validated if s.id in ["swsh", "sv"]]
45+
filtered = [s.model_dump(mode="json") for s in validated if s.id in ["swsh", "sv", "me"]]
4646
return pl.DataFrame(filtered)
4747

4848

49-
@dg.asset(kinds={"API", "Polars", "Pydantic"})
49+
@dg.asset(kinds={"API", "Polars", "Pydantic"}, name="extract_set_data")
5050
def extract_set_data() -> pl.DataFrame:
5151
url_list = [
5252
"https://api.tcgdex.net/v2/en/series/swsh",
53-
"https://api.tcgdex.net/v2/en/series/sv"
53+
"https://api.tcgdex.net/v2/en/series/sv",
54+
"https://api.tcgdex.net/v2/en/series/me",
5455
]
5556

5657
flat: list[dict] = []
@@ -86,11 +87,10 @@ def extract_set_data() -> pl.DataFrame:
8687
return pl.DataFrame([s.model_dump(mode="json") for s in validated])
8788

8889

89-
@dg.asset(kinds={"API"})
90+
@dg.asset(kinds={"API"}, name="extract_card_url_from_set_data")
9091
def extract_card_url_from_set() -> list:
9192
urls = [
92-
"https://api.tcgdex.net/v2/en/sets/sv01",
93-
"https://api.tcgdex.net/v2/en/sets/sv02",
93+
"https://api.tcgdex.net/v2/en/sets/swsh3"
9494
]
9595

9696
all_card_urls = [] # Initialize empty list to collect all URLs
@@ -113,7 +113,7 @@ def extract_card_url_from_set() -> list:
113113
return all_card_urls
114114

115115

116-
@dg.asset(deps=[extract_card_url_from_set], kinds={"API"})
116+
@dg.asset(deps=[extract_card_url_from_set], kinds={"API"}, name="extract_card_info")
117117
def extract_card_info() -> list:
118118
card_url_list = extract_card_url_from_set()
119119
cards_list = []
@@ -124,14 +124,15 @@ def extract_card_info() -> list:
124124
r.raise_for_status()
125125
data = r.json()
126126
cards_list.append(data)
127+
# print(f"Retrieved card: {data['id']} - {data.get('name', 'Unknown')}")
127128
time.sleep(0.1)
128129
except requests.RequestException as e:
129130
print(f"Failed to fetch {url}: {e}")
130131

131132
return cards_list
132133

133134

134-
@dg.asset(deps=[extract_card_info], kinds={"Polars"})
135+
@dg.asset(deps=[extract_card_info], kinds={"Polars"}, name="create_card_dataframe")
135136
def create_card_dataframe() -> pl.DataFrame:
136137
cards_list = extract_card_info()
137138

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
from typing import Optional
2+
3+
import dagster as dg
4+
import polars as pl
5+
import requests
6+
from pydantic import BaseModel, ValidationError
7+
from termcolor import colored
8+
9+
10+
# TCGplayer group IDs keyed by TCGdex set code; pull_product_information()
# iterates these to fetch product and price listings.
# NOTE(review): only two SV sets are priced here while the extract pipeline
# also pulls "swsh" and "me" series — confirm whether coverage should match.
SET_PRODUCT_MATCHING = {
    "sv01": "22873",
    "sv02": "23120",
}
14+
15+
16+
class CardPricing(BaseModel):
    """Validated pricing record for one card product.

    Field names mirror the dict assembled in pull_product_information():
    the TCGplayer product id, the cleaned card name, the printed card
    number, and the market price (None when the listing has no price).
    """

    product_id: int
    name: str
    card_number: str
    market_price: Optional[float] = None
21+
22+
23+
def is_card(item: dict) -> bool:
    """Return True when *item* has a "Number" entry in its extendedData.

    TCGplayer product listings mix single cards with sealed products;
    only card entries carry a printed collector number, so the presence
    of that field is used as the card filter.
    """
    for entry in item.get("extendedData", []):
        if entry.get("name") == "Number":
            return True
    return False
29+
30+
31+
def get_card_number(card: dict) -> Optional[str]:
    """Return the printed card number from extendedData, or None if absent.

    Takes the first extendedData entry named "Number", matching the
    filter used by is_card().
    """
    return next(
        (
            entry.get("value")
            for entry in card.get("extendedData", [])
            if entry.get("name") == "Number"
        ),
        None,
    )
37+
38+
39+
def extract_card_name(full_name: str) -> str:
    """Return the card name with TCGplayer variant info stripped.

    TCGplayer product names look like "Pikachu - 025/165" or
    "Iron Hands ex - 070/182 - Reverse Holo"; the card name is the part
    before the first " - " separator. Splitting on the spaced separator
    (rather than any "-") keeps hyphenated names such as "Ho-Oh" or
    "Porygon-Z" intact, which a bare partition("-") would truncate.

    Args:
        full_name: Raw product name from the TCGplayer listing.

    Returns:
        The cleaned card name, or *full_name* unchanged when no
        " - " separator is present.
    """
    name, sep, _variant = full_name.partition(" - ")
    return name.strip() if sep else full_name
42+
43+
44+
def pull_product_information(set_number: str) -> pl.DataFrame:
    """Pull product and pricing information for a given set number.

    Fetches the TCGplayer product list and price list for the set's group
    id, joins them on productId, keeps only card entries, and validates
    each row with Pydantic before building the DataFrame.

    Args:
        set_number: A key of SET_PRODUCT_MATCHING (e.g. "sv01").

    Returns:
        A Polars DataFrame with one validated CardPricing row per card.

    Raises:
        KeyError: If *set_number* is not in SET_PRODUCT_MATCHING.
        requests.HTTPError: If either endpoint returns an error status.
        ValidationError: If any assembled row fails CardPricing validation.
    """
    print(colored(" →", "blue"), f"Processing set: {set_number}")

    product_id = SET_PRODUCT_MATCHING[set_number]

    # Fetch product data. Check the status explicitly: without
    # raise_for_status() an HTML error page would flow into .json() and
    # surface as a confusing JSON decode error instead of an HTTPError.
    products_resp = requests.get(
        f"https://tcgcsv.com/tcgplayer/3/{product_id}/products", timeout=30
    )
    products_resp.raise_for_status()
    products_data = products_resp.json()

    # Fetch pricing data
    prices_resp = requests.get(
        f"https://tcgcsv.com/tcgplayer/3/{product_id}/prices", timeout=30
    )
    prices_resp.raise_for_status()
    prices_data = prices_resp.json()

    # productId -> market price lookup for the join below.
    price_dict = {
        price["productId"]: price.get("marketPrice")
        for price in prices_data.get("results", [])
    }

    # Keep only card products (sealed products have no "Number" field).
    cards_data = [
        {
            "product_id": card["productId"],
            "name": extract_card_name(card["name"]),
            "card_number": get_card_number(card),
            "market_price": price_dict.get(card["productId"]),
        }
        for card in products_data.get("results", [])
        if is_card(card)
    ]

    # Pydantic validation
    try:
        validated: list[CardPricing] = [CardPricing(**card) for card in cards_data]
        print(
            colored(" ✓", "green"),
            f"Pydantic validation passed for {len(validated)} cards.",
        )
    except ValidationError as e:
        print(colored(" ✖", "red"), "Pydantic validation failed.")
        print(e)
        raise

    df_data = [card.model_dump(mode="json") for card in validated]
    return pl.DataFrame(df_data)
91+
92+
93+
@dg.asset(kinds={"API", "Polars", "Pydantic"}, name="build_pricing_dataframe")
def build_dataframe() -> pl.DataFrame:
    """Build one pricing DataFrame covering every configured set.

    Pulls each set in SET_PRODUCT_MATCHING and vertically concatenates
    the per-set frames. Any empty pull aborts the run, because the
    downstream load is a drop+replace and would otherwise wipe data.
    """
    frames: list[pl.DataFrame] = []
    for set_code in SET_PRODUCT_MATCHING:
        frame = pull_product_information(set_code)

        # Refuse to continue with an empty frame — a drop+replace load
        # downstream would destroy the existing table contents.
        if frame is None or frame.shape[1] == 0 or frame.is_empty():
            error_msg = (
                f"Empty DataFrame returned for set '{set_code}'. "
                f"Cannot proceed with drop+replace operation to avoid data loss."
            )
            print(colored(" ✖", "red"), error_msg)
            raise ValueError(error_msg)

        frames.append(frame)

    combined = pl.concat(frames)
    print(combined)
    return combined

card_data/pipelines/defs/load/load_data.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import dagster as dg
2+
from dagster import RetryPolicy, Backoff
23
from sqlalchemy.exc import OperationalError
34
from ..extract.extract_data import (
45
extract_series_data,
@@ -11,7 +12,12 @@
1112
from pathlib import Path
1213

1314

14-
@dg.asset(deps=[extract_series_data], kinds={"Supabase", "Postgres"})
15+
@dg.asset(
16+
deps=[extract_series_data],
17+
kinds={"Supabase", "Postgres"},
18+
name="load_series_data",
19+
retry_policy=RetryPolicy(max_retries=3, delay=2, backoff=Backoff.EXPONENTIAL)
20+
)
1521
def load_series_data() -> None:
1622
database_url: str = fetch_secret()
1723
table_name: str = "staging.series"
@@ -23,12 +29,16 @@ def load_series_data() -> None:
2329
)
2430
print(colored(" ✓", "green"), f"Data loaded into {table_name}")
2531
except OperationalError as e:
26-
print(colored(" ✖", "red"), "Error:", e)
32+
print(colored(" ✖", "red"), "Connection error in load_series_data():", e)
33+
raise
2734

2835

29-
@dg.asset(deps=[load_series_data], kinds={"Soda"}, key_prefix=["staging"], name="series")
36+
@dg.asset(
37+
deps=[load_series_data],
38+
kinds={"Soda"},
39+
name="quality_checks_series"
40+
)
3041
def data_quality_check_on_series() -> None:
31-
# Set working directory to where this file is located
3242
current_file_dir = Path(__file__).parent
3343
print(f"Setting cwd to: {current_file_dir}")
3444

@@ -53,8 +63,16 @@ def data_quality_check_on_series() -> None:
5363
if result.stderr:
5464
print(result.stderr)
5565

66+
if result.returncode != 0:
67+
raise Exception(f"Soda data quality checks failed with return code {result.returncode}")
68+
5669

57-
@dg.asset(deps=[extract_set_data], kinds={"Supabase", "Postgres"}, key_prefix=["staging"], name="sets")
70+
@dg.asset(
71+
deps=[extract_set_data],
72+
kinds={"Supabase", "Postgres"},
73+
name="load_set_data",
74+
retry_policy=RetryPolicy(max_retries=3, delay=2, backoff=Backoff.EXPONENTIAL)
75+
)
5876
def load_set_data() -> None:
5977
database_url: str = fetch_secret()
6078
table_name: str = "staging.sets"
@@ -66,10 +84,16 @@ def load_set_data() -> None:
6684
)
6785
print(colored(" ✓", "green"), f"Data loaded into {table_name}")
6886
except OperationalError as e:
69-
print(colored(" ✖", "red"), "Error:", e)
87+
print(colored(" ✖", "red"), "Connection error in load_set_data():", e)
88+
raise
7089

7190

72-
@dg.asset(deps=[create_card_dataframe], kinds={"Supabase", "Postgres"}, key_prefix=["staging"], name="cards")
91+
@dg.asset(
92+
deps=[create_card_dataframe],
93+
kinds={"Supabase", "Postgres"},
94+
name="load_card_data",
95+
retry_policy=RetryPolicy(max_retries=3, delay=2, backoff=Backoff.EXPONENTIAL)
96+
)
7397
def load_card_data() -> None:
7498
database_url: str = fetch_secret()
7599
table_name: str = "staging.cards"
@@ -81,4 +105,5 @@ def load_card_data() -> None:
81105
)
82106
print(colored(" ✓", "green"), f"Data loaded into {table_name}")
83107
except OperationalError as e:
84-
print(colored(" ✖", "red"), "Error:", e)
108+
print(colored(" ✖", "red"), "Connection error in load_card_data():", e)
109+
raise
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import dagster as dg
2+
from dagster import RetryPolicy, Backoff
3+
from sqlalchemy.exc import OperationalError
4+
from ..extract.extract_pricing_data import build_dataframe
5+
from ...utils.secret_retriever import fetch_secret
6+
from termcolor import colored
7+
8+
9+
@dg.asset(
    deps=[build_dataframe],
    kinds={"Supabase", "Postgres"},
    # Explicit name for consistency with the other load assets; it also
    # pins the asset key that the dbt source mapping ("pricing_data" ->
    # "load_pricing_data") relies on, even if the function is renamed.
    name="load_pricing_data",
    retry_policy=RetryPolicy(max_retries=3, delay=2, backoff=Backoff.EXPONENTIAL),
)
def load_pricing_data() -> None:
    """Load the pricing DataFrame into the staging.pricing_data table.

    Mirrors the other load assets: fetches the database URL from the
    secret store, drop+replaces the staging table, and re-raises on
    connection failure so the Dagster retry policy can take over.
    """
    database_url: str = fetch_secret()
    table_name: str = "staging.pricing_data"

    df = build_dataframe()
    try:
        df.write_database(
            table_name=table_name, connection=database_url, if_table_exists="replace"
        )
        print(colored(" ✓", "green"), f"Data loaded into {table_name}")
    except OperationalError as e:
        print(colored(" ✖", "red"), "Connection error in load_pricing_data():", e)
        raise

card_data/pipelines/defs/transformation/transform_data.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,33 @@
11
import dagster as dg
2-
from dagster_dbt import DbtCliResource, dbt_assets
2+
from dagster_dbt import DbtCliResource, DagsterDbtTranslator, dbt_assets
33
from pathlib import Path
44

55
DBT_PROJECT_PATH = Path(__file__).joinpath("..", "..", "..", "poke_cli_dbt").resolve()
66

7-
@dbt_assets(manifest=DBT_PROJECT_PATH / "target" / "manifest.json")
7+
class CustomDbtTranslator(DagsterDbtTranslator):
    """Translator that wires dbt staging sources to their loader assets."""

    # dbt source name -> name of the Dagster asset that materializes it.
    # NOTE(review): "series" maps to the quality-check asset rather than
    # load_series_data — presumably so dbt only runs after checks pass;
    # confirm this is intentional.
    _SOURCE_ASSET_NAMES = {
        "series": "quality_checks_series",
        "sets": "load_set_data",
        "cards": "load_card_data",
        "pricing_data": "load_pricing_data",
    }

    def get_asset_key(self, dbt_resource_props):
        """Return the Dagster AssetKey for a dbt resource."""
        if dbt_resource_props["resource_type"] == "source":
            mapped = self._SOURCE_ASSET_NAMES.get(dbt_resource_props["name"])
            if mapped is not None:
                return dg.AssetKey([mapped])

        # Models (and any unmapped source) keep the default dbt-derived key.
        return super().get_asset_key(dbt_resource_props)
26+
27+
@dbt_assets(
28+
manifest=DBT_PROJECT_PATH / "target" / "manifest.json",
29+
dagster_dbt_translator=CustomDbtTranslator()
30+
)
831
def poke_cli_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
932
"""
1033
dbt assets that transform staging data into final models.
Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,27 @@
11
{% macro create_relationships() %}
    {{ print("Dropping existing constraints...") }}

    {# Drop in reverse dependency order: FKs before the PKs they reference. #}
    {% do run_query("ALTER TABLE " ~ target.schema ~ ".cards DROP CONSTRAINT IF EXISTS fk_cards_sets") %}
    {% do run_query("ALTER TABLE " ~ target.schema ~ ".sets DROP CONSTRAINT IF EXISTS fk_sets_series") %}
    {% do run_query("ALTER TABLE " ~ target.schema ~ ".cards DROP CONSTRAINT IF EXISTS pk_cards") %}
    {% do run_query("ALTER TABLE " ~ target.schema ~ ".sets DROP CONSTRAINT IF EXISTS pk_sets") %}
    {% do run_query("ALTER TABLE " ~ target.schema ~ ".series DROP CONSTRAINT IF EXISTS pk_series") %}

    {{ print("Adding primary keys...") }}

    {# Primary keys first so the foreign keys below have targets. #}
    {% do run_query("ALTER TABLE " ~ target.schema ~ ".series ADD CONSTRAINT pk_series PRIMARY KEY (id)") %}
    {% do run_query("ALTER TABLE " ~ target.schema ~ ".sets ADD CONSTRAINT pk_sets PRIMARY KEY (set_id)") %}
    {% do run_query("ALTER TABLE " ~ target.schema ~ ".cards ADD CONSTRAINT pk_cards PRIMARY KEY (id)") %}

    {{ print("Adding foreign keys...") }}

    {# Foreign keys: sets -> series, cards -> sets. #}
    {% do run_query("ALTER TABLE " ~ target.schema ~ ".sets ADD CONSTRAINT fk_sets_series FOREIGN KEY (series_id) REFERENCES " ~ target.schema ~ ".series (id)") %}
    {% do run_query("ALTER TABLE " ~ target.schema ~ ".cards ADD CONSTRAINT fk_cards_sets FOREIGN KEY (set_id) REFERENCES " ~ target.schema ~ ".sets (set_id)") %}

    {{ print("Relationships created successfully") }}

    {# Return an empty string so no SQL text leaks into the calling context. #}
    {% do return('') %}
{% endmacro %}

card_data/pipelines/poke_cli_dbt/models/cards.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
post_hook="{{ enable_rls() }}"
44
) }}
55

6-
SELECT id, image, name, "localId", category, hp
6+
SELECT id, set_id, image, name, "localId", category, hp, "set_cardCount_official", set_name
77
FROM {{ source('staging', 'cards') }}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{{ config(
    materialized='table',
    post_hook="{{ enable_rls() }}"
) }}

-- Expose the validated card pricing rows from the staging load,
-- one row per TCGplayer card product.
SELECT product_id, name, card_number, market_price
FROM {{ source('staging', 'pricing_data') }}

card_data/pipelines/poke_cli_dbt/models/sources.yml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,16 @@ sources:
9292
- name: attack_2_effect
9393
description: "Second attack effect"
9494
- name: attack_2_cost
95-
description: "Second attack energy cost"
95+
description: "Second attack energy cost"
96+
97+
- name: pricing_data
98+
description: "Card pricing data"
99+
columns:
100+
- name: product_id
101+
description: "Product ID"
102+
- name: name
103+
description: "Card name"
104+
- name: card_number
105+
description: "Card number"
106+
- name: market_price
107+
description: "Market price"

0 commit comments

Comments
 (0)