Traditional data architectures force teams into an impossible choice: build reliable analytics pipelines or build AI-powered applications. The assumption that these workloads require separate systems creates operational complexity, duplicated infrastructure, and fragile glue code connecting disparate components.
This guide shows how to unify inference, search, and analytics in a single engine using an inference-first architecture. You'll learn the technical patterns, implementation strategies, and production considerations for building deterministic workflows on non-deterministic models.
The Architectural Gap in Traditional Data Platforms
Why Legacy Systems Fail for AI Workloads
SQL-era data platforms optimize for structured queries, batch ETL processes, and deterministic transformations. They treat LLM operations as external black boxes wrapped in User Defined Functions. This creates fundamental impedance mismatches:
```python
# Traditional approach: LLM calls as external UDFs
import time

import pandas as pd
from openai import OpenAI

client = OpenAI()

def extract_sentiment(text):
    # Manual rate limiting
    time.sleep(0.1)
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"Analyze sentiment: {text}"}]
        )
        return response.choices[0].message.content
    except Exception as e:
        # Hand-rolled retry helper (not shown); every pipeline reimplements this
        return retry_with_backoff(extract_sentiment, text)

df = pd.DataFrame({"text": ["I love this product!", "This is terrible."]})
df["sentiment"] = df["text"].apply(extract_sentiment)
```
The query engine has zero visibility into the UDF's operations. It cannot batch API calls, cache inference patterns, optimize operation ordering, or handle rate limits intelligently. Teams end up managing these concerns manually across every pipeline.
The Cost of Separation
When inference, search, and analytics live in separate systems, teams face:
Infrastructure sprawl: OCR services, transcription APIs, vector databases, LLM providers, OLAP warehouses, and custom microservices—each requiring integration, monitoring, and maintenance.
Data movement overhead: Constant serialization between systems adds latency and failure modes. Context gets lost crossing boundaries.
Optimization impossibility: Each system optimizes locally without understanding the full pipeline. Operations that could be reordered or batched run sequentially.
Development velocity collapse: Engineers spend 80% of their time on infrastructure plumbing and only 20% on business logic.
The RudderStack case study demonstrates this gap. Their team piloted LLM/MCP experiments but faced low signal-to-noise ratios from heterogeneous inputs and brittle outcomes requiring constant prompt steering.
Building an Inference-First Architecture
Core Principles
Unifying inference, search, and analytics requires rethinking the query engine from first principles. The key insight: treat semantic understanding as a native data operation, not a bolted-on feature.
Fenic implements this through an inference-first architecture where LLM calls are first-class operations within the data processing engine. The platform fuses OLAP, search, and inference into one system.
- Semantic operators as DataFrame primitives: Operations like semantic.extract, semantic.filter, and semantic.join are native to the engine, not external functions (see the sketch below).
- Query optimization for inference: The optimizer understands when inference happens, enabling automatic batching, intelligent caching, and cost optimization.
- Native unstructured data types: Markdown, transcripts, JSON, HTML, and embeddings are first-class types with specialized operations.
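For contrast with the UDF example above, here is a minimal sketch of the same sentiment task written as a native semantic operation. The DataFrame, column name, and class labels are illustrative, and a configured session is assumed:

```python
import fenic as fc

# The engine sees this inference step in the query plan, so it can batch
# requests, cache results, and respect provider rate limits automatically.
sentiments = df.with_column(
    "sentiment",
    fc.semantic.classify(
        fc.col("text"),
        classes=["positive", "negative", "neutral"]
    )
)
```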
The DataFrame Abstraction for AI Workloads
The platform's core insight: agentic workflows and AI applications are pipelines. They take inputs, reason over context, generate outputs, and log results. DataFrames provide the right abstraction:
- Lineage: Every column and row has traceable origins, even when derived from model output
- Columnar consistency: Whether a summary, an embedding, or a classification, columns stay structured
- Deterministic transformations: Inference wrapped in declarative logic (model + prompt + input → output) enables caching, versioning, and debugging
- Lazy evaluation: Optimize entire pipelines before execution (see the sketch after this list)
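A minimal sketch of those last two properties, using a placeholder prompt and column name:

```python
import fenic as fc

# Lazy evaluation: this only builds a declarative plan; no model is called yet.
pipeline = df.with_column(
    "summary",
    fc.semantic.map("Summarize this ticket: {{text}}", text=fc.col("text"))
)

# Deterministic wrapper: model + prompt + input column are fixed in the plan,
# so the step can be cached, versioned, and debugged. Inference runs on collect().
result = pipeline.collect()
```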
Implementing Unified Inference Operations
Schema-Driven Extraction
Transform unstructured text into structured data using Pydantic schemas:
```python
import fenic as fc
from pydantic import BaseModel, Field
from typing import Literal

class Issue(BaseModel):
    # Nested schema referenced by Ticket (shape assumed for illustration)
    category: Literal["bug", "feature_request", "question"]
    description: str

class Ticket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"]
    region: Literal["us", "eu", "apac"]
    issues: list[Issue]

tickets = (
    df
    .with_column("extracted", fc.semantic.extract(fc.col("raw_ticket"), Ticket))
    .unnest("extracted")
    .filter(fc.col("region") == "apac")
    .explode("issues")
)

bugs = tickets.filter(fc.col("issues").category == "bug")
```
The schema acts as both documentation and validation. Type-safe results eliminate manual parsing and prompt brittleness. More importantly, the query engine sees the extraction operation and can optimize its execution.
Semantic Filtering and Predicates
Traditional filters require exact matches. Semantic predicates enable natural language filtering:
```python
applicants = df.filter(
    (fc.col("yoe") > 5)
    & fc.semantic.predicate(
        "Has MCP Protocol experience? Resume: {{resume}}",
        resume=fc.col("resume")
    )
)
```
This combines traditional column filtering with semantic understanding. The engine can reorder operations—running the cheap structured filter first, then applying the expensive semantic predicate only to remaining rows.
Meaning-Based Joins
Semantic joins enable joining DataFrames based on semantic similarity rather than exact matches:
pythonprompt = """ Is this candidate a good fit for the job? Candidate Background: {{left_on}} Job Requirements: {{right_on}} Use the following criteria to make your decision: ... """ joined = (applicants .semantic.join( other=jobs, predicate=prompt, left_on=fc.col("resume"), right_on=fc.col("job_description") ) .order_by("application_date") .limit(5) )
The semantic join matches candidates to jobs based on qualifications rather than keywords. The query optimizer can apply this after other filters to minimize expensive LLM operations.
Integrating Search Capabilities
Vector Operations as First-Class Citizens
The platform includes EmbeddingType with native similarity operations:
```python
df = (
    df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(fc.col("raw_blog"), header_level=2)
    )
    .with_column("title", fc.json.jq(fc.col("raw_blog"), ".title"))
    .explode("chunks")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("chunks").content)
    )
)
```
Embeddings integrate seamlessly with other operations. The engine understands vector similarity and can optimize hybrid search patterns combining semantic and structured filters.
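As a hybrid search sketch, the pipeline below combines a cheap structured predicate with vector similarity ranking in a single plan; the "category" column and the precomputed query_embedding are assumptions for illustration:

```python
# Hybrid search sketch: structured filter plus vector similarity ranking,
# expressed in one plan so the optimizer can order the steps.
# query_embedding: precomputed embedding of the user's query (assumed)
candidates = (
    df
    .filter(fc.col("category") == "docs")  # cheap structured predicate first
    .with_column(
        "similarity",
        fc.embedding.compute_similarity(fc.col("embeddings"), query_embedding)
    )
    .order_by(fc.col("similarity").desc())
    .limit(10)
)
```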
Multi-Stage Context Refinement
Implement sophisticated search-to-inference pipelines:
```python
# Stage 1: Broad similarity search
stage1_broad = (
    df
    .with_column(
        "similarity",
        fc.embedding.compute_similarity(fc.col("content_embedding"), user_embedding)
    )
    .order_by(fc.col("similarity").desc())
    .limit(100)
)

# Stage 2: Semantic filter
stage2_relevant = stage1_broad.filter(
    fc.semantic.predicate(
        "Directly addresses: {{content}}",
        content=fc.col("content")
    )
)

# Stage 3: Aggregate with LLM
stage3_refined = (
    stage2_relevant
    .group_by(fc.lit(1).alias("group"))
    .agg(
        fc.semantic.reduce(
            "Create comprehensive summary",
            fc.col("content")
        ).alias("refined_context")
    )
)
```
This pattern combines vector search (Stage 1), semantic filtering (Stage 2), and inference-based aggregation (Stage 3). The query optimizer sees the full pipeline and can reorder operations for efficiency.
Unified Analytics with Semantic Operations
Semantic Group By and Reduce
Group data by semantic similarity rather than exact matches:
```python
# Group semantically similar support tickets
clustered = (
    df
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=5,
        label_column="cluster"
    )
    .group_by("cluster")
    .agg(
        fc.semantic.reduce(
            "Summarize common themes",
            fc.col("ticket_text")
        ).alias("cluster_summary")
    )
)
```
The semantic.reduce operator aggregates grouped data with LLM operations. This enables analytics that understand meaning, not just keywords.
Classification and Transformation
Apply natural language transformations at scale:
```python
classified = (
    df
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("text"),
            classes=["category1", "category2", "category3"]
        )
    )
    .with_column(
        "priority",
        fc.semantic.map(
            "Assess urgency: {{text}}",
            text=fc.col("text")
        )
    )
)
```
These operations integrate naturally with traditional analytics. Filter by category, aggregate by priority, join with structured data—all in one pipeline.
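For instance, the classified DataFrame above can feed straight into conventional aggregation. A minimal sketch, assuming the urgency prompt returns discrete labels such as "high":

```python
# Traditional analytics over semantically derived columns.
urgent_by_category = (
    classified
    .filter(fc.col("priority") == "high")  # assumes semantic.map emits discrete labels
    .group_by("category")
    .agg(fc.count("*").alias("urgent_tickets"))
    .order_by(fc.col("urgent_tickets").desc())
)
```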
Production-Grade Optimization
Automatic Batching and Caching
The query engine optimizes inference operations automatically:
```python
config = fc.SessionConfig(
    app_name="production_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "flash": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=300,
                tpm=150_000
            )
        },
        default_language_model="flash"
    )
)

session = fc.Session.get_or_create(config)
```
The engine handles:
- Batch optimization: Groups API calls efficiently based on provider limits
- Async I/O: Maximizes throughput with concurrent request batching
- Self-throttling: Respects rate limits through automatic adjustment
- Intelligent caching: Reuses inference results across pipeline stages
Published benchmarks of semantic operators report speedups of up to several hundred times over naive implementations while maintaining statistical accuracy guarantees.
Multi-Provider Model Integration
Configure multiple LLM providers with consistent interfaces:
```python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            ),
            "accurate": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=50,
                input_tpm=100000,
                output_tpm=50000
            ),
            "cheap": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=200,
                tpm=200000
            )
        }
    )
)
```
Select models based on task requirements without changing pipeline code:
```python
# Use fast model for classification
df.select(
    fc.semantic.classify(
        fc.col("content"),
        classes=["class1", "class2", "class3"],
        model_alias="fast"
    )
)

# Use accurate model for complex extraction
df.select(
    fc.semantic.extract(fc.col("document"), ComplexSchema, model_alias="accurate")
)
```
Cost Optimization Through Model Cascades
Research on OLAP-LLM integration shows that optimized semantic pipelines can reduce LLM invocations by orders of magnitude through model cascading: using small models or embeddings to filter rows before expensive model calls.
```python
# Cascade pattern: fast filter → accurate extraction
pipeline = (
    df
    .filter(
        fc.semantic.predicate(
            "Contains financial data: {{text}}",
            text=fc.col("text"),
            model_alias="fast"
        )
    )
    .with_column(
        "extracted",
        fc.semantic.extract(
            fc.col("text"),
            FinancialSchema,
            model_alias="accurate"
        )
    )
)
```
The engine automatically batches operations and minimizes expensive calls.
Handling Specialized Data Types
Native Markdown Processing
```python
df = (
    df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(
            fc.col("raw_blog"),
            header_level=2
        )
    )
    .explode("chunks")
)
```
The MarkdownType understands document structure. Chunking respects semantic boundaries rather than arbitrary character counts.
Transcript Processing
```python
# ActionItemSchema: a Pydantic model describing the action items to extract
meetings = (
    df
    .with_column(
        "transcript",
        fc.col("file").cast(fc.TranscriptType)
    )
    .with_column(
        "action_items",
        fc.semantic.extract(
            fc.col("transcript"),
            ActionItemSchema
        )
    )
    .filter(fc.col("action_items.owner") == "Engineering")
)
```
The TranscriptType handles SRT and WebVTT formats with speaker awareness and timestamp preservation.
Document Path Handling
```python
df = session.read.docs(
    "/data/documents/",
    content_type="markdown",
    recursive=True
)
```
Load entire directories into DataFrames with automatic format detection and metadata extraction.
Real-World Implementation Patterns
Content Classification Pipeline
Media companies use unified pipelines for large-scale content intelligence:
```python
articles = (
    df
    .with_column(
        "extracted",
        fc.semantic.extract(fc.col("content"), ArticleSchema)
    )
    .unnest("extracted")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("extracted.summary"))
    )
    .semantic.with_cluster_labels(
        by=fc.col("embeddings"),
        num_clusters=20,
        label_column="topic_cluster"
    )
    .group_by("topic_cluster")
    .agg(
        fc.count("*").alias("article_count"),
        fc.semantic.reduce(
            "Identify cluster theme",
            fc.col("extracted.summary")
        ).alias("theme")
    )
)
```
This single pipeline handles extraction (inference), clustering (search), and aggregation (analytics).
Policy Analysis System
Insurance companies process thousands of policies with semantic extraction:
```python
class PolicyInsight(BaseModel):
    risk_level: Literal["low", "medium", "high", "critical"]
    coverage_gaps: list[str]
    recommendations: list[str]

results = (
    df
    .with_column(
        "policy_insight",
        fc.semantic.extract(fc.col("policy_text"), PolicyInsight)
    )
    .unnest("policy_insight")
    .filter(
        fc.semantic.predicate(
            "{{policy_insight}} has non-empty coverage gaps",
            policy_insight=fc.col("policy_insight")
        )
    )
    .semantic.join(
        other=claims_df,
        predicate="Policy {{left_on}} relates to claim {{right_on}}",
        left_on=fc.col("policy_id"),
        right_on=fc.col("claim_policy_ref")
    )
)
```
As reported by Matic Insurance: "Typedef lets us build and deploy semantic extraction pipelines across thousands of policies and transcripts in days not months."
Product Triage Agent
The RudderStack implementation cut triage time by 95% through warehouse-native context:
```python
# 1. Ingest and normalize
warehouse_data = session.read.table("support_tickets")
docs_data = session.read.docs("/product_docs/", content_type="markdown")

# 2. Build semantic context
taxonomy = (
    docs_data
    .with_column(
        "product_features",
        fc.semantic.extract(fc.col("content"), FeatureSchema)
    )
    .unnest("product_features")
)

# 3. Map tickets to taxonomy
classified_tickets = (
    warehouse_data
    .semantic.join(
        other=taxonomy,
        predicate="Ticket {{left_on}} relates to feature {{right_on}}",
        left_on=fc.col("ticket_text"),
        right_on=fc.col("product_features.description")
    )
    .with_column(
        "decision",
        fc.semantic.map(
            "Recommend: Prioritize / Monitor / Decline for {{combined_context}}",
            # combined_context: assembled upstream from the joined ticket and feature columns
            combined_context=fc.col("combined_context")
        )
    )
)
```
This unifies ticket analysis (inference), feature matching (search), and decision aggregation (analytics).
Scaling from Prototype to Production
Local-First Development
Develop and test with the full engine locally:
```python
# Local development
session = fc.Session.get_or_create(fc.SessionConfig(
    app_name="prototype"
))

df = session.read.csv("local_data.csv")

processed = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("text"), Schema)
)

processed.write.parquet("results.parquet")
```
Cloud Deployment Without Code Changes
Scale to production with zero code modifications:
```python
# Production deployment
config = fc.SessionConfig(
    app_name="production",
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.MEDIUM
    )
)

session = fc.Session.get_or_create(config)

# Same pipeline code
df = session.read.csv("s3://bucket/data/*.csv")

processed = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("text"), Schema)
)

processed.write.parquet("s3://bucket/results/")
```
The platform handles:
- Automatic scaling based on workload
- Distributed execution across nodes
- Resource management and optimization
- Cost tracking and monitoring
Monitoring and Observability
Row-Level Lineage
Track individual record processing history:
```python
result = df.select(
    fc.semantic.map(
        "Analyze sentiment: {{text}}",
        text=fc.col("text")
    )
).collect()

# Access comprehensive metrics
print(result.metrics.total_lm_metrics.num_output_tokens)
print(result.metrics.total_lm_metrics.cost)
print(result.metrics.execution_time_ms)
```
Lineage enables debugging non-deterministic AI pipelines—trace how individual records transform through each operation.
Query Metrics and Cost Tracking
Built-in visibility into resource usage:
```python
metrics = session.table("fenic_system.query_metrics")

metrics.select(
    "model",
    "latency_ms",
    "cost_usd"
).order_by("latency_ms").show()
```
Track token usage, query performance, and costs across models for optimization.
Explain Plans
Visualize how the optimizer executes queries:
```python
pipeline.explain()
```
Understanding the query plan helps identify bottlenecks and optimization opportunities.
Declarative Tool Integration
The Fenic 0.4.0 release introduced declarative tool creation for agent systems:
```python
from fenic.core.mcp.types import ToolParam

# Register DataFrame query as tool
session.catalog.create_tool(
    tool_name="search_documents",
    tool_description="Search through documentation",
    tool_query=df,  # The DataFrame query
    tool_params=[
        ToolParam(
            name="search_term",
            description="Term to search for",
            default_value="default"
        )
    ],
    result_limit=50
)
```
Tools are type-safe, versionable metadata that work across MCP servers, ASGI applications, and CLI interfaces. This unifies tool creation with data processing—the same DataFrame operations power both batch pipelines and agent interactions.
Best Practices
Define Schemas Once
Use Pydantic models as single source of truth:
```python
class ExtractedData(BaseModel):
    """Reusable schema across pipeline"""
    entities: list[str]
    relationships: dict[str, str]
    confidence: float = Field(ge=0, le=1)

# Reuse throughout pipeline
df.select(fc.semantic.extract(fc.col("text"), ExtractedData))
```
Leverage Lazy Evaluation
Build complex pipelines before execution:
```python
# Define multi-stage pipeline
pipeline = (
    df
    .filter(condition1)
    .with_column("extracted", fc.semantic.extract(...))
    .semantic.join(other_df, ...)
    .cache()  # Explicit caching point
    .filter(fc.semantic.predicate(...))
)

# Execute when ready
results = pipeline.collect()
```
The optimizer sees the entire pipeline and applies sophisticated optimizations.
Use Appropriate Models
Configure model tiers for cost optimization:
```python
language_models = {
    "nano": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000),  # Fast, cheap
    "standard": fc.AnthropicLanguageModel(
        model_name="claude-3-5-haiku-latest",
        rpm=50,
        input_tpm=50000,
        output_tpm=50000
    ),  # Balanced
    "power": fc.OpenAILanguageModel(model_name="gpt-4o", rpm=50, tpm=50000)  # Accurate
}

# Use appropriate model per task
df.select(
    fc.semantic.classify(
        fc.col("text"),
        classes=["class1", "class2"],
        model_alias="nano"
    )  # Simple classification
)

df.select(
    fc.semantic.extract(fc.col("complex_doc"), Schema, model_alias="power")  # Complex extraction
)
```
Implement Explicit Caching
Cache expensive operations strategically:
```python
df_enriched = (
    df
    .with_column(
        "extracted",
        fc.semantic.extract(fc.col("text"), schema, model_alias="power")
    )
    .unnest("extracted")
    .cache()  # Persist expensive extraction
    .filter(fc.semantic.predicate(...))
)
```
Caching speeds up iterative development and reduces redundant API calls.
The Path Forward
Unifying inference, search, and analytics in a single engine eliminates the architectural complexity that prevents AI systems from reaching production. By treating semantic operations as first-class citizens within the data processing layer, teams build deterministic workflows on non-deterministic models.
The results demonstrate the approach's viability:
- Insurance companies deploy semantic extraction pipelines in days instead of months
- Product teams achieve 95% reductions in triage time
- Media platforms process millions of articles with context-aware classification
- Enterprise analytics report 100x time savings on semantic queries
As the semantic knowledge graph market grows at 14.2% CAGR, organizations building on inference-first architectures gain sustainable competitive advantages through reliable, operationalized AI systems.
Start building with Fenic, explore the open source announcement, or learn about the latest release features. For enterprise deployments, visit typedef.ai to explore the cloud platform.

