AI agents fail in production not because of model limitations, but because they lack properly enriched data. The most critical AI applications require preprocessing thousands of documents, transcripts, and unstructured text into structured, validated data before any agent makes a decision. This preprocessing maze—OCR models, transcription services, document chunking, rate limits, and brittle glue code—prevents 95% of enterprise GenAI pilots from reaching production impact.
Semantic operators solve this by treating data enrichment as first-class DataFrame operations rather than external API calls. This approach transforms data enrichment from operational overhead into reliable, optimizable pipelines that scale.
## The Data Enrichment Problem for AI Agents
Traditional AI agent architectures couple heavy data preprocessing with real-time decision-making. This creates three critical failures:
Unpredictable latency - Agents process raw PDFs, transcripts, and documents in the request path, causing response times that range from milliseconds to minutes depending on document size and LLM load.
Resource contention - Batch inference operations compete with real-time reasoning for compute resources, degrading both workloads.
Debugging nightmares - When agents produce incorrect results, teams can't determine if the issue stems from reasoning logic or corrupted preprocessing without proper lineage tracking.
The solution requires separating batch data enrichment from real-time agent execution. Heavy lifting—extraction, classification, semantic joins, clustering—happens offline in structured pipelines. Agents consume clean, validated, enriched data and focus exclusively on decision-making.
## What Semantic Operators Are and Why They Matter
Semantic operators are DataFrame operations that process meaning, not just values. Unlike traditional operations that work on exact matches and numeric calculations, semantic operators use LLMs to transform, filter, join, and aggregate data based on what the text means.
Fenic provides nine semantic operators as first-class DataFrame primitives:
- semantic.extract - Transforms unstructured text into structured data using Pydantic schemas
- semantic.map - Applies natural language transformations to data (summarization, translation, rewriting)
- semantic.classify - Categorizes text with few-shot examples
- semantic.join - Joins DataFrames based on meaning rather than exact values
- semantic.predicate - Creates natural language filters for row selection
- semantic.with_cluster_labels - Clusters rows by semantic similarity using embeddings
- semantic.reduce - Aggregates grouped data with LLM operations
- semantic.analyze_sentiment - Built-in sentiment analysis
- semantic.embed - Generates embeddings for text columns
The critical difference from traditional LLM pipelines: Fenic's query engine fully understands these operations. When you write df.semantic.extract(...), the optimizer knows this is an inference operation with specific characteristics—high latency, token costs, batching benefits, caching opportunities. This visibility enables optimizations impossible when LLM calls are hidden in custom scripts or microservices.
## Core Semantic Operators for Data Enrichment

### Schema-Driven Extraction for Type-Safe Enrichment
The semantic.extract operator converts unstructured text into validated data structures, eliminating the "prompt-parse-validate" cycle that makes traditional LLM pipelines brittle.
Define your extraction schema once using Pydantic:
```python
from typing import List, Literal

from pydantic import BaseModel, Field

import fenic as fc

class Issue(BaseModel):
    category: Literal["bug", "feature_request", "question"]
    severity: Literal["low", "medium", "high", "critical"]
    description: str

class Ticket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"]
    region: Literal["us", "eu", "apac"]
    issues: List[Issue]

tickets = (
    df
    .with_column("extracted", fc.semantic.extract("raw_ticket", Ticket))
    .unnest("extracted")
    .filter(fc.col("region") == "apac")
    .explode("issues")
)

bugs = tickets.filter(fc.col("issues").category == "bug")
```
The schema provides three critical enrichment benefits:
Validation at preprocessing time - Type errors surface during batch processing, not during agent runtime when failures are costly.
Consistent structure - Every extracted entity conforms to your schema, making downstream processing predictable.
Row-level lineage - Track which source document produced each extracted entity, enabling debugging when extractions fail.
Field descriptions guide extraction quality:
```python
class Transaction(BaseModel):
    merchant: str = Field(description="The business name where transaction occurred")
    category: Literal["grocery", "dining", "transport", "entertainment", "other"] = Field(
        description="Transaction category based on merchant type and purchase details"
    )
    amount: float = Field(description="Transaction amount in USD")
    is_recurring: bool = Field(
        description="True if this appears to be a recurring/subscription charge"
    )
```
Clear descriptions with constraints reduce hallucination and improve extraction accuracy. This pattern extends to nested schemas where you extract structured hierarchies from unstructured text in a single operation.
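For example, a nested schema can capture an entity hierarchy in one call. Here is a minimal sketch, assuming a hypothetical `press_release` text column and an illustrative schema:

```python
from typing import List

from pydantic import BaseModel, Field

import fenic as fc

class Executive(BaseModel):
    name: str = Field(description="Full name as written in the text")
    title: str = Field(description="Role title, e.g. CEO or CTO")

class CompanyProfile(BaseModel):
    company: str = Field(description="Company the text is about")
    headquarters: str = Field(description="City and country of the headquarters")
    executives: List[Executive] = Field(description="Executives mentioned in the text")

# One extract call returns the whole hierarchy; unnest and explode
# to get one validated row per executive, as in the ticket example above
profiles = (
    df
    .with_column("profile", fc.semantic.extract("press_release", CompanyProfile))
    .unnest("profile")
    .explode("executives")
)
```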
### Semantic Filtering with Natural Language Predicates
The semantic.predicate operator enables content-based filtering using natural language conditions instead of regex or keyword matching:
```python
applicants = df.filter(
    (fc.col("yoe") > 5)
    & fc.semantic.predicate(
        "Has MCP Protocol experience? Resume: {{resume}}",
        resume=fc.col("resume"),
    )
)
```
This combines traditional boolean logic with semantic knowledge. The query engine optimizes both together—filtering on cheap boolean conditions first before invoking expensive LLM predicates.
Predicates accept Jinja template variables for dynamic, data-aware prompts:
```python
fc.semantic.predicate(
    """
    Does this feedback mention {{ search_term }}?
    {% if priority == "high" %}
    Only return true if it's a critical issue.
    {% endif %}
    Feedback: {{ feedback_text }}
    """,
    search_term=fc.lit("UI problems"),
    priority=fc.col("priority"),
    feedback_text=fc.col("raw_feedback"),
)
```
The template evaluates per row, allowing row-specific filtering logic while maintaining the declarative DataFrame abstraction.
### Semantic Joins for Meaning-Based Data Enrichment
Traditional joins require exact matches. Semantic joins determine matches based on meaning:
```python
prompt = """
Is this candidate a good fit for the job?

Candidate Background: {{left_on}}

Job Requirements: {{right_on}}

Use the following criteria to make your decision:
- Technical skills alignment
- Experience level appropriateness
- Domain knowledge overlap
"""

joined = (
    applicants.semantic.join(
        other=jobs,
        predicate=prompt,
        left_on=fc.col("resume"),
        right_on=fc.col("job_description"),
    )
    .order_by("application_date")
    .limit(5)
)
```
The predicate receives both left and right row data as context, enabling sophisticated matching logic. Fenic optimizes semantic joins by:
- Batching LLM calls across candidate pairs
- Caching decisions for repeated comparisons
- Using embeddings for initial filtering before applying expensive LLM predicates
This pattern works for:
- Matching documents to queries in RAG systems
- Linking related records across databases without foreign keys
- Finding similar but not identical content
- Deduplication based on semantic similarity rather than string distance
Semantic joins enrich data by connecting related entities that traditional joins cannot match. For RudderStack's triage system, semantic joins connected new feature requests to existing PRDs and strategy documents, reducing PM triage time by 95%.
## Enriching with AI-Native Data Types
Fenic goes beyond standard data types with first-class support for AI-native formats: MarkdownType, TranscriptType, JSONType, HTMLType, and EmbeddingType. These aren't metadata tags—they unlock specialized enrichment operations.
### Structure-Aware Document Enrichment with MarkdownType
```python
df = (
    df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks("raw_blog", header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("chunks").content)
    )
)
```
The markdown.extract_header_chunks function leverages document structure (sections, paragraphs, headings) for semantically meaningful chunks instead of naive character-count splitting. This dramatically improves RAG quality by preserving context boundaries and avoiding mid-sentence splits.
### Speaker-Aware Transcript Enrichment with TranscriptType
TranscriptType handles SRT, WebVTT, and generic transcript formats with native speaker and timestamp awareness:
```python
from pathlib import Path

from pydantic import BaseModel, Field

class SegmentSchema(BaseModel):
    speaker: str = Field(description="Who is talking in this segment")
    start_time: float = Field(description="Start time (seconds)")
    end_time: float = Field(description="End time (seconds)")
    key_points: list[str] = Field(description="Bullet points for this segment")

# Load and process transcript
transcript_text = Path("data/transcript.json").read_text()
df = session.create_dataframe({"transcript": [transcript_text]})

processed = (
    df.select(
        "*",
        fc.text.recursive_token_chunk(
            "transcript", chunk_size=1200, chunk_overlap_percentage=0
        ).alias("chunks"),
    )
    .explode("chunks")
    .select(
        fc.col("chunks").alias("chunk"),
        fc.semantic.extract(
            "chunk", SegmentSchema, model_alias="mini"
        ).alias("segment"),
    )
)
```
Fenic preserves speaker identity and timestamps through transformations, enabling speaker-aware analysis without manual parsing. Aggregate by speaker, analyze conversation flows, or extract speaker-specific insights while maintaining temporal context.
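As a sketch of that speaker-aware analysis, the `processed` DataFrame above can be grouped on the extracted speaker field and condensed per speaker (the prompt wording is illustrative):

```python
# Group extracted segments by speaker and condense each speaker's chunks
# into a single summary, reusing the group_by/reduce pattern shown below
by_speaker = (
    processed
    .group_by(fc.col("segment").speaker)
    .agg(
        fc.semantic.reduce(
            "Summarize this speaker's main points across the conversation",
            fc.col("chunk"),
            model_alias="mini",
        ).alias("speaker_points")
    )
)
by_speaker.show()
```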
### Nested Data Enrichment with JQ Expressions
JSONType supports JQ expressions for nested data manipulation:
```python
.with_column("author", fc.json.jq("metadata", ".author.name"))
.with_column("tags", fc.json.jq("metadata", ".tags[]"))
```
This eliminates verbose dictionary navigation code and handles missing keys gracefully, making JSON enrichment operations declarative rather than imperative.
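A short end-to-end sketch, assuming the `metadata` column holds raw JSON strings and must first be cast to fenic's JSON type (the exact type name, written `fc.JSONType` here to match this article's naming, may differ by version):

```python
# Cast the raw string column so JQ expressions can address nested fields,
# then extract values declaratively; missing keys yield nulls, not errors
df = (
    df
    .with_column("metadata", fc.col("metadata").cast(fc.JSONType))
    .with_column("author", fc.json.jq("metadata", ".author.name"))
    .with_column("tags", fc.json.jq("metadata", ".tags[]"))
)
```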
## Building Complete Data Enrichment Pipelines
Production data enrichment combines multiple semantic operators with traditional DataFrame operations. Here's a complete podcast enrichment pipeline:
```python
from pathlib import Path

from pydantic import BaseModel, Field

import fenic as fc

class SegmentSchema(BaseModel):
    speaker: str = Field(description="Who is talking in this segment")
    start_time: float = Field(description="Start time (seconds)")
    end_time: float = Field(description="End time (seconds)")
    key_points: list[str] = Field(description="Bullet points for this segment")

class EpisodeSummary(BaseModel):
    title: str
    guests: list[str]
    main_topics: list[str]
    actionable_insights: list[str]

# Initialize session with model alias
config = fc.SessionConfig(
    app_name="podcast_enrichment",
    semantic=fc.SemanticConfig(
        language_models={
            "mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini", rpm=300, tpm=150_000
            )
        }
    ),
)
session = fc.Session.get_or_create(config)

# Load raw data
data_dir = Path("data")
transcript_text = (data_dir / "transcript.json").read_text()
meta_text = (data_dir / "meta.json").read_text()
df = session.create_dataframe({"meta": [meta_text], "transcript": [transcript_text]})

# Extract metadata and segment transcript
processed = (
    df.select(
        "*",
        fc.semantic.extract(
            "meta", EpisodeSummary, model_alias="mini"
        ).alias("episode"),
        fc.text.recursive_token_chunk(
            "transcript", chunk_size=1200, chunk_overlap_percentage=0
        ).alias("chunks"),
    )
    .explode("chunks")
    .select(
        fc.col("chunks").alias("chunk"),
        fc.semantic.extract(
            "chunk", SegmentSchema, model_alias="mini"
        ).alias("segment"),
    )
)

# Create abstracts per segment and aggregate by speaker
final = (
    processed
    .select(
        "*",
        fc.semantic.map(
            "Summarize this segment in 2 sentences:\n{{chunk}}",
            chunk=fc.col("chunk"),
            model_alias="mini",
        ).alias("segment_summary"),
    )
    .group_by(fc.col("segment").speaker)
    .agg(
        fc.semantic.reduce(
            "Combine these summaries into one clear paragraph",
            fc.col("segment_summary"),
            model_alias="mini",
        ).alias("speaker_summary")
    )
)

final.show(truncate=120)
final.write.parquet("enriched_podcasts.parquet")
```
This pipeline demonstrates six composability patterns:
- Schema-driven extraction - Pydantic models define output structure for consistent parsing
- Intelligent chunking - Semantic-aware text splitting respects structure and context
- Explode for row multiplication - Transform single transcript into multiple segment rows
- Nested structure access - Reference nested fields like segment.speaker naturally
- Semantic aggregation - Group data and apply LLM operations across groups
- Mixed operations - Combine semantic and traditional DataFrame operations in one pipeline
The pipeline reads raw text, extracts structure, transforms content, aggregates semantically, and writes results—all declaratively expressed with automatic optimization, batching, and error handling.
## Production-Ready Data Enrichment Configuration

### Multi-Provider Model Configuration for Cost Optimization
Production enrichment pipelines need flexibility in model selection. Configure multiple providers with different cost-performance profiles:
```python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "cheap": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini", rpm=500, tpm=200_000
            ),
            "fast": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash-lite", rpm=300, tpm=100_000
            ),
            "powerful": fc.AnthropicLanguageModel(
                model_name="claude-opus-4-0",
                rpm=100,
                input_tpm=100_000,
                output_tpm=100_000,
            ),
        },
        default_language_model="cheap",
    )
)
```
Use cheap models for simple classification, fast models for bulk processing, and powerful models for intricate reasoning. Strategic model selection reduces costs by 80% while maintaining quality for appropriate tasks.
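In a pipeline, the alias is chosen per operation. The sketch below, over a hypothetical `ticket_text` column, sends every row through the cheap classifier but reserves the powerful model for rows that survive the filter:

```python
# Cheap model handles the easy classification over every row...
triaged = df.with_column(
    "category",
    fc.semantic.classify(
        fc.col("ticket_text"),
        classes=["billing", "bug", "how_to"],
        model_alias="cheap",
    ),
)

# ...and the powerful model only sees rows that need deep reasoning
diagnosed = triaged.filter(fc.col("category") == "bug").with_column(
    "root_cause",
    fc.semantic.map(
        "Diagnose the likely root cause of this bug report:\n{{ticket}}",
        ticket=fc.col("ticket_text"),
        model_alias="powerful",
    ),
)
```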
### Model Profiles for Dynamic Workload Adaptation
Configure the same model with different settings for specific workloads:
```python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "claude": fc.AnthropicLanguageModel(
                model_name="claude-opus-4-0",
                rpm=100,
                input_tpm=100_000,
                output_tpm=100_000,
                profiles={
                    "thinking_disabled": fc.AnthropicLanguageModel.Profile(),
                    "fast": fc.AnthropicLanguageModel.Profile(
                        thinking_token_budget=1024
                    ),
                    "thorough": fc.AnthropicLanguageModel.Profile(
                        thinking_token_budget=4096
                    ),
                },
                default_profile="fast",
            )
        },
        default_language_model="claude",
    )
)

# Use default "fast" profile for standard enrichment
fc.semantic.map(
    "Extract key insights from {{text}}",
    text=fc.col("text"),
    model_alias="claude",
)

# Override to "thorough" profile for intricate analysis
fc.semantic.map(
    "Analyze technical details in {{specification}}",
    specification=fc.col("spec"),
    model_alias=fc.ModelAlias(name="claude", profile="thorough"),
)
```
This enables dynamic model selection based on enrichment task difficulty without changing pipeline code.
### Rate Limiting and Self-Throttling
Fenic automatically respects provider rate limits with configured rpm (requests per minute) and tpm (tokens per minute):
python"nano": fc.OpenAILanguageModel( model_name="gpt-4.1-nano", rpm=500, tpm=200_000 )
The engine tracks token usage in real-time and self-throttles when approaching limits. Async I/O with concurrent request batching maximizes throughput while staying within constraints. Built-in retry logic handles transient failures automatically.
## Architectural Patterns for Agent Data Enrichment

### Batch Preprocessing Separated from Real-Time Execution
The most impactful pattern for agent performance is separating batch data enrichment from real-time decision-making:
```python
# Batch preprocessing pipeline (runs offline)
enriched_data = (
    raw_documents
    .with_column("raw_md", fc.col("content").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(fc.col("raw_md"), header_level=2)
    )
    .explode("chunks")
    .with_column(
        "embedding",
        fc.semantic.embed(fc.col("chunks").content)
    )
    .with_column(
        "metadata",
        fc.semantic.extract(
            fc.col("chunks"), DocumentMetadata, model_alias="cheap"
        )
    )
)

enriched_data.write.parquet("s3://my-bucket/enriched/")

# Agent runtime: fast, predictable
# Agents query enriched data without expensive inference at request time
```
This architecture provides:
- More predictable agents - No LLM latency in user-facing paths
- Better resource utilization - Batch processing amortizes fixed costs
- Cleaner separation - Planning/orchestration decoupled from execution
- Easier debugging - Preprocessing happens once, validated offline
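On the runtime side, the agent issues plain columnar queries against the enriched output. A sketch using DuckDB as one possible reader (the column names are hypothetical, and reading from S3 relies on DuckDB's httpfs extension):

```python
import duckdb

# Agent runtime: scan precomputed enrichments with no LLM in the loop
con = duckdb.connect()
rows = con.execute(
    """
    SELECT chunk, embedding
    FROM read_parquet('s3://my-bucket/enriched/*.parquet')
    LIMIT 10
    """
).fetchall()
```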
### Intelligent Caching for Iterative Development
Cache expensive enrichment operations explicitly:
```python
df_cached = (
    df
    .filter(...)
    .with_column("extracted", fc.semantic.extract(...))
    .cache()
)

# Subsequent operations use cached results without recomputation
result1 = df_cached.filter(condition1).collect()
result2 = df_cached.filter(condition2).collect()
```
The engine also caches identical inference calls automatically within a session, preventing redundant API calls when the same prompt with same input appears multiple times.
### Lakehouse-Native Architecture
Fenic is pure compute with no proprietary storage layer. Read from and write to existing lakehouses without data movement:
```python
df = session.read.parquet("s3://data-lake/raw/*.parquet")

enriched = (
    df
    .with_column("extracted", fc.semantic.extract(...))
    .with_column("classified", fc.semantic.classify(...))
    .filter(...)
)

enriched.write.parquet("s3://data-lake/enriched/")
```
Full compatibility with Parquet, Iceberg, Delta Lake, and Lance enables seamless integration with existing infrastructure. Built on Apache Arrow for ecosystem interoperability—enriched data works with Spark, Polars, DuckDB, and pandas.
## Practical Data Enrichment Patterns

### Pattern 1: Hierarchical Extraction for Cost Optimization
Start with fast, cheap models for initial classification, then apply expensive accurate models only to high-value subsets:
```python
result = (
    df
    .with_column(
        "mentions_pricing",
        fc.semantic.classify(
            fc.col("text"), classes=["yes", "no"], model_alias="fast"
        ),
    )
    .filter(fc.col("mentions_pricing") == "yes")
    .with_column(
        "pricing_info",
        fc.semantic.extract(
            fc.col("text"), PricingInquiry, model_alias="accurate"
        ),
    )
)
```
The cost difference between models is often 10-100x. Hierarchical extraction reduces costs by 80% while maintaining quality.
### Pattern 2: Semantic Clustering Before Enrichment
Group related entities before expensive enrichment operations:
```python
clustered = (
    feedback
    .semantic.with_cluster_labels(
        by=fc.col("text_embedding"), num_clusters=15
    )
    .group_by("cluster_label")
    .agg(
        fc.semantic.reduce(
            "Summarize the common themes in this feedback: {{text}}",
            fc.col("text"),
            group_context={"text": fc.col("text")},
        )
    )
)
```
Process 15 cluster summaries instead of thousands of individual comments—dramatically faster and cheaper.
### Pattern 3: Progressive Enrichment Pipelines
Build structured metadata incrementally where each stage adds information:
```python
enriched = (
    raw_data
    .with_column(
        "metadata",
        fc.semantic.extract(fc.col("text"), BaseMetadata)
    )
    .with_column(
        "priority",
        fc.semantic.classify(
            fc.col("text"), classes=["urgent", "normal", "low"]
        ),
    )
    .with_column("embedding", fc.semantic.embed(fc.col("text")))
    .semantic.with_cluster_labels(
        by=fc.col("embedding"), num_clusters=10
    )
)
```
Each enrichment stage builds on previous stages, creating progressively richer structured data.
### Pattern 4: Hybrid Fuzzy and Semantic Matching
Use fuzzy string matching for initial candidate selection before expensive semantic joins:
```python
# Fast fuzzy scoring for blocking
candidates = (
    left_df.join(right_df)  # Cross join
    .with_column(
        "fuzzy_score",
        fc.text.compute_fuzzy_ratio(
            fc.col("company_name"),
            fc.col("business_name"),
            method="jaro_winkler",
        ),
    )
    .filter(fc.col("fuzzy_score") > 80)  # Score is 0-100
)

# Then expensive semantic matching on the surviving candidates
final = candidates.semantic.join(
    predicate="Are these the same company? Left: {{left_on}}, Right: {{right_on}}",
    left_on=fc.col("company_description"),
    right_on=fc.col("business_description"),
)
```
This hybrid approach reduces costs by orders of magnitude compared to semantic joins on full cross-products.
## Observability and Debugging for Data Enrichment

### Row-Level Lineage for Tracing Enrichment Operations
When enrichment produces unexpected results, lineage traces every output back through transformations:
```python
# Execute the query
result = df.collect()

# Access lineage information
lineage = df.lineage()

# Trace backwards from problematic result rows
source_rows = lineage.backward(["result_uuid1", "result_uuid2"])

# Trace forwards from source rows
result_rows = lineage.forward(["source_uuid1"])
```
Granular tracking shows:
- Source document that provided context
- Every transformation applied during enrichment
- Which prompt template was used
- What model generated the result
- Token costs and timing for each operation
### Query Metrics for Cost and Performance Analysis
Built-in metrics provide operation-level visibility:
```python
result = df.collect()

print(f"Query duration: {result.metrics.query_duration_ms}ms")
print(f"Total tokens: {result.metrics.lm_metrics.total_tokens}")
print(f"Total cost: ${result.metrics.lm_metrics.total_cost}")

for op in result.metrics.operator_metrics:
    print(f"Operator: {op.operator_name}")
    print(f"Duration: {op.duration_ms}ms")
```
This observability transforms enrichment development from "tweak prompts and hope" to "measure, analyze, optimize." Identify which operations are bottlenecks, which models provide the best accuracy-to-cost ratio, and where to focus optimization efforts.
## Best Practices for Production Data Enrichment

### Design Clear Pydantic Schemas with Descriptions
Schema field descriptions guide extraction quality:
```python
class CustomerFeedback(BaseModel):
    sentiment: Literal["positive", "negative", "neutral"] = Field(
        description="Overall sentiment of the feedback"
    )
    product_area: Literal["ui", "performance", "features", "pricing"] = Field(
        description="Which product area the feedback relates to"
    )
    priority: Literal["low", "medium", "high", "urgent"] = Field(
        description="Urgency based on impact and user frustration level"
    )
    action_required: bool = Field(
        description="True if this requires immediate product team action"
    )
```
Clear descriptions with examples and constraints improve extraction accuracy significantly. Literal types constrain outputs to valid categories, reducing hallucination.
### Test Enrichment Pipelines Incrementally
Develop and test with small representative samples before scaling:
```python
# Development: 100 rows
df_sample = df.limit(100)
result = df_sample.with_column(
    "extracted", fc.semantic.extract(fc.col("text"), schema)
).collect()
print(f"Cost for 100 rows: ${result.metrics.lm_metrics.total_cost}")

# Validate results, then scale
df_full.with_column(
    "extracted", fc.semantic.extract(fc.col("text"), schema)
).write.parquet("output/")
```
Fenic's lazy evaluation and metrics make it easy to estimate costs and validate logic before processing millions of rows.
### Monitor and Optimize Based on Metrics
Regularly review pipeline metrics to identify optimization opportunities:
```python
result = pipeline.collect()
metrics = result.metrics

# Identify expensive operators
for op in metrics.operator_metrics:
    if op.cost > 10.0:  # $10+ operators
        print(f"Expensive operator: {op.operator_name}, Cost: ${op.cost}")

# Check model usage distribution
print(f"Total tokens: {metrics.lm_metrics.total_tokens} tokens")
print(f"Total cost: ${metrics.lm_metrics.total_cost}")
```
Use insights to shift operations to cheaper models, add caching, or restructure pipelines for efficiency.
## The Infrastructure Advantage for Data Enrichment
Traditional data platforms treat LLM calls as external black-box UDFs that query optimizers cannot inspect or optimize. Fenic's inference-first approach embeds LLM operations directly into the query engine as first-class citizens.
When the query optimizer sees semantic.extract() or semantic.join(), it understands this is an inference operation with specific characteristics: high latency, token costs, batching benefits, caching opportunities. The optimizer can:
- Reorder operations to minimize data processed by expensive inference
- Batch requests across rows to amortize fixed costs
- Cache aggressively since deterministic operations with same inputs produce same outputs
- Parallelize intelligently across multiple providers or models
- Estimate costs accurately before execution
This is impossible when LLM calls are hidden in custom scripts or microservices. Fenic's semantic operators make inference visible to the optimizer, enabling optimizations that dramatically improve performance and reduce costs.
The declarative API also provides auditability and reproducibility. Every enrichment operation is explicitly defined with inputs, prompts, and model configurations tracked automatically. Row-level lineage traces data flow through transformations—critical for debugging and compliance.
Combined with native AI data types (Markdown, Transcript, JSON with structure awareness), automatic batch optimization, multi-provider support, and production-grade error handling, Fenic represents infrastructure purpose-built for data enrichment workloads.
## From Enrichment Chaos to Structured Pipelines
Data enrichment using semantic operators transforms AI agent development from brittle preprocessing scripts into robust, optimizable pipelines. By treating inference as a first-class operation within a familiar DataFrame API, teams build production systems with the same rigor applied to traditional data pipelines.
The key principles: declarative operations enable optimization, type-safe schemas eliminate brittle prompts, intelligent batching reduces costs, and row-level lineage makes debugging tractable. When semantic operators compose naturally with traditional DataFrame operations, you stop choosing between structured and unstructured data—you build unified enrichment pipelines that handle both.
For teams building semantic extraction, content classification, RAG systems, or agent preprocessing pipelines, semantic operators provide the foundation for reliable, scalable, cost-effective data enrichment infrastructure.
Start with simple operations like semantic.extract or semantic.classify on small datasets, validate results and costs, then scale to production with confidence that the infrastructure handles batching, optimization, error handling, and observability automatically.
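A minimal first pipeline, adapted from the configuration shown earlier (model name, rate limits, and sample data are illustrative):

```python
import fenic as fc

config = fc.SessionConfig(
    app_name="enrichment_quickstart",
    semantic=fc.SemanticConfig(
        language_models={
            "mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini", rpm=100, tpm=50_000
            )
        }
    ),
)
session = fc.Session.get_or_create(config)

# A tiny sample dataset to validate logic and cost before scaling
df = session.create_dataframe(
    {"feedback": ["Love the new dashboard!", "App crashes on login."]}
)

result = df.with_column(
    "label",
    fc.semantic.classify(
        fc.col("feedback"), classes=["praise", "bug_report"], model_alias="mini"
    ),
).collect()

print(f"Cost for this run: ${result.metrics.lm_metrics.total_cost}")
```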
## Additional Resources
- Learn more about Typedef
- Explore Fenic on GitHub
- Read about how Typedef cut RudderStack's triage time by 95%
- Building reliable AI pipelines with Fenic's semantic operators
- Enhance AI agent preprocessing for better decision-making
- Fenic 0.5.0 release notes

