Architecting the Intelligence Layer: How Modern AI Platforms Move From Guesswork to Verified Truth
Artificial intelligence is entering a new era—one defined not by model size or benchmark scores, but by trust, verification, and economic sustainability.
The companies that win the next decade won't be the ones with the flashiest demos. They'll be the ones who build intelligence layers that can withstand the messy reality of production systems, regulatory scrutiny, and user expectations.
This editorial explores what it actually takes to build such a system—not in theory, but in practice.
To keep it grounded, we'll follow a single real-world scenario: a global commerce marketplace struggling with conflicting product data across 12 million SKUs.
This is where the Intelligence Layer becomes more than an idea. It becomes survival.
Part One: The Problem
A Marketplace Drowning in "Probably Correct"
Mercury Commerce operates a massive multi-category B2B marketplace. Every day, thousands of sellers upload product data—often incomplete, inconsistent, or outright contradictory.
The symptoms showed up everywhere:
A procurement manager at a Fortune 500 company ordered 2,000 units of "USB-C compatible" docking stations for a corporate laptop refresh. The docking stations arrived. They were USB-A only. The LLM powering Mercury's enrichment pipeline had inferred "USB-C compatible" from the phrase "works with modern laptops."
That invoice dispute cost Mercury $847,000 and a customer relationship that had taken six years to build.
The support team was drowning in tickets:
- "This laptop didn't have the advertised GPU."
- "Two identical products show different specs."
- "Search results for '4K 144Hz monitor' are all over the place."
Mercury tried the obvious fix: plug in a large language model to "clean" the data.
It helped... until it didn't.
The LLM confidently hallucinated:
- "Waterproof" where the product was only splash-resistant
- "Compatible with M2" where it wasn't
- "Stainless steel" where the listing only said "stainless steel color"
This is the moment Mercury realized:
Probabilistic AI is not enough. They needed verified truth.
Part Two: The Architecture
Building the Intelligence Layer
The Intelligence Layer is built on two foundations:
- A Commerce Knowledge Graph (CKG)
- A Neuro-Symbolic Intelligence Engine
Together, they transform messy seller feeds into verified product truth.
2.1 The Commerce Knowledge Graph
The first architectural decision was deceptively simple: stop treating product data as text blobs.
Mercury's legacy system stored product information as a JSON document per listing. When sellers uploaded conflicting data, the system just... kept all of it. Search indexed everything. Customers saw whatever the ranking algorithm surfaced first.
The Commerce Knowledge Graph inverted this model. Instead of documents, they modeled:
- Entities: The actual things (products, brands, categories, sellers)
- Claims: What various sources assert about those entities
- Provenance: Where each claim came from and how trustworthy that source is
- Verified Attributes: The resolved truth after adjudication
Here's what this looks like in practice:
from dataclasses import dataclass, field
from typing import Any, Literal, Optional
from datetime import datetime
from enum import Enum
import hashlib
class SourceTier(Enum):
"""
Trust hierarchy for data sources.
This took 6 months to calibrate correctly.
"""
OEM = "oem" # Manufacturer feeds - highest trust
AUTHORIZED = "authorized" # Authorized resellers
TRUSTED_SELLER = "trusted" # Sellers with >98% accuracy history
STANDARD_SELLER = "standard" # Regular marketplace sellers
UNKNOWN = "unknown" # New or unverified sources
class ExtractionMethod(Enum):
"""How was this data obtained?"""
DIRECT_FEED = "direct_feed" # Structured data from source
NEURAL = "neural" # LLM/ML extraction
RULE_DERIVED = "rule_derived" # Inferred from symbolic rules
HUMAN_VERIFIED = "human" # Manual verification
CROSS_REFERENCE = "cross_ref" # Validated against external source
@dataclass
class Claim:
"""
A single assertion about a product attribute from a specific source.
This is the atomic unit of the knowledge graph.
Everything flows from claims.
"""
attribute_name: str
value: Any
source_id: str
source_tier: SourceTier
extraction_method: ExtractionMethod
raw_text: Optional[str] # Original text this was extracted from
confidence: float # 0.0 to 1.0
timestamp: datetime
claim_id: str = field(default_factory=lambda: "")
def __post_init__(self):
# Generate deterministic claim ID for deduplication
content = f"{self.attribute_name}:{self.value}:{self.source_id}"
self.claim_id = hashlib.sha256(content.encode()).hexdigest()[:16]
@dataclass
class VerifiedAttribute:
"""
The resolved truth for a single attribute after claim adjudication.
This is what gets shown to customers.
"""
attribute_name: str
value: Any
truth_score: float # Final confidence after resolution
winning_claim_id: str # Which claim won
supporting_claims: list[str] # Claim IDs that agreed
conflicting_claims: list[str] # Claim IDs that disagreed
resolution_method: str # How conflict was resolved
last_verified: datetime
needs_review: bool = False # Flag for human-in-the-loop
@dataclass
class ProductEntity:
"""
A canonical product in the knowledge graph.
Instead of 847 conflicting listings for "AirPods Pro",
we have ONE entity with multiple claims and resolved truth.
"""
canonical_id: str
product_type: str
# All claims from all sources - the messy reality
claims: dict[str, list[Claim]] = field(default_factory=dict)
# Resolved truth - what we actually show to users
verified_attributes: dict[str, VerifiedAttribute] = field(default_factory=dict)
# Linking
seller_listings: list[str] = field(default_factory=list)
category_path: list[str] = field(default_factory=list)
# Metadata
created_at: datetime = field(default_factory=datetime.now)
last_updated: datetime = field(default_factory=datetime.now)
verification_status: Literal["unverified", "partial", "verified"] = "unverified"
def add_claim(self, claim: Claim) -> None:
"""Add a new claim, maintaining history."""
if claim.attribute_name not in self.claims:
self.claims[claim.attribute_name] = []
existing_ids = [c.claim_id for c in self.claims[claim.attribute_name]]
if claim.claim_id not in existing_ids:
self.claims[claim.attribute_name].append(claim)
self.last_updated = datetime.now()
def get_claim_consensus(self, attribute_name: str) -> dict:
"""Analyze agreement across claims for an attribute."""
if attribute_name not in self.claims:
return {"status": "no_claims", "values": []}
claims = self.claims[attribute_name]
value_counts = {}
value_weights = {}
for claim in claims:
val = str(claim.value)
weight = claim.confidence * self._tier_weight(claim.source_tier)
if val not in value_counts:
value_counts[val] = 0
value_weights[val] = 0.0
value_counts[val] += 1
value_weights[val] += weight
return {
"status": "consensus" if len(value_counts) == 1 else "conflict",
"values": value_counts,
"weighted_scores": value_weights,
"total_claims": len(claims)
}
def _tier_weight(self, tier: SourceTier) -> float:
weights = {
SourceTier.OEM: 1.0,
SourceTier.AUTHORIZED: 0.9,
SourceTier.TRUSTED_SELLER: 0.7,
SourceTier.STANDARD_SELLER: 0.4,
SourceTier.UNKNOWN: 0.2
}
return weights.get(tier, 0.2)

The "One Product, Many Listings" Problem
Here's a real scenario Mercury faced. Search their catalog for "AirPods Pro" and you'd find:
- 847 active listings
- 14 distinct claim sets for basic attributes like "battery life"
- 6 different values for "noise cancellation type" (Active, ANC, Yes, True, Adaptive, "Apple ANC")
The old system showed users a random sampling of this chaos. The CKG consolidates it:
def consolidate_listings_to_entity(
listings: list[dict],
entity_type: str
) -> ProductEntity:
"""
Transform multiple seller listings into a single canonical entity.
This is the core 'many-to-one' resolution that makes the CKG work.
"""
# Step 1: Generate canonical ID from invariant attributes
canonical_id = generate_canonical_id(listings, entity_type)
entity = ProductEntity(
canonical_id=canonical_id,
product_type=entity_type
)
# Step 2: Extract claims from each listing
for listing in listings:
source_tier = assess_source_tier(listing["seller_id"])
for attr_name, attr_value in listing["attributes"].items():
claim = Claim(
attribute_name=normalize_attribute_name(attr_name),
value=normalize_attribute_value(attr_name, attr_value),
source_id=listing["listing_id"],
source_tier=source_tier,
extraction_method=ExtractionMethod.DIRECT_FEED,
raw_text=listing.get("raw_description"),
confidence=0.8 if source_tier == SourceTier.OEM else 0.6,
timestamp=datetime.now()
)
entity.add_claim(claim)
entity.seller_listings.append(listing["listing_id"])
return entity
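# --- Illustrative sketches (assumptions, not Mercury's production logic) ---
# consolidate_listings_to_entity() above calls these two helpers; minimal
# versions are included here so the listing is self-contained.
def generate_canonical_id(listings: list[dict], entity_type: str) -> str:
    """Derive a stable ID from invariant attributes (brand + model here)."""
    import hashlib
    first = listings[0].get("attributes", {}) if listings else {}
    basis = f"{entity_type}:{first.get('brand', '')}:{first.get('model', '')}".lower()
    return hashlib.sha256(basis.encode()).hexdigest()[:20]
def assess_source_tier(seller_id: str) -> SourceTier:
    """Map a seller to a trust tier; unknown sellers get the lowest tier."""
    # In production this would query the seller reputation service.
    known_tiers: dict[str, SourceTier] = {}
    return known_tiers.get(seller_id, SourceTier.UNKNOWN)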
def normalize_attribute_name(raw_name: str) -> str:
"""
Map the chaos of seller attribute names to canonical names.
This mapping table grew to 12,000 entries over 18 months.
Mercury's team thought this would be a "one-time setup task."
They budgeted two weeks. They were very wrong.
"""
canonical_map = {
# Noise cancellation variants
"noise cancellation": "noise_cancelling",
"noise cancelling": "noise_cancelling",
"anc": "noise_cancelling",
"active noise cancellation": "noise_cancelling",
"nc": "noise_cancelling",
"noise_cancel": "noise_cancelling",
# Battery variants
"battery life": "battery_life_hours",
"battery": "battery_life_hours",
"playtime": "battery_life_hours",
"battery hours": "battery_life_hours",
"listening time": "battery_life_hours",
# ... thousands more
}
normalized = raw_name.lower().strip()
return canonical_map.get(normalized, normalized)
def normalize_attribute_value(attr_name: str, raw_value: Any) -> Any:
"""
Normalize values to canonical representations.
'30 hours', '30h', '30 hrs', 'up to 30 hours' all become: 30
"""
if attr_name in ["battery_life_hours", "playtime"]:
return extract_numeric_hours(raw_value)
if attr_name == "noise_cancelling":
return normalize_boolean(raw_value)
if attr_name == "bluetooth_version":
return extract_bluetooth_version(raw_value)
return raw_value
def extract_numeric_hours(value: Any) -> Optional[float]:
"""Extract numeric hour values from messy text."""
import re
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
patterns = [
r"(\d+(?:\.\d+)?)\s*(?:hours?|hrs?|h)\b",
r"up to (\d+(?:\.\d+)?)",
r"^(\d+(?:\.\d+)?)$"
]
for pattern in patterns:
match = re.search(pattern, value.lower())
if match:
return float(match.group(1))
return None
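# Illustrative sketch (assumption): normalize_attribute_value() calls
# extract_bluetooth_version(); a minimal version is shown for completeness.
def extract_bluetooth_version(value: Any) -> Optional[str]:
    """Pull a Bluetooth version like '5.0' or '5.3' out of messy text."""
    import re
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, str):
        match = re.search(r"(\d\.\d)", value)
        if match:
            return match.group(1)
    return None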
def normalize_boolean(value: Any) -> Optional[bool]:
"""
The many ways sellers say 'yes' and 'no'.
We found 47 distinct representations of boolean values in production.
"""
if isinstance(value, bool):
return value
if isinstance(value, str):
truthy = {"yes", "true", "1", "active", "anc", "enabled", "on", "y"}
falsy = {"no", "false", "0", "none", "disabled", "off", "n", "passive"}
normalized = value.lower().strip()
if normalized in truthy:
return True
if normalized in falsy:
return False
return None

2.2 The Neuro-Symbolic Intelligence Engine
The Knowledge Graph gives you structure. But structure without intelligence is just a very organized mess.
Mercury needed a system that could:
- Extract attributes from unstructured text (neural)
- Validate those attributes against known constraints (symbolic)
- Resolve conflicts when sources disagreed (hybrid)
- Learn from corrections without catastrophic forgetting (adaptive)
They built the Neuro-Symbolic Intelligence Engine as a three-layer pipeline.
Layer 1: Neural Extraction
The first layer uses transformer models to extract candidate attributes from raw listing data.
from dataclasses import dataclass
from typing import Any, Optional
import json
@dataclass
class NeuralExtraction:
"""Output from the neural extraction layer."""
attribute_name: str
extracted_value: Any
confidence: float
source_span: Optional[tuple[int, int]] # Character positions in source text
extraction_reasoning: Optional[str] # Chain-of-thought for debugging
class NeuralExtractor:
"""
Production neural extraction with cost controls.
Key insight: We use different models for different confidence requirements.
- High-value attributes (price, compatibility): Large model, high cost
- Standard attributes (color, weight): Distilled model, low cost
- Validation checks: Embedding similarity, near-zero cost
This single decision reduced extraction costs by 67%.
"""
def __init__(self, config: dict):
self.large_model = config["large_model"] # GPT-4 / Claude Opus class
self.small_model = config["small_model"] # Distilled/fine-tuned
self.embedding_model = config["embedding"] # For similarity
# Attribute routing rules
self.high_value_attributes = {
"compatibility", "voltage", "wattage", "certification",
"warranty_terms", "safety_rating", "medical_grade"
}
# Cost tracking
self.extraction_costs = []
def extract_attributes(
self,
listing: dict,
product_type: str
) -> list[NeuralExtraction]:
"""
Extract all relevant attributes from a listing.
Routes to appropriate model based on attribute importance.
"""
schema = self._get_product_schema(product_type)
extractions = []
# Batch low-value attributes for efficiency
low_value_attrs = [
attr for attr in schema["attributes"]
if attr not in self.high_value_attributes
]
if low_value_attrs:
batch_results = self._batch_extract_small_model(
listing, low_value_attrs
)
extractions.extend(batch_results)
# Process high-value attributes individually with large model
high_value_attrs = [
attr for attr in schema["attributes"]
if attr in self.high_value_attributes
]
for attr in high_value_attrs:
result = self._extract_single_large_model(listing, attr)
if result:
extractions.append(result)
return extractions
def _batch_extract_small_model(
self,
listing: dict,
attributes: list[str]
) -> list[NeuralExtraction]:
"""
Extract multiple attributes in a single call using distilled model.
Cost: ~$0.0002 per listing
"""
prompt = f"""
Extract these attributes from the product listing: {', '.join(attributes)}
Title: {listing.get('title', '')}
Description: {listing.get('description', '')[:1000]}
Return JSON array with objects containing:
- attribute: name
- value: extracted value or null
- confidence: 0.0-1.0
Be literal. Do not infer. If not explicitly stated, use null.
"""
response = self.small_model.complete(
prompt=prompt,
max_tokens=500,
temperature=0.1
)
self.extraction_costs.append({
"model": "small",
"tokens": response.usage.total_tokens,
"cost": response.usage.total_tokens * 0.0000004
})
return self._parse_batch_response(response.text, attributes)
def _extract_single_large_model(
self,
listing: dict,
attribute: str
) -> Optional[NeuralExtraction]:
"""
Extract high-stakes attribute with chain-of-thought reasoning.
Cost: ~$0.008 per attribute
We only use this for attributes where being wrong has real consequences.
"""
prompt = f"""
Extract the {attribute} from this product listing.
LISTING:
Title: {listing.get('title', '')}
Description: {listing.get('description', '')}
Specifications: {json.dumps(listing.get('specs', {}), indent=2)}
INSTRUCTIONS:
1. First, identify all text spans that mention {attribute}
2. Analyze each mention for reliability (explicit statement vs inference)
3. If multiple values exist, note the conflict
4. Provide your extraction with confidence score
CRITICAL: Do not infer. Only extract what is explicitly stated.
"Works with modern laptops" does NOT mean "USB-C compatible".
Respond in JSON format:
{{
"mentions": [
{{"text": "...", "type": "explicit|inferred", "value": "..."}}
],
"conflicts": true|false,
"final_value": "...",
"confidence": 0.0-1.0,
"reasoning": "..."
}}
"""
response = self.large_model.complete(
prompt=prompt,
max_tokens=800,
temperature=0.2
)
self.extraction_costs.append({
"model": "large",
"tokens": response.usage.total_tokens,
"cost": response.usage.total_tokens * 0.00003
})
parsed = json.loads(response.text)
return NeuralExtraction(
attribute_name=attribute,
extracted_value=parsed["final_value"],
confidence=parsed["confidence"],
source_span=None,
extraction_reasoning=parsed["reasoning"]
)
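    def _parse_batch_response(
        self,
        response_text: str,
        attributes: list[str]
    ) -> list[NeuralExtraction]:
        """
        Illustrative sketch (assumption): parse the small model's JSON array
        into NeuralExtraction objects, skipping nulls and unknown attributes.
        """
        try:
            items = json.loads(response_text)
        except json.JSONDecodeError:
            return []
        extractions = []
        for item in items:
            name = item.get("attribute")
            if name not in attributes or item.get("value") is None:
                continue
            extractions.append(NeuralExtraction(
                attribute_name=name,
                extracted_value=item["value"],
                confidence=float(item.get("confidence", 0.5)),
                source_span=None,
                extraction_reasoning=None
            ))
        return extractions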
def _get_product_schema(self, product_type: str) -> dict:
"""Get expected attributes for a product type."""
schemas = {
"headphones": {
"attributes": [
"brand", "model", "noise_cancelling", "battery_life_hours",
"bluetooth_version", "driver_size_mm", "frequency_response",
"impedance_ohms", "weight_grams", "foldable", "wired_option",
"microphone", "multipoint_connection", "codec_support"
]
},
"laptop": {
"attributes": [
"brand", "model", "processor", "ram_gb", "storage_gb",
"storage_type", "display_size_inches", "resolution",
"refresh_rate_hz", "gpu", "battery_wh", "weight_kg",
"ports", "wifi_version", "bluetooth_version", "os"
]
},
"monitor": {
"attributes": [
"brand", "model", "size_inches", "resolution", "panel_type",
"refresh_rate_hz", "response_time_ms", "hdr", "adaptive_sync",
"ports", "speakers", "vesa_mount", "height_adjustable"
]
}
}
return schemas.get(product_type, {"attributes": []})

Layer 2: Symbolic Validation
Neural extraction gives you candidates. Symbolic rules validate them.
This is where Mercury learned their most important lesson: rules are not optional guardrails—they're the source of truth.
from dataclasses import dataclass
from typing import Callable, Any, Optional
from enum import Enum
class RuleSeverity(Enum):
CRITICAL = "critical" # Must pass or reject entirely
WARNING = "warning" # Flag for review but don't reject
INFO = "info" # Log for analytics only
@dataclass
class RuleViolation:
rule_id: str
rule_name: str
severity: RuleSeverity
expected: Any
actual: Any
message: str
auto_correctable: bool = False
suggested_correction: Any = None
@dataclass
class ValidationResult:
passed: bool
violations: list[RuleViolation]
corrections_applied: list[dict]
confidence_adjustment: float
class SymbolicRuleEngine:
"""
The symbolic layer that catches what neural extraction misses.
Design principle: Rules are written by domain experts, not ML engineers.
We built a DSL that product managers could actually use.
These rules prevented $2.3M in potential disputes in the first year.
"""
def __init__(self):
self.rules: dict[str, list[Callable]] = {}
self.rule_metadata: dict[str, dict] = {}
self._register_core_rules()
def validate(
self,
entity: ProductEntity,
extractions: list[NeuralExtraction]
) -> ValidationResult:
"""Validate all extractions against applicable rules."""
violations = []
corrections = []
extracted = {e.attribute_name: e for e in extractions}
# Get rules for this product type
# Copy so the registered rule lists aren't mutated by extend()
applicable_rules = list(self.rules.get(entity.product_type, []))
applicable_rules.extend(self.rules.get("universal", []))
for rule_fn in applicable_rules:
result = rule_fn(entity, extracted)
if result:
violations.append(result)
if result.auto_correctable and result.suggested_correction:
corrections.append({
"attribute": result.rule_id.split("_")[0],
"old_value": result.actual,
"new_value": result.suggested_correction,
"reason": result.rule_name
})
confidence_hit = sum(
0.3 if v.severity == RuleSeverity.CRITICAL else 0.1
for v in violations
)
return ValidationResult(
passed=not any(v.severity == RuleSeverity.CRITICAL for v in violations),
violations=violations,
corrections_applied=corrections,
confidence_adjustment=min(confidence_hit, 0.8)
)
def _register_core_rules(self):
"""Register the core validation rules."""
self.rules["universal"] = [
self._rule_price_sanity,
self._rule_brand_model_consistency,
self._rule_no_placeholder_values,
]
self.rules["headphones"] = [
self._rule_xm5_anc_required,
self._rule_xm5_battery_range,
self._rule_airpods_model_sku_match,
self._rule_bluetooth_version_valid,
]
self.rules["monitor"] = [
self._rule_4k_resolution_check,
self._rule_refresh_rate_panel_match,
self._rule_hdr_brightness_requirement,
]
def _rule_price_sanity(
self,
entity: ProductEntity,
extracted: dict
) -> Optional[RuleViolation]:
"""Catch obviously wrong prices."""
price = extracted.get("price")
if not price:
return None
price_val = price.extracted_value
bounds = {
"headphones": (10, 2000),
"laptop": (200, 10000),
"monitor": (80, 5000),
}
min_price, max_price = bounds.get(entity.product_type, (1, 100000))
if price_val < min_price or price_val > max_price:
return RuleViolation(
rule_id="price_sanity",
rule_name="Price Sanity Check",
severity=RuleSeverity.CRITICAL,
expected=f"${min_price}-${max_price}",
actual=f"${price_val}",
message=f"Price ${price_val} outside expected range for {entity.product_type}"
)
return None
def _rule_no_placeholder_values(
self,
entity: ProductEntity,
extracted: dict
) -> Optional[RuleViolation]:
"""Catch placeholder text that neural extraction might miss."""
placeholders = {
"tbd", "tba", "n/a", "coming soon", "placeholder",
"xxx", "000", "insert here", "[blank]", "default"
}
for attr_name, extraction in extracted.items():
val = str(extraction.extracted_value).lower().strip()
if val in placeholders:
return RuleViolation(
rule_id=f"{attr_name}_placeholder",
rule_name="Placeholder Detection",
severity=RuleSeverity.WARNING,
expected="Real value",
actual=extraction.extracted_value,
message=f"Detected placeholder in {attr_name}",
auto_correctable=True,
suggested_correction=None
)
return None
def _rule_xm5_anc_required(
self,
entity: ProductEntity,
extracted: dict
) -> Optional[RuleViolation]:
"""Sony XM5 headphones MUST have active noise cancellation."""
model = extracted.get("model")
anc = extracted.get("noise_cancelling")
if not model or "xm5" not in str(model.extracted_value).lower():
return None
if not anc or anc.extracted_value != True:
return RuleViolation(
rule_id="xm5_anc_required",
rule_name="XM5 ANC Requirement",
severity=RuleSeverity.CRITICAL,
expected=True,
actual=anc.extracted_value if anc else None,
message="Sony WH-1000XM5 must have active noise cancellation",
auto_correctable=True,
suggested_correction=True
)
return None
def _rule_xm5_battery_range(
self,
entity: ProductEntity,
extracted: dict
) -> Optional[RuleViolation]:
"""XM5 battery life should be ~30 hours (±2)."""
model = extracted.get("model")
battery = extracted.get("battery_life_hours")
if not model or "xm5" not in str(model.extracted_value).lower():
return None
if not battery:
return RuleViolation(
rule_id="xm5_battery_missing",
rule_name="XM5 Battery Missing",
severity=RuleSeverity.WARNING,
expected="28-32 hours",
actual=None,
message="Sony XM5 should have battery life specified"
)
battery_val = battery.extracted_value
if not (28 <= battery_val <= 32):
return RuleViolation(
rule_id="xm5_battery_range",
rule_name="XM5 Battery Range",
severity=RuleSeverity.WARNING,
expected="28-32 hours",
actual=f"{battery_val} hours",
message=f"Battery life {battery_val}h outside expected range for XM5"
)
return None
def _rule_airpods_model_sku_match(
self,
entity: ProductEntity,
extracted: dict
) -> Optional[RuleViolation]:
"""
AirPods SKU must match claimed generation.
This rule alone caught 8% of AirPods listing fraud.
"""
sku = extracted.get("sku")
model = extracted.get("model")
if not sku or not model:
return None
sku_val = str(sku.extracted_value).upper()
model_val = str(model.extracted_value).lower()
sku_model_map = {
"MQD83": "airpods pro 2",
"MTJV3": "airpods pro 2", # USB-C version
"MME73": "airpods 3",
"MV7N2": "airpods pro", # 1st gen
}
for sku_prefix, expected_model in sku_model_map.items():
if sku_val.startswith(sku_prefix):
if expected_model not in model_val:
return RuleViolation(
rule_id="airpods_sku_mismatch",
rule_name="AirPods SKU/Model Mismatch",
severity=RuleSeverity.CRITICAL,
expected=expected_model,
actual=model_val,
message=f"SKU {sku_val} indicates {expected_model}, but listing claims {model_val}"
)
return None
def _rule_4k_resolution_check(
self,
entity: ProductEntity,
extracted: dict
) -> Optional[RuleViolation]:
"""
If listing claims '4K', resolution must be at least 3840x2160.
This rule alone caught 12% of monitor listing errors.
"""
title = extracted.get("title")
resolution = extracted.get("resolution")
if not title:
return None
title_val = str(title.extracted_value).lower()
claims_4k = "4k" in title_val or "uhd" in title_val
if not claims_4k:
return None
if not resolution:
return RuleViolation(
rule_id="4k_resolution_missing",
rule_name="4K Resolution Missing",
severity=RuleSeverity.WARNING,
expected="3840x2160 or higher",
actual=None,
message="Listing claims 4K but no resolution specified"
)
res_val = str(resolution.extracted_value)
import re
match = re.search(r"(\d{3,4})\s*[x×]\s*(\d{3,4})", res_val)
if match:
width, height = int(match.group(1)), int(match.group(2))
if width < 3840 or height < 2160:
return RuleViolation(
rule_id="4k_resolution_mismatch",
rule_name="4K Resolution Mismatch",
severity=RuleSeverity.CRITICAL,
expected="3840x2160 or higher",
actual=f"{width}x{height}",
message=f"Listing claims 4K but resolution is only {width}x{height}"
)
return None

Layer 3: Truth Resolution
When neural extraction produces a value and symbolic rules validate it, you have a candidate. When multiple sources produce different candidates, you need a resolution.
This is where most AI systems fail. They either pick randomly or always trust the newest data.
Mercury built a proper adjudication system:
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
import statistics
import math
@dataclass
class TruthResolution:
"""The output of truth resolution for a single attribute."""
attribute_name: str
resolved_value: Any
truth_score: float
resolution_method: str
winning_source: str
dissenting_sources: list[str]
needs_human_review: bool
class TruthResolver:
"""
Resolves conflicts between multiple claims about the same attribute.
Key insight: Truth isn't just about confidence—it's about provenance,
consistency, and recency, weighted appropriately.
The weighting took 4 months of A/B testing to calibrate.
"""
def __init__(self, config: dict):
self.weights = {
"source_tier": 0.35,
"claim_count": 0.20,
"neural_confidence": 0.25,
"recency": 0.10,
"rule_compliance": 0.10
}
self.verification_threshold = 0.85
self.review_threshold = 0.65
self.conflict_threshold = 0.30
def resolve(
self,
entity: ProductEntity,
attribute_name: str,
neural_extraction: Optional[NeuralExtraction],
rule_validation: ValidationResult
) -> TruthResolution:
"""Resolve truth for a single attribute."""
claims = entity.claims.get(attribute_name, [])
if not claims and not neural_extraction:
return TruthResolution(
attribute_name=attribute_name,
resolved_value=None,
truth_score=0.0,
resolution_method="no_data",
winning_source="none",
dissenting_sources=[],
needs_human_review=True
)
value_scores = self._score_candidate_values(
claims, neural_extraction, rule_validation
)
if not value_scores:
return TruthResolution(
attribute_name=attribute_name,
resolved_value=None,
truth_score=0.0,
resolution_method="scoring_failed",
winning_source="none",
dissenting_sources=[],
needs_human_review=True
)
sorted_values = sorted(
value_scores.items(),
key=lambda x: x[1]["total_score"],
reverse=True
)
winner = sorted_values[0]
winner_value = winner[0]
winner_data = winner[1]
needs_review = False
dissenting = []
if len(sorted_values) > 1:
runner_up = sorted_values[1]
score_gap = winner_data["total_score"] - runner_up[1]["total_score"]
if score_gap < self.conflict_threshold:
needs_review = True
dissenting = [str(runner_up[0])]
if winner_data.get("oem_sourced"):
method = "oem_authority"
elif winner_data["claim_count"] > 3 and winner_data["total_score"] > 0.8:
method = "consensus"
elif neural_extraction and winner_value == str(neural_extraction.extracted_value):
method = "neural_primary"
else:
method = "weighted_scoring"
return TruthResolution(
attribute_name=attribute_name,
resolved_value=winner_value,
truth_score=winner_data["total_score"],
resolution_method=method,
winning_source=winner_data.get("top_source", "unknown"),
dissenting_sources=dissenting,
needs_human_review=needs_review or winner_data["total_score"] < self.review_threshold
)
def _score_candidate_values(
self,
claims: list[Claim],
neural_extraction: Optional[NeuralExtraction],
rule_validation: ValidationResult
) -> dict:
"""Score each candidate value based on multiple factors."""
value_scores = {}
for claim in claims:
val = str(claim.value)
if val not in value_scores:
value_scores[val] = {
"source_tier_score": 0.0,
"claim_count": 0,
"confidence_scores": [],
"timestamps": [],
"sources": [],
"oem_sourced": False,
"top_source": None
}
scores = value_scores[val]
tier_score = self._tier_to_score(claim.source_tier)
scores["source_tier_score"] = max(scores["source_tier_score"], tier_score)
scores["claim_count"] += 1
scores["confidence_scores"].append(claim.confidence)
scores["timestamps"].append(claim.timestamp)
scores["sources"].append(claim.source_id)
if claim.source_tier == SourceTier.OEM:
scores["oem_sourced"] = True
if tier_score > 0.65:
scores["top_source"] = claim.source_id
if neural_extraction and neural_extraction.extracted_value:
val = str(neural_extraction.extracted_value)
if val not in value_scores:
value_scores[val] = {
"source_tier_score": 0.5,
"claim_count": 0,
"confidence_scores": [],
"timestamps": [],
"sources": [],
"oem_sourced": False,
"top_source": "neural_extraction"
}
value_scores[val]["confidence_scores"].append(neural_extraction.confidence)
for val, scores in value_scores.items():
tier_component = scores["source_tier_score"] * self.weights["source_tier"]
count_component = min(math.log(scores["claim_count"] + 1) / math.log(10), 1.0)
count_component *= self.weights["claim_count"]
if scores["confidence_scores"]:
conf_component = statistics.mean(scores["confidence_scores"])
else:
conf_component = 0.5
conf_component *= self.weights["neural_confidence"]
if scores["timestamps"]:
most_recent = max(scores["timestamps"])
days_old = (datetime.now() - most_recent).days
recency_component = max(0, 1 - (days_old / 365))
else:
recency_component = 0.5
recency_component *= self.weights["recency"]
rule_component = 1.0 if not rule_validation.violations else 0.5
rule_component *= self.weights["rule_compliance"]
oem_bonus = 0.15 if scores["oem_sourced"] else 0
scores["total_score"] = min(
tier_component + count_component + conf_component +
recency_component + rule_component + oem_bonus,
1.0
)
return value_scores
def _tier_to_score(self, tier: SourceTier) -> float:
return {
SourceTier.OEM: 1.0,
SourceTier.AUTHORIZED: 0.85,
SourceTier.TRUSTED_SELLER: 0.65,
SourceTier.STANDARD_SELLER: 0.40,
SourceTier.UNKNOWN: 0.20
}.get(tier, 0.20)

When the System Disagrees With Itself
Three months into production, Mercury hit an interesting failure mode.
The neural extractor confidently tagged a monitor as "4K" (0.94 confidence). The symbolic rule flagged it: the resolution field showed 2560×1440.
Who wins?
Their first instinct was "rules always win." But that created a different problem—the resolution field came from a seller who had copy-pasted from the wrong product page. The neural extractor had correctly read "4K UHD" from the product title and image text.
The real issue wasn't the conflict. It was that they had two unreliable signals fighting each other.
The solution was a conflict escalation protocol:
@dataclass
class ConflictResolution:
"""How to handle neural vs symbolic conflicts."""
resolution_type: str
action: str
confidence_adjustment: float
requires_human: bool
def resolve_neural_symbolic_conflict(
neural: NeuralExtraction,
rule_violation: RuleViolation,
provenance: SourceTier
) -> ConflictResolution:
"""
Decision tree for neural vs symbolic conflicts.
This was the hardest part of the system to get right.
We went through 6 iterations before landing on this logic.
"""
# Case 1: OEM data violates rule
# Usually means the rule is wrong or outdated
if provenance == SourceTier.OEM and rule_violation.severity != RuleSeverity.CRITICAL:
return ConflictResolution(
resolution_type="trust_oem",
action="accept_neural_flag_rule_review",
confidence_adjustment=-0.1,
requires_human=True # Review the rule, not the data
)
# Case 2: Critical rule violation from any source
# Safety first—reject and escalate
if rule_violation.severity == RuleSeverity.CRITICAL:
return ConflictResolution(
resolution_type="trust_rule",
action="reject_neural_escalate",
confidence_adjustment=-0.5,
requires_human=True
)
# Case 3: High-confidence neural, warning-level rule, unreliable source
# This is the "monitor 4K" case—neural is probably right
if (neural.confidence > 0.9 and
rule_violation.severity == RuleSeverity.WARNING and
provenance in [SourceTier.STANDARD_SELLER, SourceTier.UNKNOWN]):
return ConflictResolution(
resolution_type="trust_neural_qualified",
action="accept_neural_low_confidence",
confidence_adjustment=-0.3,
requires_human=False
)
# Case 4: Everything else
# When in doubt, require human review
return ConflictResolution(
resolution_type="uncertain",
action="quarantine_for_review",
confidence_adjustment=-0.4,
requires_human=True
)

2.3 The Complete Pipeline: End-to-End SKU Verification
Here's how the full neuro-symbolic pipeline processes a single SKU. Let's walk through a real example:
Sony WH-1000XM5 Wireless Noise-Canceling Headphones
Mercury ingests 9 conflicting listings for this product.
class NeuroSymbolicPipeline:
"""
The complete verification pipeline from raw listing to verified entity.
This processes 150,000 SKUs per day at Mercury.
"""
def __init__(self, config: dict):
self.extractor = NeuralExtractor(config["neural"])
self.rules = SymbolicRuleEngine()
self.resolver = TruthResolver(config["resolution"])
self.processed_count = 0
self.verification_stats = {
"fully_verified": 0,
"partially_verified": 0,
"failed_verification": 0,
"escalated_to_human": 0
}
def process_entity(
self,
entity: ProductEntity,
listings: list[dict]
) -> ProductEntity:
"""
Process a product entity through the full pipeline.
Steps:
1. Ingest all claims from all listings
2. Run neural extraction on combined data
3. Validate extractions against symbolic rules
4. Apply auto-corrections where rules allow
5. Resolve truth for each attribute
6. Update verification status
"""
self.processed_count += 1
# Step 1: Ingest claims
for listing in listings:
self._ingest_listing_claims(entity, listing)
# Step 2: Neural extraction
combined_text = self._combine_listing_data(listings)
extractions = self.extractor.extract_attributes(
combined_text,
entity.product_type
)
# Step 3: Symbolic validation
validation_result = self.rules.validate(entity, extractions)
# Step 4: Apply auto-corrections
for correction in validation_result.corrections_applied:
self._apply_correction(extractions, correction)
# Step 5: Resolve truth for each attribute
extraction_map = {e.attribute_name: e for e in extractions}
for attr_name in self._get_all_attribute_names(entity, extractions):
resolution = self.resolver.resolve(
entity,
attr_name,
extraction_map.get(attr_name),
validation_result
)
if resolution.truth_score >= self.resolver.verification_threshold:
entity.verified_attributes[attr_name] = VerifiedAttribute(
attribute_name=attr_name,
value=resolution.resolved_value,
truth_score=resolution.truth_score,
winning_claim_id=resolution.winning_source,
supporting_claims=[],
conflicting_claims=resolution.dissenting_sources,
resolution_method=resolution.resolution_method,
last_verified=datetime.now(),
needs_review=resolution.needs_human_review
)
# Step 6: Update status
entity.verification_status = self._calculate_verification_status(entity)
self._update_stats(entity)
return entity
def _ingest_listing_claims(self, entity: ProductEntity, listing: dict) -> None:
"""Convert listing attributes to claims."""
source_tier = self._assess_source_tier(listing["seller_id"])
for attr_name, attr_value in listing.get("attributes", {}).items():
claim = Claim(
attribute_name=normalize_attribute_name(attr_name),
value=normalize_attribute_value(attr_name, attr_value),
source_id=listing["listing_id"],
source_tier=source_tier,
extraction_method=ExtractionMethod.DIRECT_FEED,
raw_text=listing.get("raw_description"),
confidence=0.8 if source_tier == SourceTier.OEM else 0.6,
timestamp=datetime.now()
)
entity.add_claim(claim)
def _calculate_verification_status(self, entity: ProductEntity) -> str:
"""Determine overall verification status."""
if not entity.verified_attributes:
return "unverified"
schema = self.extractor._get_product_schema(entity.product_type)
required = set(schema.get("required_attributes", schema["attributes"][:5]))
verified = set(entity.verified_attributes.keys())
high_confidence = sum(
1 for attr in entity.verified_attributes.values()
if attr.truth_score > 0.9 and not attr.needs_review
)
if required.issubset(verified) and high_confidence >= len(required):
return "verified"
elif len(verified) >= len(required) * 0.6:
return "partial"
else:
return "unverified"
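    # --- Illustrative helper sketches (assumptions, kept minimal) ---
    def _combine_listing_data(self, listings: list[dict]) -> dict:
        """Merge all listings into one pseudo-listing for neural extraction."""
        return {
            "title": listings[0].get("title", "") if listings else "",
            "description": " ".join(lst.get("description", "") for lst in listings),
            "specs": {k: v for lst in listings for k, v in lst.get("specs", {}).items()}
        }
    def _apply_correction(self, extractions: list[NeuralExtraction], correction: dict) -> None:
        """Overwrite an extraction's value with the rule-suggested correction."""
        for extraction in extractions:
            if extraction.attribute_name == correction["attribute"]:
                extraction.extracted_value = correction["new_value"]
    def _get_all_attribute_names(
        self, entity: ProductEntity, extractions: list[NeuralExtraction]
    ) -> set[str]:
        """Union of attributes seen in claims and in neural extractions."""
        return set(entity.claims.keys()) | {e.attribute_name for e in extractions}
    def _assess_source_tier(self, seller_id: str) -> SourceTier:
        """Placeholder: a real implementation queries seller accuracy history."""
        return SourceTier.STANDARD_SELLER
    def _update_stats(self, entity: ProductEntity) -> None:
        """Increment the pipeline counters based on the final status."""
        if entity.verification_status == "verified":
            self.verification_stats["fully_verified"] += 1
        elif entity.verification_status == "partial":
            self.verification_stats["partially_verified"] += 1
        else:
            self.verification_stats["failed_verification"] += 1
        if any(a.needs_review for a in entity.verified_attributes.values()):
            self.verification_stats["escalated_to_human"] += 1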
def get_pipeline_report(self) -> dict:
"""Generate pipeline performance report."""
total = self.processed_count or 1
return {
"total_processed": self.processed_count,
"verification_rate": self.verification_stats["fully_verified"] / total,
"partial_rate": self.verification_stats["partially_verified"] / total,
"failure_rate": self.verification_stats["failed_verification"] / total,
"human_escalation_rate": self.verification_stats["escalated_to_human"] / total,
"neural_costs": sum(c["cost"] for c in self.extractor.extraction_costs),
"avg_cost_per_entity": (
sum(c["cost"] for c in self.extractor.extraction_costs) / total
)
}

The XM5 verification in action:
| Step | Input | Output |
|---|---|---|
| Ingest | 9 listings, 47 total claims | Claims organized by attribute |
| Neural Extract | Combined text | Brand: Sony, Model: XM5, ANC: Yes, Battery: 30h |
| Symbolic Validate | Extractions | PASS: All XM5 rules satisfied |
| Truth Resolve | 4 claims for battery_life | Winner: 30h (OEM source, 0.94 score) |
| Final Status | Verified attributes | VERIFIED (all required attributes above threshold) |
Part Three: The Economics
Engineering AI Unit Economics (FinOps)
AI is powerful. AI is expensive. AI at scale is brutally expensive.
Mercury learned this the hard way.
Their CFO didn't care about neural architectures. She cared about one number: $1.47 million.
That was the monthly AWS bill after they turned on "AI-powered product enrichment" across their full catalog. The feature worked beautifully. Search relevance improved 34%. Customer satisfaction scores jumped. The product team was celebrating.
Then Finance called.
"You've burned through your entire quarterly AI budget in six weeks. Explain."
This is the moment Mercury learned that AI without economics is just expensive demos.
3.1 The Anatomy of AI Costs
Before you can optimize, you need to understand where the money goes.
Mercury's initial pipeline had no cost instrumentation. They knew they were spending money. They didn't know where.
They built a cost attribution system:
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import statistics
from typing import Optional
class CostCategory(Enum):
INFERENCE = "inference"
COMPUTE = "compute"
STORAGE = "storage"
BANDWIDTH = "bandwidth"
HUMAN_REVIEW = "human_review"
@dataclass
class CostEvent:
"""A single cost-incurring event in the pipeline."""
event_id: str
category: CostCategory
model_name: Optional[str]
input_tokens: int
output_tokens: int
cost_usd: float
timestamp: datetime
product_id: Optional[str]
feature_name: str
triggered_by: str
contributed_to_conversion: Optional[bool] = None
user_satisfaction_score: Optional[float] = None
@dataclass
class CostBudget:
"""Budget constraints for a feature or pipeline."""
feature_name: str
monthly_budget_usd: float
current_spend_usd: float = 0.0
alert_threshold: float = 0.8
hard_limit: bool = False
def remaining(self) -> float:
return max(0, self.monthly_budget_usd - self.current_spend_usd)
def utilization(self) -> float:
return self.current_spend_usd / self.monthly_budget_usd
def is_exhausted(self) -> bool:
return self.hard_limit and self.utilization() >= 1.0
class CostTracker:
"""
Real-time cost tracking and attribution.
This became the most important operational tool in Mercury's AI stack.
"""
def __init__(self):
self.events: list[CostEvent] = []
self.budgets: dict[str, CostBudget] = {}
# Model pricing (per 1K tokens: input, output)
self.model_costs = {
"gpt-4-turbo": (0.01, 0.03),
"gpt-4o": (0.005, 0.015),
"gpt-4o-mini": (0.00015, 0.0006),
"claude-3-opus": (0.015, 0.075),
"claude-3-sonnet": (0.003, 0.015),
"claude-3-haiku": (0.00025, 0.00125),
"claude-sonnet-4": (0.003, 0.015),
"text-embedding-3-large": (0.00013, 0),
"text-embedding-3-small": (0.00002, 0),
}
def record_event(self, event: CostEvent) -> None:
"""Record a cost event and update budgets."""
self.events.append(event)
if event.feature_name in self.budgets:
self.budgets[event.feature_name].current_spend_usd += event.cost_usd
budget = self.budgets[event.feature_name]
if budget.utilization() >= budget.alert_threshold:
self._send_alert(budget)
def calculate_cost(
self,
model_name: str,
input_tokens: int,
output_tokens: int
) -> float:
"""Calculate cost for a model call."""
if model_name not in self.model_costs:
return (input_tokens + output_tokens) * 0.00005
input_rate, output_rate = self.model_costs[model_name]
return (input_tokens * input_rate / 1000) + (output_tokens * output_rate / 1000)
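    def _send_alert(self, budget: CostBudget) -> None:
        """
        Illustrative sketch (assumption): in production this pages the feature
        owner; here it just logs the budget utilization.
        """
        print(
            f"[cost-alert] {budget.feature_name}: "
            f"{budget.utilization():.0%} of ${budget.monthly_budget_usd:,.0f} spent"
        )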
def get_feature_economics(self, feature_name: str, days: int = 30) -> dict:
"""Calculate unit economics for a feature."""
cutoff = datetime.now() - timedelta(days=days)
feature_events = [
e for e in self.events
if e.feature_name == feature_name and e.timestamp > cutoff
]
if not feature_events:
return {"status": "no_data"}
total_cost = sum(e.cost_usd for e in feature_events)
total_calls = len(feature_events)
conversions = [
e for e in feature_events
if e.contributed_to_conversion == True
]
return {
"total_cost": total_cost,
"total_calls": total_calls,
"cost_per_call": total_cost / total_calls,
"conversion_rate": len(conversions) / total_calls if total_calls > 0 else 0,
"cost_per_conversion": total_cost / len(conversions) if conversions else None,
}

When they ran the numbers, here's what they found:
| Pipeline Stage | % of Total Cost | Cost per Product |
|---|---|---|
| Neural extraction (large model) | 67% | $0.0089 |
| Image validation | 18% | $0.0024 |
| Embedding generation | 8% | $0.0011 |
| Rule evaluation | 2% | $0.0003 |
| Truth resolution | 5% | $0.0007 |
The large model extraction was killing them. And it was running on every single product, including commodity items where perfect attribution didn't matter.
3.2 The Model Tiering Strategy
Mercury redesigned their pipeline around a simple insight: not all products deserve the same AI investment.
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class ProductTier(Enum):
PREMIUM = "premium" # High-margin, high-stakes
STANDARD = "standard" # Normal products
COMMODITY = "commodity" # Low-margin basics
class ModelTier(Enum):
LARGE = "large" # GPT-4 / Claude Opus class
MEDIUM = "medium" # GPT-4o / Claude Sonnet class
SMALL = "small" # GPT-4o-mini / Claude Haiku class
CACHED = "cached" # Precomputed results
RULES_ONLY = "rules" # No neural, symbolic only
@dataclass
class TieringDecision:
model_tier: ModelTier
reasoning: str
estimated_cost: float
fallback_tier: Optional[ModelTier] = None
class CostAwareRouter:
"""
Routes products to appropriate model tiers based on value and complexity.
This single component reduced Mercury's AI costs by 73%.
"""
def __init__(self, cost_tracker: CostTracker):
self.cost_tracker = cost_tracker
self.margin_thresholds = {
ProductTier.PREMIUM: 0.25,
ProductTier.STANDARD: 0.10,
ProductTier.COMMODITY: 0.0
}
self.complexity_signals = [
"technical_specifications",
"compatibility_requirements",
"safety_certifications",
]
def route(
self,
product: dict,
extraction_type: str
) -> TieringDecision:
"""Decide which model tier to use."""
product_tier = self._classify_product_tier(product)
complexity = self._assess_complexity(product, extraction_type)
# Check cache first
cache_key = self._build_cache_key(product, extraction_type)
if self._cache_exists(cache_key):
return TieringDecision(
model_tier=ModelTier.CACHED,
reasoning="Result available in cache",
estimated_cost=0.0001
)
return self._make_routing_decision(product_tier, complexity, extraction_type)
def _classify_product_tier(self, product: dict) -> ProductTier:
"""Classify product by business value."""
margin = product.get("estimated_margin", 0.15)
category = product.get("category", "").lower()
# High-value categories always get premium treatment
premium_categories = {
"electronics", "appliances", "industrial equipment",
"medical devices", "automotive parts"
}
if any(cat in category for cat in premium_categories) and margin > 0.15:
return ProductTier.PREMIUM
commodity_categories = {
"office supplies", "basic accessories", "consumables"
}
if any(cat in category for cat in commodity_categories):
return ProductTier.COMMODITY
if margin >= self.margin_thresholds[ProductTier.PREMIUM]:
return ProductTier.PREMIUM
elif margin >= self.margin_thresholds[ProductTier.STANDARD]:
return ProductTier.STANDARD
else:
return ProductTier.COMMODITY
def _assess_complexity(self, product: dict, extraction_type: str) -> float:
"""Score extraction complexity from 0 to 1."""
complexity_score = 0.0
description_length = len(product.get("description", ""))
if description_length > 2000:
complexity_score += 0.2
elif description_length > 500:
complexity_score += 0.1
description_lower = product.get("description", "").lower()
tech_signals = [
"specifications", "compatible with", "requires",
"voltage", "wattage", "certification"
]
signal_count = sum(1 for signal in tech_signals if signal in description_lower)
complexity_score += min(signal_count * 0.1, 0.4)
complex_extractions = {
"compatibility": 0.3,
"technical_specs": 0.2,
"safety_info": 0.25,
}
complexity_score += complex_extractions.get(extraction_type, 0.0)
if product.get("has_conflicting_claims", False):
complexity_score += 0.2
return min(complexity_score, 1.0)
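    # Illustrative cache-lookup sketches (assumptions; Mercury's cache
    # backend isn't shown in this article).
    def _build_cache_key(self, product: dict, extraction_type: str) -> str:
        """Stable key combining product identity and extraction type."""
        return f"{product.get('product_id', 'unknown')}:{extraction_type}"
    def _cache_exists(self, cache_key: str) -> bool:
        """A real implementation would check Redis or the SmartCache in 3.3."""
        return False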
def _make_routing_decision(
self,
product_tier: ProductTier,
complexity: float,
extraction_type: str
) -> TieringDecision:
"""
Final routing decision.
This matrix took 3 months of A/B testing to calibrate.
"""
# Premium products: quality over cost
if product_tier == ProductTier.PREMIUM:
if complexity > 0.6:
return TieringDecision(
model_tier=ModelTier.LARGE,
reasoning=f"Premium product, high complexity ({complexity:.2f})",
estimated_cost=0.015,
fallback_tier=ModelTier.MEDIUM
)
else:
return TieringDecision(
model_tier=ModelTier.MEDIUM,
reasoning=f"Premium product, moderate complexity ({complexity:.2f})",
estimated_cost=0.005,
fallback_tier=ModelTier.LARGE
)
# Standard products: balanced approach
elif product_tier == ProductTier.STANDARD:
if complexity > 0.7:
return TieringDecision(
model_tier=ModelTier.MEDIUM,
reasoning=f"Standard product, high complexity ({complexity:.2f})",
estimated_cost=0.005,
fallback_tier=ModelTier.SMALL
)
elif complexity > 0.3:
return TieringDecision(
model_tier=ModelTier.SMALL,
reasoning=f"Standard product, moderate complexity ({complexity:.2f})",
estimated_cost=0.0008,
fallback_tier=ModelTier.MEDIUM
)
else:
return TieringDecision(
model_tier=ModelTier.RULES_ONLY,
reasoning=f"Standard product, low complexity ({complexity:.2f})",
estimated_cost=0.0001,
fallback_tier=ModelTier.SMALL
)
# Commodity products: minimize cost
else:
if complexity > 0.8:
return TieringDecision(
model_tier=ModelTier.SMALL,
reasoning=f"Commodity but high complexity ({complexity:.2f})",
estimated_cost=0.0008,
fallback_tier=ModelTier.RULES_ONLY
)
else:
return TieringDecision(
model_tier=ModelTier.RULES_ONLY,
reasoning=f"Commodity product, rules only",
estimated_cost=0.0001,
fallback_tier=None
)

3.3 The Caching Strategy That Saved $400K/Month
Model tiering helped. But the real breakthrough came from aggressive caching.
Here's the insight: product attributes don't change often, but they get requested constantly.
Mercury was re-extracting the same attributes every time a product appeared in search results. Thousands of times per day for popular products.
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, Any
@dataclass
class CachedExtraction:
"""A cached extraction result."""
cache_key: str
attribute_name: str
value: Any
truth_score: float
extracted_at: datetime
expires_at: datetime
model_used: str
extraction_cost: float
hit_count: int = 0
class SmartCache:
"""
Intelligent caching with TTL based on attribute volatility.
Key insight: Some attributes never change (brand, model number).
Some change occasionally (price, availability).
Some change frequently (stock levels, delivery estimates).
TTL should match volatility.
"""
def __init__(self):
self.cache: dict[str, CachedExtraction] = {}
self.ttl_config = {
# Static attributes - cache for 30 days
"static": {
"ttl_days": 30,
"attributes": [
"brand", "model", "sku", "upc", "manufacturer",
"product_line", "release_date", "color_options"
]
},
# Semi-static - cache for 7 days
"semi_static": {
"ttl_days": 7,
"attributes": [
"weight", "dimensions", "specifications",
"compatibility", "features", "warranty_length"
]
},
# Dynamic - cache for 24 hours
"dynamic": {
"ttl_days": 1,
"attributes": [
"price", "availability", "seller_count",
"rating", "review_count"
]
},
# Volatile - cache for 1 hour
"volatile": {
"ttl_hours": 1,
"attributes": [
"stock_level", "delivery_estimate",
"current_promotion", "buy_box_winner"
]
}
}
self.stats = {"hits": 0, "misses": 0, "cost_saved": 0.0}
def get(self, product_id: str, attribute_name: str) -> Optional[CachedExtraction]:
"""Retrieve from cache if valid."""
cache_key = f"{product_id}:{attribute_name}"
if cache_key not in self.cache:
self.stats["misses"] += 1
return None
cached = self.cache[cache_key]
if datetime.now() > cached.expires_at:
del self.cache[cache_key]
self.stats["misses"] += 1
return None
cached.hit_count += 1
self.stats["hits"] += 1
self.stats["cost_saved"] += cached.extraction_cost
return cached
def put(
self,
product_id: str,
attribute_name: str,
value: Any,
truth_score: float,
model_used: str,
extraction_cost: float
) -> None:
"""Store extraction result with appropriate TTL."""
cache_key = f"{product_id}:{attribute_name}"
ttl = self._get_ttl(attribute_name)
self.cache[cache_key] = CachedExtraction(
cache_key=cache_key,
attribute_name=attribute_name,
value=value,
truth_score=truth_score,
extracted_at=datetime.now(),
expires_at=datetime.now() + ttl,
model_used=model_used,
extraction_cost=extraction_cost
)
def _get_ttl(self, attribute_name: str) -> timedelta:
"""Get TTL based on attribute volatility."""
for class_name, config in self.ttl_config.items():
if attribute_name in config["attributes"]:
if "ttl_days" in config:
return timedelta(days=config["ttl_days"])
elif "ttl_hours" in config:
return timedelta(hours=config["ttl_hours"])
return timedelta(days=3) # Default
def get_economics(self) -> dict:
"""Report cache economics."""
total_requests = self.stats["hits"] + self.stats["misses"]
return {
"hit_rate": self.stats["hits"] / total_requests if total_requests > 0 else 0,
"total_cost_saved": self.stats["cost_saved"],
"cache_size": len(self.cache)
}

Mercury ran batch precomputation nightly for their top 500,000 products (a sketch of the job follows the list):
- Cost: $2,100/night
- Savings: $18,000/day in avoided real-time extractions
- ROI: 757%
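A minimal sketch of what that nightly job could look like, reusing the SmartCache and NeuroSymbolicPipeline classes above. The top_products feed and its canonical_id, product_type, and listings fields are assumptions for illustration:
def nightly_precompute(
    top_products: list[dict],
    pipeline: NeuroSymbolicPipeline,
    cache: SmartCache
) -> None:
    """Warm the cache for high-traffic products so daytime requests hit it."""
    for product in top_products:
        entity = ProductEntity(
            canonical_id=product["canonical_id"],
            product_type=product["product_type"]
        )
        entity = pipeline.process_entity(entity, product["listings"])
        for attr in entity.verified_attributes.values():
            cache.put(
                product_id=entity.canonical_id,
                attribute_name=attr.attribute_name,
                value=attr.value,
                truth_score=attr.truth_score,
                model_used="nightly_batch",
                extraction_cost=0.0  # batch cost is tracked separately
            )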
3.4 The Feature ROI Framework
With cost tracking and tiering in place, Mercury built a framework for evaluating every AI feature before it shipped.
No feature ships without passing this analysis:
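The full calculator isn't reproduced in this article, but a minimal sketch consistent with how it is used in the next section might look like the following. The field names mirror that example; the valuation formula and the ship/kill threshold are assumptions and won't reproduce Mercury's exact figures:
from dataclasses import dataclass
@dataclass
class FeatureProposal:
    name: str
    description: str
    model_tier: str
    estimated_calls_per_day: int
    cost_per_call: float
    target_metric: str
    baseline_value: float
    expected_lift: float          # absolute lift, e.g. 0.003 = +0.3 points
    confidence_in_lift: float     # 0.0 to 1.0
    affected_user_segment: str
    rollout_percentage: float
class FeatureROICalculator:
    """Sketch of the ship/kill analysis; the real valuation model is richer."""
    def __init__(self, company_metrics: dict):
        self.metrics = company_metrics
    def analyze(self, proposal: FeatureProposal) -> dict:
        monthly_cost = (
            proposal.estimated_calls_per_day
            * proposal.cost_per_call
            * 30
            * proposal.rollout_percentage
        )
        # Assumed valuation: confidence-discounted incremental margin from the
        # conversion lift. Simplified on purpose.
        incremental_orders_per_day = (
            self.metrics["daily_orders"]
            * proposal.expected_lift
            * proposal.confidence_in_lift
        )
        monthly_value = (
            incremental_orders_per_day
            * 30
            * self.metrics["average_order_value"]
            * self.metrics["average_margin"]
        )
        roi = (monthly_value - monthly_cost) / monthly_cost if monthly_cost else 0.0
        return {
            "monthly_cost": monthly_cost,
            "monthly_value": monthly_value,
            "roi": roi,
            "decision": "SHIP" if roi > 0.25 else "KILL"  # assumed threshold
        }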
3.5 A Real Decision: The Feature That Didn't Ship
Let's walk through an actual feature decision from Mercury.
Proposed Feature: LLM-generated "Explainable Attribute Differences"
When a buyer compares two products, generate a natural language explanation of the key differences.
The Analysis:
company_metrics = {
"daily_orders": 45000,
"average_order_value": 127,
"average_margin": 0.23,
"daily_support_tickets": 3200,
"return_rate": 0.08
}
calculator = FeatureROICalculator(company_metrics)
proposal = FeatureProposal(
name="LLM-generated Explainable Attribute Differences",
description="Show buyers why two similar products differ",
model_tier="gpt-4o",
estimated_calls_per_day=120_000,
cost_per_call=0.012,
target_metric="conversion_rate",
baseline_value=0.032,
expected_lift=0.003,
confidence_in_lift=0.65,
affected_user_segment="comparison_shoppers",
rollout_percentage=1.0
)
analysis = calculator.analyze(proposal)

Result:
Feature: LLM-generated Explainable Attribute Differences
COST ANALYSIS:
- Estimated daily calls: 120,000
- Cost per call: $0.0120
- Monthly cost: $43,200.00
VALUE ANALYSIS:
- Target metric: conversion_rate
- Expected lift: 0.30%
- Confidence: 65%
- Monthly value: $18,126.00
ROI: -58.0%
DECISION: KILL

The feature destroyed value. It cost more than twice what it generated.
The Redesign:
Instead of killing the concept entirely, they redesigned it (a sketch of the gating check follows the list):
- Template-based differences for common comparisons (free)
- Structured attribute highlighting using verified graph data (nearly free)
- LLM-generated explanations only for:
- High-margin products (>30% margin)
- Users who had viewed 5+ products (high intent)
- Categories where comparisons drove >$500 AOV
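A minimal sketch of that gating check, assuming all three criteria must hold and that the session and category fields shown here exist (names are hypothetical):
def should_generate_llm_comparison(
    product: dict,
    session: dict,
    category_stats: dict
) -> bool:
    """Gate the expensive LLM explanation behind the redesigned criteria."""
    high_margin = product.get("estimated_margin", 0.0) > 0.30
    high_intent = session.get("products_viewed", 0) >= 5
    high_aov_category = category_stats.get("comparison_aov", 0.0) > 500
    return high_margin and high_intent and high_aov_category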
The redesigned feature:
COST ANALYSIS:
- Estimated daily calls: 8,500
- Cost per call: $0.0120
- Monthly cost: $3,060.00
VALUE ANALYSIS:
- Target metric: conversion_rate
- Expected lift: 0.25%
- Confidence: 72%
- Monthly value: $4,890.00
ROI: 59.8%
DECISION: SHIP

By targeting the feature precisely, they achieved positive ROI.
Part Four: The Product Strategy
Systematizing Product Strategy
Mercury's final transformation was organizational: replacing tribal knowledge with technical precision.
They adopted High-Fidelity Technical Specifications for every AI feature.
A real spec includes:
- Problem definition (with data backing)
- Data requirements (sources, quality thresholds)
- Model behavior (inputs, outputs, edge cases)
- Fallback logic (what happens when AI fails)
- Cost guardrails (hard limits)
- Verification pathways (how to validate correctness)
- Success metrics (with targets)
Example: Verified Attribute Pages Spec
feature_name: Verified Attribute Pages
version: 2.3
owner: Product Intelligence Team
last_updated: 2024-11-15
problem_definition:
statement: |
Buyers don't trust product specifications because they've been burned
by incorrect data. This reduces conversion and increases returns.
evidence:
- 34% of support tickets mention "wrong specs"
- Post-purchase surveys cite "spec accuracy" as #2 concern
- Return rate for electronics 2.3x higher when specs disputed
data_requirements:
inputs:
- source: Commerce Knowledge Graph
type: verified_attributes
freshness: <24 hours
- source: Seller Feeds
type: raw_claims
freshness: real-time
quality_thresholds:
minimum_truth_score: 0.85
minimum_sources: 2
oem_required_for: [safety_certifications, voltage, compatibility]
model_behavior:
display_logic:
- truth_score >= 0.95: label="Verified", icon=checkmark_green
- truth_score >= 0.85: label="Likely Accurate", icon=checkmark_gray
- truth_score < 0.85: label="Seller Claim", icon=info
hover_behavior: |
Show provenance: "Verified from [source_count] sources including [top_source]"
conflict_display: |
If conflicting_claims > 0: Show "Some sources report [alternative]"
fallback_logic:
if_graph_unavailable: Display raw seller data with "Unverified" label
if_truth_score_missing: Treat as truth_score = 0.5
if_attribute_missing: Omit from display (don't show "Unknown")
cost_guardrails:
max_cost_per_page_view: $0.0007
max_monthly_spend: $45,000
alert_threshold: 80%
hard_shutoff: 95%
verification_pathways:
automated:
- Daily comparison against OEM feeds
- Weekly accuracy audit (sample 1000 products)
manual:
- Escalated disputes reviewed within 4 hours
- Monthly accuracy report to leadership
success_metrics:
primary:
- metric: B2B conversion rate
baseline: 3.2%
target: 3.8%
measurement: A/B test, 4-week window
secondary:
- metric: Spec-related support tickets
baseline: 1,100/day
target: 700/day
- metric: Return rate (spec disputes)
baseline: 4.2%
target: 2.8%
rollout_plan:
phase_1: Electronics category, 10% traffic (2 weeks)
phase_2: Electronics 100%, expand to Appliances (4 weeks)
phase_3: Full catalog (ongoing)
rollback_trigger: Conversion drops >0.3% or costs exceed guardrail

This is how you scale AI without scaling chaos.
Part Five: The Trust Architecture
Building the Next-Gen Verification Platform
Six months into the transformation, Mercury discovered something uncomfortable.
Revenue was quietly influencing ranking.
Not explicitly—no one had written a rule that said "boost high-margin products." But the optimization algorithms had learned that certain products generated more revenue, and those products mysteriously ranked higher.
The data team found it during a routine audit. High-margin products were appearing 23% more often in top-10 results than their relevance scores justified.
Mercury's leadership made a hard decision: Ranking must be unbiased and separate from monetization.
They built a transaction layer of trust:
from dataclasses import dataclass
from datetime import datetime

@dataclass
class RankingFactors:
"""
Explicit, auditable ranking factors.
Note what's NOT here: revenue, margin, seller fees.
"""
relevance_score: float # 0-1, from search model
verified_quality_score: float # 0-1, from verification pipeline
user_satisfaction_score: float # 0-1, from behavioral signals
fulfillment_reliability: float # 0-1, from delivery data
def calculate_unbiased_rank(factors: RankingFactors) -> float:
"""
Ranking formula with explicit, auditable weights.
Revenue is NOT a factor. This is intentional and non-negotiable.
"""
weights = {
"relevance": 0.55,
"verified_quality": 0.20,
"user_satisfaction": 0.15,
"fulfillment": 0.10
}
score = (
weights["relevance"] * factors.relevance_score +
weights["verified_quality"] * factors.verified_quality_score +
weights["user_satisfaction"] * factors.user_satisfaction_score +
weights["fulfillment"] * factors.fulfillment_reliability
)
return score
class RankingAuditor:
"""
Continuous monitoring for ranking bias.
Runs daily. Reports to compliance. No exceptions.
"""
def __init__(self):
self.bias_thresholds = {
"margin_correlation": 0.15, # Max correlation with margin
"revenue_correlation": 0.15, # Max correlation with revenue
"seller_tier_lift": 1.20, # Max advantage for premium sellers
}
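    # --- Illustrative sketches (assumptions) of the two helpers used below ---
    def _calculate_correlation(
        self,
        ranking_data: list[dict],
        x_key: str,
        y_key: str
    ) -> float:
        """Pearson correlation between two numeric fields of the ranking rows."""
        import statistics
        pairs = [
            (row[x_key], row[y_key])
            for row in ranking_data
            if x_key in row and y_key in row
        ]
        if len(pairs) < 2:
            return 0.0
        xs, ys = zip(*pairs)
        return statistics.correlation(xs, ys)
    def _average_rank_by_seller_tier(
        self,
        ranking_data: list[dict],
        tier: str
    ) -> float:
        """Mean rank position for listings from sellers of the given tier."""
        ranks = [
            row["rank_position"]
            for row in ranking_data
            if row.get("seller_tier") == tier
        ]
        return sum(ranks) / len(ranks) if ranks else 0.0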
def audit_ranking_fairness(self, ranking_data: list[dict]) -> dict:
"""Check for unintended bias in rankings."""
violations = []
# Check margin correlation
margin_corr = self._calculate_correlation(
ranking_data, "rank_position", "product_margin"
)
if abs(margin_corr) > self.bias_thresholds["margin_correlation"]:
violations.append({
"type": "margin_bias",
"correlation": margin_corr,
"threshold": self.bias_thresholds["margin_correlation"],
"severity": "high"
})
# Check seller tier advantage
premium_avg_rank = self._average_rank_by_seller_tier(ranking_data, "premium")
standard_avg_rank = self._average_rank_by_seller_tier(ranking_data, "standard")
if premium_avg_rank > 0:
tier_lift = standard_avg_rank / premium_avg_rank
if tier_lift > self.bias_thresholds["seller_tier_lift"]:
violations.append({
"type": "seller_tier_bias",
"lift": tier_lift,
"threshold": self.bias_thresholds["seller_tier_lift"],
"severity": "medium"
})
return {
"audit_timestamp": datetime.now().isoformat(),
"records_analyzed": len(ranking_data),
"violations": violations,
"status": "PASS" if not violations else "FAIL"
}

Part Six: Research Alignment
How This Maps to Current Research
The architecture described here aligns with three major research domains:
1. Neurosymbolic AI
Research consistently shows that hybrid neural-symbolic systems outperform pure neural models in:
- Truth verification (reducing hallucinations by 40-60%)
- Multi-source reasoning (handling conflicting information)
- Constraint satisfaction (enforcing business rules)
- Interpretability (explaining why decisions were made)
Mercury's three-layer pipeline (neural extraction → symbolic validation → truth resolution) is a direct implementation of these principles.
2. Knowledge Graph Reasoning
Studies demonstrate that knowledge graphs provide:
- Provenance tracking (knowing where data came from)
- Conflict resolution (handling disagreements systematically)
- Interpretable reasoning (explaining conclusions)
- Multi-hop inference (connecting related facts)
The Commerce Knowledge Graph isn't a nice-to-have—it's the foundation that makes verified truth possible.
3. FinOps for AI Systems
Emerging literature emphasizes:
- Inference cost modeling (understanding where money goes)
- Value-based feature gating (spending AI budget where it matters)
- Model distillation and tiering (right-sizing models to tasks)
- Caching strategies (avoiding redundant computation)
Mercury's 73% cost reduction came directly from applying these principles.
Part Seven: The Results
One Year Later
Twelve months after starting the transformation, Mercury's numbers told the story:
Verification Metrics
- Products with verified attributes: 89% (up from 12%)
- Average TruthScore for verified attributes: 0.91
- Human escalation rate: 4.2% (down from 31%)
- Time to verify new SKU: 2.3 hours (down from 14 days)
Business Metrics
- Spec-related support tickets: -67%
- Return rate (spec disputes): -58%
- B2B conversion rate: +0.7% absolute
- Customer NPS for "product information accuracy": +34 points
Economic Metrics
- Monthly AI infrastructure cost: $396K (down from $1.47M)
- Cost per verified SKU: $0.0031
- Feature ROI (weighted average): 127%
Operational Metrics
- Pipeline uptime: 99.7%
- Average extraction latency: 340ms
- Cache hit rate: 84%
- Rule coverage: 2,847 active rules across 43 categories
Conclusion: The Work That Matters
The next era of AI will not be defined by who has the biggest model.
It will be defined by teams who can:
- Architect intelligence layers grounded in truth — not probabilistic guesses, but verified facts with provenance
- Engineer AI systems with sustainable economics — features that generate more value than they cost
- Replace intuition with systematic strategy — specs, metrics, and frameworks instead of tribal knowledge
- Build verification platforms that align incentives with users — ranking that serves buyers, not just revenue
Mercury Commerce started with an $847,000 invoice dispute and a crisis of trust.
They ended with a verification platform that processes 150,000 SKUs daily, maintains 89% verified coverage, and runs at 27% of the original cost.
This is the work that turns AI from novelty into infrastructure.
This is the work that defines the next decade.
This article represents a technical analysis of production AI systems. The "Mercury Commerce" example is a composite based on real implementations across multiple enterprise deployments. Code samples have been simplified for readability while preserving architectural decisions.