
How to Unify Inference, Search, and Analytics in a Single Engine

Typedef Team


Traditional data architectures force teams into an impossible choice: build reliable analytics pipelines or build AI-powered applications. The assumption that these workloads require separate systems creates operational complexity, duplicated infrastructure, and fragile glue code connecting disparate components.

This guide shows how to unify inference, search, and analytics in a single engine using an inference-first architecture. You'll learn the technical patterns, implementation strategies, and production considerations for building deterministic workflows on non-deterministic models.

The Architectural Gap in Traditional Data Platforms

Why Legacy Systems Fail for AI Workloads

SQL-era data platforms optimize for structured queries, batch ETL processes, and deterministic transformations. They treat LLM operations as external black boxes wrapped in user-defined functions (UDFs). This creates fundamental impedance mismatches:

python
# Traditional approach: LLM calls as external UDFs
import time
from openai import OpenAI
import pandas as pd

client = OpenAI()

def extract_sentiment(text):
    # Manual rate limiting
    time.sleep(0.1)
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"Analyze sentiment: {text}"}]
        )
        return response.choices[0].message.content
    except Exception:
        # Hand-rolled retry/backoff helper (not shown) that every team ends up
        # writing and maintaining itself
        return retry_with_backoff(extract_sentiment, text)

df = pd.DataFrame({"text": ["I love this product!", "This is terrible."]})
df["sentiment"] = df["text"].apply(extract_sentiment)

The query engine has zero visibility into the UDF's operations. It cannot batch API calls, cache inference patterns, optimize operation ordering, or handle rate limits intelligently. Teams end up managing these concerns manually across every pipeline.

The Cost of Separation

When inference, search, and analytics live in separate systems, teams face:

Infrastructure sprawl: OCR services, transcription APIs, vector databases, LLM providers, OLAP warehouses, and custom microservices—each requiring integration, monitoring, and maintenance.

Data movement overhead: Constant serialization between systems adds latency and failure modes. Context gets lost crossing boundaries.

Optimization impossibility: Each system optimizes locally without understanding the full pipeline. Operations that could be reordered or batched run sequentially.

Development velocity collapse: Engineers spend most of their time on infrastructure plumbing rather than on business logic.

The RudderStack case study demonstrates this gap. Their team piloted LLM/MCP experiments but faced low signal-to-noise ratios from heterogeneous inputs and brittle outcomes requiring constant prompt steering.

Building an Inference-First Architecture

Core Principles

Unifying inference, search, and analytics requires rethinking the query engine from first principles. The key insight: treat semantic understanding as a native data operation, not a bolted-on feature.

Fenic implements this through an inference-first architecture where LLM calls are first-class operations within the data processing engine. The platform fuses OLAP, search, and inference into one system.

Semantic operators as DataFrame primitives: Operations like semantic.extract, semantic.filter, and semantic.join are native to the engine, not external functions.

Query optimization for inference: The optimizer understands when inference happens, enabling automatic batching, intelligent caching, and cost optimization.

Native unstructured data types: Markdown, transcripts, JSON, HTML, and embeddings are first-class types with specialized operations.
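
To make the contrast with the earlier pandas UDF example concrete, here is a minimal sketch of the same sentiment task written as a native semantic operation. It assumes an OpenAI API key is configured, uses the session setup and classify operator covered later in this guide, and relies on an in-memory create_dataframe call for the sample data. The point is that the engine sees the operation and handles batching, rate limiting, and retries itself:

python
import fenic as fc

# Session with a single configured model; the engine manages batching and rate limits
config = fc.SessionConfig(
    app_name="sentiment_demo",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=500, tpm=200_000)
        },
        default_language_model="nano",
    ),
)
session = fc.Session.get_or_create(config)

df = session.create_dataframe({"text": ["I love this product!", "This is terrible."]})

# Sentiment as a declarative column expression instead of a hand-rolled UDF
result = df.with_column(
    "sentiment",
    fc.semantic.classify(fc.col("text"), classes=["positive", "negative", "neutral"]),
)
result.show()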

The DataFrame Abstraction for AI Workloads

The platform's core insight: agentic workflows and AI applications are pipelines. They take inputs, reason over context, generate outputs, and log results. DataFrames provide the right abstraction:

  • Lineage: Every column and row has traceable origins, even from model output
  • Columnar consistency: Whether summary, embedding, or classification, columns stay structured
  • Deterministic transformations: Inference wrapped in declarative logic—model + prompt + input → output—enables caching, versioning, and debugging
  • Lazy evaluation: Optimize entire pipelines before execution

Implementing Unified Inference Operations

Schema-Driven Extraction

Transform unstructured text into structured data using Pydantic schemas:

python
import fenic as fc
from pydantic import BaseModel, Field
from typing import Literal

class Issue(BaseModel):
    # Illustrative nested schema; "category" is referenced in the filter below
    category: str
    description: str

class Ticket(BaseModel):
    customer_tier: Literal["free", "pro", "enterprise"]
    region: Literal["us", "eu", "apac"]
    issues: list[Issue]

tickets = (df
    .with_column("extracted", fc.semantic.extract(fc.col("raw_ticket"), Ticket))
    .unnest("extracted")
    .filter(fc.col("region") == "apac")
    .explode("issues")
)

bugs = tickets.filter(fc.col("issues").category == "bug")

The schema acts as both documentation and validation. Type-safe results eliminate manual parsing and prompt brittleness. More importantly, the query engine sees the extraction operation and can optimize its execution.

Semantic Filtering and Predicates

Traditional filters require exact matches. Semantic predicates enable natural language filtering:

python
applicants = df.filter(
    (fc.col("yoe") > 5) &
    fc.semantic.predicate(
        "Has MCP Protocol experience? Resume: {{resume}}",
        resume=fc.col("resume")
    )
)

This combines traditional column filtering with semantic understanding. The engine can reorder operations—running the cheap structured filter first, then applying the expensive semantic predicate only to remaining rows.

Meaning-Based Joins

Semantic joins enable joining DataFrames based on semantic similarity rather than exact matches:

python
prompt = """
Is this candidate a good fit for the job?

Candidate Background: {{left_on}}
Job Requirements: {{right_on}}

Use the following criteria to make your decision: ...
"""

joined = (applicants
    .semantic.join(
        other=jobs,
        predicate=prompt,
        left_on=fc.col("resume"),
        right_on=fc.col("job_description")
    )
    .order_by("application_date")
    .limit(5)
)

The semantic join matches candidates to jobs based on qualifications rather than keywords. The query optimizer can apply this after other filters to minimize expensive LLM operations.

Integrating Search Capabilities

Vector Operations as First-Class Citizens

The platform includes EmbeddingType with native similarity operations:

python
df = (df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(fc.col("raw_blog"), header_level=2)
    )
    .with_column("title", fc.json.jq(fc.col("raw_blog"), ".title"))
    .explode("chunks")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("chunks").content)
    )
)

Embeddings integrate seamlessly with other operations. The engine understands vector similarity and can optimize hybrid search patterns combining semantic and structured filters.
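
As a small illustration of that hybrid pattern, the sketch below runs a cheap structured filter before ranking by embedding similarity. The category and content_embedding columns and the precomputed query_embedding vector are assumptions for the example:

python
import fenic as fc

# Hybrid search: structured predicate first, then vector similarity ranking.
# query_embedding is assumed to be a precomputed vector for the user's query.
hybrid = (df
    .filter(fc.col("category") == "troubleshooting")  # cheap structured filter first
    .with_column(
        "similarity",
        fc.embedding.compute_similarity(fc.col("content_embedding"), query_embedding)
    )
    .order_by(fc.col("similarity").desc())
    .limit(20)
)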

Multi-Stage Context Refinement

Implement sophisticated search-to-inference pipelines:

python
# Stage 1: Broad similarity search
stage1_broad = (df
    .with_column(
        "similarity",
        fc.embedding.compute_similarity(fc.col("content_embedding"), user_embedding)
    )
    .order_by(fc.col("similarity").desc())
    .limit(100)
)

# Stage 2: Semantic filter
stage2_relevant = stage1_broad.filter(
    fc.semantic.predicate(
        "Directly addresses: {{content}}",
        content=fc.col("content")
    )
)

# Stage 3: Aggregate with LLM
stage3_refined = (stage2_relevant
    .group_by(fc.lit(1).alias("group"))
    .agg(
        fc.semantic.reduce(
            "Create comprehensive summary",
            fc.col("content")
        ).alias("refined_context")
    )
)

This pattern combines vector search (Stage 1), semantic filtering (Stage 2), and inference-based aggregation (Stage 3). The query optimizer sees the full pipeline and can reorder operations for efficiency.

Unified Analytics with Semantic Operations

Semantic Group By and Reduce

Group data by semantic similarity rather than exact matches:

python
# Group semantically similar support tickets
clustered = (df
    .semantic.with_cluster_labels(
        by=fc.col("embedding"),
        num_clusters=5,
        label_column="cluster"
    )
    .group_by("cluster")
    .agg(
        fc.semantic.reduce(
            "Summarize common themes",
            fc.col("ticket_text")
        ).alias("cluster_summary")
    )
)

The semantic.reduce operator aggregates grouped data with LLM operations. This enables analytics that understand meaning, not just keywords.

Classification and Transformation

Apply natural language transformations at scale:

python
classified = (df
    .with_column(
        "category",
        fc.semantic.classify(
            fc.col("text"),
            classes=["category1", "category2", "category3"]  # needs classes parameter
        )
    )
    .with_column(
        "priority",
        fc.semantic.map(
            "Assess urgency: {{text}}",
            text=fc.col("text")
        )
    )
)

These operations integrate naturally with traditional analytics. Filter by category, aggregate by priority, join with structured data—all in one pipeline.
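
As a quick sketch, the classified DataFrame built above can feed directly into conventional aggregations in the same pipeline:

python
# Conventional analytics over model-generated columns, in the same pipeline
category_counts = (classified
    .group_by("category")
    .agg(fc.count("*").alias("ticket_count"))
    .order_by(fc.col("ticket_count").desc())
)
category_counts.show()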

Production-Grade Optimization

Automatic Batching and Caching

The query engine optimizes inference operations automatically:

python
config = fc.SessionConfig(
    app_name="production_pipeline",
    semantic=fc.SemanticConfig(
        language_models={
            "nano": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            ),
            "flash": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=300,
                tpm=150_000
            )
        },
        default_language_model="flash"
    )
)

session = fc.Session.get_or_create(config)

The engine handles:

  • Batch optimization: Groups API calls efficiently based on provider limits
  • Async I/O: Maximizes throughput with concurrent request batching
  • Self-throttling: Respects rate limits through automatic adjustment
  • Intelligent caching: Reuses inference results across pipeline stages

Benchmark results for semantic operators report speedups of up to several hundred times over naive row-by-row implementations while maintaining statistical accuracy guarantees.

Multi-Provider Model Integration

Configure multiple LLM providers with consistent interfaces:

python
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            ),
            "accurate": fc.AnthropicLanguageModel(
                model_name="claude-3-5-haiku-latest",
                rpm=50,
                input_tpm=100000,
                output_tpm=50000
            ),
            "cheap": fc.GoogleVertexLanguageModel(
                model_name="gemini-2.0-flash",
                rpm=200,
                tpm=200000
            )
        }
    )
)

Select models based on task requirements without changing pipeline code:

python
# Use fast model for classification
df.select(
    fc.semantic.classify(fc.col("content"), classes=["class1", "class2", "class3"], model_alias="fast")
)

# Use accurate model for complex extraction
df.select(
    fc.semantic.extract(fc.col("document"), ComplexSchema, model_alias="accurate")
)

Cost Optimization Through Model Cascades

Research on OLAP-LLM integration shows that optimized semantic pipelines reduce LLM invocations by orders of magnitude through model cascading: using small models or embeddings to filter candidates before invoking expensive models.

python
# Cascade pattern: fast filter → accurate extraction
pipeline = (df
    .filter(
        fc.semantic.predicate(
            "Contains financial data: {{text}}",
            text=fc.col("text"),
            model_alias="fast"
        )
    )
    .with_column(
        "extracted",
        fc.semantic.extract(
            fc.col("text"),
            FinancialSchema,
            model_alias="accurate"
        )
    )
)

The engine automatically batches operations and minimizes expensive calls.

Handling Specialized Data Types

Native Markdown Processing

python
df = (df
    .with_column("raw_blog", fc.col("blog").cast(fc.MarkdownType))
    .with_column(
        "chunks",
        fc.markdown.extract_header_chunks(
            fc.col("raw_blog"),
            header_level=2
        )
    )
    .explode("chunks")
)

The MarkdownType understands document structure. Chunking respects semantic boundaries rather than arbitrary character counts.

Transcript Processing

python
meetings = (df
    .with_column(
        "transcript",
        fc.col("file").cast(fc.TranscriptType)
    )
    .with_column(
        "action_items",
        fc.semantic.extract(
            fc.col("transcript"),
            ActionItemSchema
        )
    )
    .filter(fc.col("action_items.owner") == "Engineering")
)

The TranscriptType handles SRT and WebVTT formats with speaker awareness and timestamp preservation.

Document Path Handling

python
df = session.read.docs(
    "/data/documents/",
    content_type="markdown",
    recursive=True
)

Load entire directories into DataFrames with automatic format detection and metadata extraction.

Real-World Implementation Patterns

Content Classification Pipeline

Media companies use unified pipelines for large-scale content intelligence:

python
articles = (df
    .with_column(
        "extracted",
        fc.semantic.extract(fc.col("content"), ArticleSchema)
    )
    .unnest("extracted")
    .with_column(
        "embeddings",
        fc.semantic.embed(fc.col("extracted.summary"))
    )
    .semantic.with_cluster_labels(
        by=fc.col("embeddings"),
        num_clusters=20,
        label_column="topic_cluster"
    )
    .group_by("topic_cluster")
    .agg(
        fc.count("*").alias("article_count"),
        fc.semantic.reduce(
            "Identify cluster theme",
            fc.col("extracted.summary")
        ).alias("theme")
    )
)

This single pipeline handles extraction (inference), clustering (search), and aggregation (analytics).

Policy Analysis System

Insurance companies process thousands of policies with semantic extraction:

python
class PolicyInsight(BaseModel):
    risk_level: Literal["low", "medium", "high", "critical"]
    coverage_gaps: list[str]
    recommendations: list[str]

results = (df
    .with_column(
        "policy_insight",
        fc.semantic.extract(fc.col("policy_text"), PolicyInsight)
    )
    .unnest("policy_insight")
    .filter(
        fc.semantic.predicate(
            "The policy has non-empty coverage gaps: {{coverage_gaps}}",
            coverage_gaps=fc.col("coverage_gaps")  # flattened to a top-level column by the unnest above
        )
    )
    .semantic.join(
        other=claims_df,
        predicate="Policy {{left_on}} relates to claim {{right_on}}",
        left_on=fc.col("policy_id"),
        right_on=fc.col("claim_policy_ref")
    )
)

As reported by Matic Insurance: "Typedef lets us build and deploy semantic extraction pipelines across thousands of policies and transcripts in days not months."

Product Triage Agent

The RudderStack implementation cut triage time by 95% through warehouse-native context:

python
# 1. Ingest and normalize
warehouse_data = session.read.table("support_tickets")
docs_data = session.read.docs("/product_docs/", content_type="markdown")

# 2. Build semantic context
taxonomy = (docs_data
    .with_column(
        "product_features",
        fc.semantic.extract(fc.col("content"), FeatureSchema)
    )
    .unnest("product_features")
)

# 3. Map tickets to taxonomy
classified_tickets = (warehouse_data
    .semantic.join(
        other=taxonomy,
        predicate="Ticket {{left_on}} relates to feature {{right_on}}",
        left_on=fc.col("ticket_text"),
        right_on=fc.col("product_features.description")
    )
    .with_column(
        "decision",
        fc.semantic.map(
            "Recommend: Prioritize / Monitor / Decline for {{combined_context}}",
            combined_context=fc.col("combined_context")
        )
    )
)

This unifies ticket analysis (inference), feature matching (search), and decision aggregation (analytics).

Scaling from Prototype to Production

Local-First Development

Develop and test with the full engine locally:

python
# Local development
session = fc.Session.get_or_create(fc.SessionConfig(
    app_name="prototype"
))

df = session.read.csv("local_data.csv")
processed = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("text"), Schema)
)
processed.write.parquet("results.parquet")

Cloud Deployment Without Code Changes

Scale to production with zero code modifications:

python
# Production deployment
config = fc.SessionConfig(
    app_name="production",
    cloud=fc.CloudConfig(
        size=fc.CloudExecutorSize.MEDIUM
    )
)

session = fc.Session.get_or_create(config)

# Same pipeline code
df = session.read.csv("s3://bucket/data/*.csv")
processed = df.with_column(
    "extracted",
    fc.semantic.extract(fc.col("text"), Schema)
)
processed.write.parquet("s3://bucket/results/")

The platform handles:

  • Automatic scaling based on workload
  • Distributed execution across nodes
  • Resource management and optimization
  • Cost tracking and monitoring

Monitoring and Observability

Row-Level Lineage

Track individual record processing history:

python
result = df.select(
    fc.semantic.map(
        "Analyze sentiment: {{text}}",
        text=fc.col("text")
    )
).collect()

# Access comprehensive metrics
print(result.metrics.total_lm_metrics.num_output_tokens)
print(result.metrics.total_lm_metrics.cost)
print(result.metrics.execution_time_ms)

Lineage enables debugging non-deterministic AI pipelines—trace how individual records transform through each operation.

Query Metrics and Cost Tracking

Built-in visibility into resource usage:

python
metrics = session.table("fenic_system.query_metrics")
metrics.select(
    "model",
    "latency_ms",
    "cost_usd"
).order_by("latency_ms").show()

Track token usage, query performance, and costs across models for optimization.

Explain Plans

Visualize how the optimizer executes queries:

python
pipeline.explain()

Understanding the query plan helps identify bottlenecks and optimization opportunities.

Declarative Tool Integration

The Fenic 0.4.0 release introduced declarative tool creation for agent systems:

python
from fenic.core.mcp.types import ToolParam

# Register DataFrame query as tool
session.catalog.create_tool(
    tool_name="search_documents",
    tool_description="Search through documentation",
    tool_query=df,  # The DataFrame query
    tool_params=[
        ToolParam(
            name="search_term",
            description="Term to search for",
            default_value="default"
        )
    ],
    result_limit=50
)

Tools are type-safe, versionable metadata that work across MCP servers, ASGI applications, and CLI interfaces. This unifies tool creation with data processing—the same DataFrame operations power both batch pipelines and agent interactions.

Best Practices

Define Schemas Once

Use Pydantic models as single source of truth:

python
class ExtractedData(BaseModel):
    """Reusable schema across pipeline"""
    entities: list[str]
    relationships: dict[str, str]
    confidence: float = Field(ge=0, le=1)

# Reuse throughout pipeline
df.select(fc.semantic.extract(fc.col("text"), ExtractedData))

Leverage Lazy Evaluation

Build complex pipelines before execution:

python
# Define multi-stage pipeline
pipeline = (df
    .filter(condition1)
    .with_column("extracted", fc.semantic.extract(...))
    .semantic.join(other_df, ...)
    .cache()  # Explicit caching point
    .filter(fc.semantic.predicate(...))
)

# Execute when ready
results = pipeline.collect()

The optimizer sees the entire pipeline and applies sophisticated optimizations.
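
Before collecting, the optimized plan can be inspected with the explain call described in the observability section, which helps confirm that cheap structured filters run ahead of expensive semantic operators:

python
# Inspect the optimized plan without spending any tokens
pipeline.explain()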

Use Appropriate Models

Configure model tiers for cost optimization:

python
language_models = {
    "nano": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100000),  # Fast, cheap
    "standard": fc.AnthropicLanguageModel(model_name="claude-3-5-haiku-latest", rpm=50, input_tpm=50000, output_tpm=50000),  # Balanced
    "power": fc.OpenAILanguageModel(model_name="gpt-4o", rpm=50, tpm=50000)  # Accurate
}

# Use appropriate model per task
df.select(
    fc.semantic.classify(fc.col("text"), classes=["class1", "class2"], model_alias="nano")  # Simple classification
)
df.select(
    fc.semantic.extract(fc.col("complex_doc"), Schema, model_alias="power")  # Complex extraction
)

Implement Explicit Caching

Cache expensive operations strategically:

python
df_enriched = (df
    .with_column(
        "extracted",
        fc.semantic.extract(fc.col("text"), schema, model_alias="gpt-4")
    )
    .unnest("extracted")
    .cache()  # Persist expensive extraction
    .filter(fc.semantic.predicate(...))
)

Caching speeds up iterative development and reduces redundant API calls.

The Path Forward

Unifying inference, search, and analytics in a single engine eliminates the architectural complexity that prevents AI systems from reaching production. By treating semantic operations as first-class citizens within the data processing layer, teams build deterministic workflows on non-deterministic models.

The results demonstrate the approach's viability:

  • Insurance companies deploy semantic extraction pipelines in days instead of months
  • Product teams achieve 95% reductions in triage time
  • Media platforms process millions of articles with context-aware classification
  • Enterprise analytics report 100x time savings on semantic queries

As the semantic knowledge graph market grows at 14.2% CAGR, organizations building on inference-first architectures gain sustainable competitive advantages through reliable, operationalized AI systems.

Start building with Fenic, explore the open source announcement, or learn about the latest release features. For enterprise deployments, visit typedef.ai to explore the cloud platform.
