LangChain provides a framework for building applications with language models, but processing transcripts at scale requires robust data infrastructure. Fenic, the open-source DataFrame framework from Typedef.ai, fills this gap by bringing structured batch processing to unstructured transcript data.
This guide shows how to use Fenic as a preprocessing layer for LangChain applications, leverage the Model Context Protocol for agent integration, and process transcripts efficiently with semantic operations.
Why Augment LangChain with Fenic
LangChain excels at orchestrating language model interactions, but transcript processing introduces specific challenges:
Scale and Efficiency
- Batch processing hundreds or thousands of transcripts
- Managing rate limits across multiple model providers
- Concurrent inference without manual async coordination
Data Structure
- Parsing multiple transcript formats (SRT, WebVTT, generic)
- Maintaining temporal relationships in conversation data
- Joining transcript segments with metadata
Production Reliability
- Deterministic transformations for testing and debugging
- Query lineage for audit trails
- Automatic retry logic and error handling
Fenic addresses these requirements with a DataFrame API that handles inference as a first-class operation.
Architecture Patterns
Pattern 1: Preprocessing Layer
Use Fenic to prepare transcript data before LangChain processing:
Transcripts → Fenic (parse, structure, enrich) → LangChain (orchestration, decisions)
Fenic handles batch operations while LangChain manages interactive flows.
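A minimal sketch of that handoff, assuming a Fenic pipeline like the ones built later in this guide has already produced a `summaries_df` DataFrame with one summary per transcript:

```python
# Hedged sketch of the Pattern 1 handoff.
# Assumes `summaries_df` (built later in this guide) holds file_path and summary columns.
summaries_df.write.csv("summaries.csv")

# Downstream, LangChain consumes the prepared rows interactively, one decision at a time.
import csv
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")
with open("summaries.csv") as f:
    for row in csv.DictReader(f):
        decision = llm.invoke(
            "Based on this meeting summary, does the team need a follow-up meeting? "
            "Answer yes or no with a one-sentence reason.\n\n" + row["summary"]
        )
        print(row["file_path"], "->", decision.content)
```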
Pattern 2: MCP Integration
Expose Fenic operations as Model Context Protocol tools that LangChain agents can call:
LangChain Agent → MCP Server (Fenic tools) → Structured Data
Agents access transcript analysis without leaving their execution context.
Pattern 3: Hybrid Processing
Combine both approaches for advanced workflows:
Fenic (batch enrichment) → Data Store → LangChain (interactive) ↔ MCP (Fenic tools)
Setting Up the Environment
Install Fenic with required dependencies:
```bash
pip install fenic
```
Configure a session with your model providers:
```python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.core.types.inference.openai import OpenAILanguageModel

config = SessionConfig(
    app_name="transcript_processor",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100
            )
        }
    )
)

session = Session.get_or_create(config)
```
Parsing Transcript Formats
Fenic's parse_transcript function converts multiple formats into a unified schema.
Supported Formats
SRT (SubRip Subtitle)
```
1
00:00:01,000 --> 00:00:05,000
First line of dialogue

2
00:00:05,500 --> 00:00:10,000
Second line of dialogue
```
WebVTT (Web Video Text Tracks)
```
WEBVTT

Speaker 1
00:00:01.000 --> 00:00:05.000
First line of dialogue

Speaker 2
00:00:05.500 --> 00:00:10.000
Second line of dialogue
```
Generic Conversation
```
[00:00:01] Speaker 1: First line of dialogue
[00:00:05] Speaker 2: Second line of dialogue
```
Parsing Implementation
```python
import fenic.api.functions as fc
from fenic.api.functions import text

# Load transcript files
df = session.read.docs(
    "transcripts/**/*.txt",
    content_type="markdown",
    recursive=True
)

# Parse SRT format
parsed_df = df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

# Explode segments into rows
segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("segment")
).select(
    fc.col("file_path"),
    fc.col("segment.index").alias("index"),
    fc.col("segment.speaker").alias("speaker"),
    fc.col("segment.start_time").alias("start_time"),
    fc.col("segment.end_time").alias("end_time"),
    fc.col("segment.content").alias("text")
)
```
The unified schema provides:
- index: Entry number (1-based)
- speaker: Speaker identifier (when available)
- start_time: Start timestamp in seconds
- end_time: End timestamp in seconds
- duration: Segment duration
- content: Transcript text
- format: Original format type
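Because start and end times arrive as numeric seconds, time-based rollups need no extra parsing. A small illustrative sketch, reusing the `segments_df` DataFrame built above:

```python
# Illustrative sketch: per-speaker talk time computed from the unified schema fields.
talk_time_df = segments_df.group_by("speaker").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(fc.col("end_time") - fc.col("start_time")).alias("talk_time_seconds")
)
talk_time_df.show()
```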
Semantic Operations on Transcripts
Fenic provides semantic functions that apply language models declaratively within DataFrame operations.
Classification
Categorize transcript segments by topic or intent:
```python
from fenic.api.functions import semantic

# Classify support call segments
classified_df = segments_df.with_column(
    "category",
    semantic.classify(
        fc.col("text"),
        ["Technical Issue", "Billing Question", "Feature Request", "General Inquiry"]
    )
)
```
Sentiment Analysis
Analyze emotional tone across conversations:
```python
# Add sentiment scores
sentiment_df = segments_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
)

# Aggregate by speaker
speaker_sentiment = sentiment_df.group_by("speaker").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)
    ).alias("positive_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)
    ).alias("negative_count")
)
```
Extraction
Pull structured information from unstructured dialogue:
```python
from pydantic import BaseModel, Field

class ActionItem(BaseModel):
    task: str = Field(description="The action to be taken")
    assignee: str = Field(description="Person responsible")
    deadline: str = Field(description="Due date if mentioned")

# Extract action items from meeting segments
actions_df = segments_df.select(
    fc.col("file_path"),
    fc.col("text"),
    semantic.extract(
        fc.col("text"),
        ActionItem
    ).alias("action_item")
).filter(
    fc.col("action_item").is_not_null()
)
```
Summarization
Generate concise summaries of conversation sections:
```python
# Summarize each file's transcript
summary_df = segments_df.group_by("file_path").agg(
    semantic.reduce(
        "Summarize this conversation highlighting key decisions and outcomes",
        fc.col("text"),
        order_by=fc.col("start_time")
    ).alias("summary")
)
```
Semantic Filtering
Filter transcripts using natural language predicates:
```python
# Find segments discussing specific topics
relevant_df = segments_df.filter(
    semantic.predicate(
        "This text discusses product features or roadmap planning: {{ text }}",
        text=fc.col("text")
    )
)
```
Creating MCP Tools from DataFrames
The Model Context Protocol allows LangChain agents to call Fenic operations as tools.
Declarative Tool Creation
Register DataFrame queries as reusable tools:
```python
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

# Save processed transcripts as a table
segments_df.write.save_as_table("transcripts", mode="overwrite")

# Create a search tool
session.catalog.create_tool(
    tool_name="search_transcripts",
    tool_description="Search transcript segments by keyword or phrase",
    tool_query=session.table("transcripts").filter(
        fc.col("text").like(fc.lit("%").concat(
            fc.tool_param("search_term", StringType)
        ).concat(fc.lit("%")))
    ).limit(fc.tool_param("limit", IntegerType)),
    tool_params=[
        ToolParam(
            name="search_term",
            description="Keyword or phrase to search for",
            default_value=""
        ),
        ToolParam(
            name="limit",
            description="Maximum number of results",
            default_value=10
        )
    ],
    result_limit=50
)

# Create an analytics tool
session.catalog.create_tool(
    tool_name="analyze_speaker",
    tool_description="Get statistics and sentiment for a specific speaker",
    tool_query=session.table("transcripts").filter(
        fc.col("speaker") == fc.tool_param("speaker_name", StringType)
    ).agg(
        fc.count("*").alias("total_segments"),
        fc.avg(fc.col("end_time") - fc.col("start_time")).alias("avg_segment_duration"),
        semantic.reduce(
            "Summarize this speaker's main points and communication style",
            fc.col("text"),
            order_by=fc.col("start_time")
        ).alias("summary")
    ),
    tool_params=[
        ToolParam(
            name="speaker_name",
            description="Name or identifier of the speaker",
            default_value=""
        )
    ]
)
```
Running an MCP Server
Launch the server to expose tools to LangChain:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

# Get all registered tools
tools = session.catalog.list_tools()

# Create server
server = create_mcp_server(
    session,
    "TranscriptServer",
    user_defined_tools=tools,
    concurrency_limit=8
)

# Run with HTTP transport
run_mcp_server_sync(
    server,
    transport="http",
    stateless_http=True,
    port=8000,
    host="127.0.0.1",
    path="/mcp"
)
```
LangChain Integration via MCP
Connect LangChain agents to the MCP server:
```python
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
import requests

# Create a wrapper for Fenic MCP tools.
# NOTE: simplified illustration -- the MCP HTTP transport actually speaks JSON-RPC,
# so a production client should use an MCP client library (see the sketch below).
def call_fenic_tool(tool_name: str, **params):
    response = requests.post(
        "http://127.0.0.1:8000/mcp",
        json={
            "tool": tool_name,
            "parameters": params
        }
    )
    return response.json()

# Define LangChain tools
langchain_tools = [
    Tool(
        name="search_transcripts",
        func=lambda q, limit=10: call_fenic_tool(
            "search_transcripts", search_term=q, limit=limit
        ),
        description="Search transcript segments by keyword or phrase"
    ),
    Tool(
        name="analyze_speaker",
        func=lambda speaker: call_fenic_tool(
            "analyze_speaker", speaker_name=speaker
        ),
        description="Get statistics and analysis for a specific speaker"
    )
]

# Create agent
llm = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are an assistant that analyzes meeting transcripts."),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])
agent = create_openai_functions_agent(llm, langchain_tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=langchain_tools)

# Agent can now call Fenic tools
result = agent_executor.invoke({
    "input": "What did Sarah discuss in the meeting?"
})
```
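As an alternative to the hand-rolled HTTP wrapper above, the langchain-mcp-adapters package can load MCP tools directly as LangChain tools. The sketch below is hedged: it assumes the TranscriptServer from the previous section is running on port 8000, and the client API shown (MultiServerMCPClient, get_tools) may differ between package versions:

```python
# Hedged sketch: loading the Fenic MCP tools through langchain-mcp-adapters.
# Assumes the TranscriptServer from the previous section is running on port 8000;
# the exact client API may vary between versions of langchain-mcp-adapters.
import asyncio

from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI

async def main():
    client = MultiServerMCPClient({
        "transcripts": {
            "url": "http://127.0.0.1:8000/mcp",
            "transport": "streamable_http",
        }
    })
    mcp_tools = await client.get_tools()  # Fenic tools surfaced as LangChain tools

    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an assistant that analyzes meeting transcripts."),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])
    llm = ChatOpenAI(model="gpt-4")
    agent = create_openai_functions_agent(llm, mcp_tools, prompt)
    executor = AgentExecutor(agent=agent, tools=mcp_tools)

    result = await executor.ainvoke({"input": "What did Sarah discuss in the meeting?"})
    print(result["output"])

asyncio.run(main())
```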
Advanced Transcript Processing Workflows
Multi-File Analysis
Process directories of transcripts:
```python
# Load all transcripts
all_transcripts = session.read.docs(
    ["meetings/**/*.srt", "calls/**/*.vtt"],
    content_type="markdown",
    recursive=True
)

# Parse based on file extension
parsed = all_transcripts.with_column(
    "format",
    fc.when(fc.col("file_path").like("%.srt"), fc.lit("srt"))
    .when(fc.col("file_path").like("%.vtt"), fc.lit("webvtt"))
    .otherwise(fc.lit("generic"))
).with_column(
    "segments",
    text.parse_transcript(fc.col("content"), fc.col("format"))
)

# Enrich with metadata
enriched = parsed.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("seg")
).select(
    fc.regexp_extract(
        fc.col("file_path"),
        r"(\d{4}-\d{2}-\d{2})",
        1
    ).alias("date"),
    fc.col("seg.speaker").alias("speaker"),
    fc.col("seg.content").alias("text"),
    semantic.classify(
        fc.col("seg.content"),
        ["Question", "Answer", "Statement", "Action Item"]
    ).alias("utterance_type")
)
```
Temporal Analysis
Track sentiment changes over time:
```python
# Add time windows (5-minute buckets)
windowed_df = segments_df.with_column(
    "time_bucket",
    (fc.col("start_time") / 300).cast("integer") * 300
).with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
)

# Aggregate by time window
temporal_sentiment = windowed_df.group_by("file_path", "time_bucket").agg(
    fc.count("*").alias("segment_count"),
    fc.sum(
        fc.when(fc.col("sentiment") == "positive", 1).otherwise(0)
    ).alias("positive"),
    fc.sum(
        fc.when(fc.col("sentiment") == "negative", 1).otherwise(0)
    ).alias("negative")
)
```
Speaker Diarization Enhancement
Join speaker metadata with transcript segments:
```python
from fenic.api.functions import text as text_fn

# Load speaker information
speakers_df = session.read.csv("speaker_metadata.csv")

# Fuzzy join on speaker names
joined_df = segments_df.join(
    speakers_df,
    text_fn.compute_fuzzy_ratio(
        fc.col("speaker"),
        speakers_df["speaker_name"],
        "jaro_winkler"
    ) > 85,
    "left"
).select(
    fc.col("file_path"),
    fc.coalesce(speakers_df["full_name"], fc.col("speaker")).alias("speaker"),
    speakers_df["department"].alias("department"),
    fc.col("text"),
    fc.col("start_time")
)
```
Cross-Transcript Semantic Join
Link related discussion points across different transcripts:
```python
# Extract key topics from each transcript
topics_df1 = segments_df.filter(
    fc.col("file_path").like("%meeting_1%")
).select(
    fc.col("text").alias("meeting_1_text")
)

topics_df2 = segments_df.filter(
    fc.col("file_path").like("%meeting_2%")
).select(
    fc.col("text").alias("meeting_2_text")
)

# Semantic join to find related discussions
related = topics_df1.semantic.join(
    other=topics_df2,
    predicate="""
    Meeting 1: {{ left_on }}
    Meeting 2: {{ right_on }}
    These segments discuss the same topic or decision.
    """,
    left_on=fc.col("meeting_1_text"),
    right_on=fc.col("meeting_2_text")
)
```
Async UDFs for External APIs
Integrate third-party services with concurrent execution:
```python
import aiohttp

import fenic as fc
from fenic.core.types import StructType, StructField, StringType, IntegerType

@fc.async_udf(
    return_type=StructType([
        StructField("entity_type", StringType),
        StructField("confidence", IntegerType)
    ]),
    max_concurrency=10,
    timeout_seconds=5,
    num_retries=2
)
async def call_entity_api(text: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/entities",
            json={"text": text}
        ) as resp:
            data = await resp.json()
            return {
                "entity_type": data.get("type"),
                "confidence": data.get("confidence")
            }

# Apply to transcript segments
enriched_df = segments_df.select(
    fc.col("text"),
    call_entity_api(fc.col("text")).alias("entity_info")
)
```
Performance Optimization
Batch Size Tuning
Fenic automatically batches inference calls. Control concurrency through model configuration:
```python
config = SessionConfig(
    app_name="transcript_processor",
    semantic=SemanticConfig(
        language_models={
            "fast_model": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,  # Higher throughput
                tpm=50000
            )
        }
    )
)
```
Selective Processing
Filter before applying expensive operations:
```python
# Only analyze segments longer than 10 words
filtered_df = segments_df.filter(
    fc.length(fc.split(fc.col("text"), " ")) > 10
).with_column(
    "summary",
    semantic.summarize(fc.col("text"))
)
```
Caching Results
Save intermediate results to avoid recomputation:
```python
# Save enriched data
enriched_df.write.save_as_table("enriched_transcripts", mode="overwrite")

# Load for subsequent processing
cached_df = session.table("enriched_transcripts")
```
Metrics and Monitoring
Track inference costs and latency:
```python
# Access built-in metrics
metrics = session.table("fenic_system.query_metrics")

metrics.select(
    fc.col("model"),
    fc.col("latency_ms"),
    fc.col("cost_usd"),
    fc.col("input_tokens"),
    fc.col("output_tokens")
).order_by(fc.col("cost_usd").desc()).show()

# Aggregate by operation
metrics.group_by("operation").agg(
    fc.count("*").alias("call_count"),
    fc.sum("cost_usd").alias("total_cost"),
    fc.avg("latency_ms").alias("avg_latency")
).show()
```
Production Deployment Patterns
ASGI Server Integration
Deploy MCP servers in production environments:
```python
from fenic.api.mcp import create_mcp_server, run_mcp_server_asgi

server = create_mcp_server(
    session,
    "TranscriptServer",
    user_defined_tools=session.catalog.list_tools()
)

app = run_mcp_server_asgi(
    server,
    stateless_http=True,
    port=8000,
    host="0.0.0.0",
    path="/mcp"
)

# Launch with uvicorn:
# uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
```
Error Handling
Handle failures gracefully in production pipelines:
```python
# Add error columns
safe_df = segments_df.with_column(
    "classification",
    fc.when(
        fc.col("text").is_not_null() & (fc.length(fc.col("text")) > 0),
        semantic.classify(fc.col("text"), ["Type A", "Type B"])
    ).otherwise(fc.lit("unclassified"))
)

# Log failures
failed_df = segments_df.filter(
    fc.col("text").is_null()
).select(
    fc.col("file_path"),
    fc.lit("Null text content").alias("error")
)

failed_df.write.csv("processing_errors.csv")
```
Example: Complete Transcript Analysis Pipeline
```python
from fenic.api.session import Session
from fenic.api.session.config import SessionConfig, SemanticConfig
from fenic.core.types.inference.openai import OpenAILanguageModel
import fenic.api.functions as fc
from fenic.api.functions import text, semantic

# 1. Setup
config = SessionConfig(
    app_name="transcript_analysis",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=10000
            )
        }
    )
)
session = Session.get_or_create(config)

# 2. Load and parse transcripts
raw_df = session.read.docs(
    "transcripts/**/*.srt",
    content_type="markdown",
    recursive=True
)

parsed_df = raw_df.select(
    fc.col("file_path"),
    text.parse_transcript(fc.col("content"), "srt").alias("segments")
)

segments_df = parsed_df.select(
    fc.col("file_path"),
    fc.explode(fc.col("segments")).alias("seg")
).select(
    fc.col("file_path"),
    fc.col("seg.speaker").alias("speaker"),
    fc.col("seg.start_time").alias("start_time"),
    fc.col("seg.content").alias("text")
)

# 3. Enrich with semantic analysis
enriched_df = segments_df.with_column(
    "sentiment",
    semantic.analyze_sentiment(fc.col("text"))
).with_column(
    "category",
    semantic.classify(
        fc.col("text"),
        ["Technical", "Business", "Administrative", "Other"]
    )
)

# 4. Generate summaries per file
summaries_df = enriched_df.group_by("file_path").agg(
    fc.count("*").alias("segment_count"),
    semantic.reduce(
        "Create a concise summary of this conversation",
        fc.col("text"),
        order_by=fc.col("start_time")
    ).alias("summary")
)

# 5. Save results
enriched_df.write.save_as_table("processed_transcripts", mode="overwrite")
summaries_df.write.csv("transcript_summaries.csv")

# 6. Create MCP tools for LangChain
from fenic.core.mcp.types import ToolParam
from fenic.core.types import StringType, IntegerType

session.catalog.create_tool(
    tool_name="query_transcripts",
    tool_description="Search and filter processed transcripts",
    tool_query=session.table("processed_transcripts").filter(
        fc.col("category") == fc.tool_param("category", StringType)
    ).limit(fc.tool_param("limit", IntegerType)),
    tool_params=[
        ToolParam(name="category", description="Filter by category", default_value="Technical"),
        ToolParam(name="limit", description="Max results", default_value=20)
    ]
)

# 7. Launch MCP server
from fenic.api.mcp import create_mcp_server, run_mcp_server_sync

server = create_mcp_server(
    session,
    "TranscriptAnalysis",
    user_defined_tools=session.catalog.list_tools()
)
run_mcp_server_sync(server, transport="http", port=8000)
```
Conclusion
Fenic brings production-grade data processing to transcript analysis. By handling parsing, semantic enrichment, and batch inference within a DataFrame API, it provides the structured preprocessing layer that LangChain applications need.
The Model Context Protocol integration gives agents seamless access to transcript tools, while semantic operations reduce the need for custom prompt engineering on common tasks. For teams building LangChain applications at scale, Fenic offers a path to reliable, efficient transcript processing.

