How to Use Fenic as a Data Layer in LangChain for Unstructured Text Handling

Typedef Team

Introduction

Building production-grade LLM applications requires robust data infrastructure. While frameworks like LangChain handle orchestration and agent logic, managing unstructured text data at scale presents unique challenges. Fenic addresses this gap with a PySpark-inspired DataFrame API designed specifically for AI workloads.

This guide demonstrates how to leverage Fenic as a data layer alongside LangChain, handling text preprocessing, embedding generation, and semantic operations at scale.

Why Fenic for Text Processing

Traditional data processing frameworks weren't built for LLM workflows. Fenic provides:

  • Native text operations: Chunking, tokenization, and extraction built into the DataFrame API
  • Semantic capabilities: Generate embeddings and perform LLM-based transformations directly on DataFrames
  • Scalable processing: Handle large document collections with efficient batch operations
  • Unified interface: Consistent API from data ingestion to semantic operations

Setting Up Fenic

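Install Fenic (for example, with pip install fenic), make sure your OpenAI API key is available in the OPENAI_API_KEY environment variable, and configure a session with the language and embedding models you plan to use: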

python
from fenic.api.session import Session, SessionConfig
from fenic.api.session.config import (
    SemanticConfig,
    OpenAILanguageModel,
    OpenAIEmbeddingModel
)

# Configure session with LLM and embedding models
config = SessionConfig(
    app_name="langchain_data_layer",
    semantic=SemanticConfig(
        language_models={
            "gpt4": OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=100,
                tpm=100000
            )
        },
        default_language_model="gpt4",
        embedding_models={
            "embeddings": OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=100,
                tpm=100000
            )
        },
        default_embedding_model="embeddings"
    )
)

session = Session.get_or_create(config)

Loading Unstructured Documents

Fenic provides specialized readers for various document formats:

Loading Markdown Documents

python
from fenic.api.functions import col

# Load all markdown files from a directory
docs_df = session.read.docs(
    "data/documents/**/*.md",
    content_type="markdown",
    recursive=True
)

# Filter out any documents that failed to load
docs_df = docs_df.filter(col("error").is_null())

Loading CSV Data

python
# Load CSV files with automatic schema inference
csv_df = session.read.csv("data/*.csv")

# Or with explicit schema for better control
from fenic.core.types.schema import Schema, ColumnField
from fenic.core.types.datatypes import StringType, IntegerType

csv_df = session.read.csv(
    "data/products.csv",
    schema=Schema([
        ColumnField(name="product_id", data_type=IntegerType),
        ColumnField(name="description", data_type=StringType)
    ])
)

Loading PDF Metadata

python
# Extract metadata from PDFs for filtering and organization
pdf_metadata = session.read.pdf_metadata(
    "data/pdfs/**/*.pdf",
    recursive=True
)

# Filter by page count or other metadata
relevant_pdfs = pdf_metadata.filter(col("page_count") < 100)

Text Chunking Strategies

Proper chunking is critical for RAG applications. Fenic provides multiple chunking strategies:

Recursive Token Chunking

python
from fenic.api.functions import text, col

# Chunk documents preserving structure at natural boundaries
chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_token_chunk(
        col("content"),
        chunk_size=500,
        chunk_overlap_percentage=10
    ).alias("chunks")
)

# Explode chunks into individual rows
chunked_df = chunked_df.explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

The recursive approach attempts to split at paragraph breaks, sentence boundaries, and other natural divisions, maintaining context better than simple character splits.
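
To illustrate the idea behind recursive splitting, the following plain-Python sketch tries the coarsest separator first and falls back to finer ones only when a piece is still too long. It is a simplified illustration, not Fenic's implementation: `recursive_split` is a hypothetical helper, it measures length in characters rather than tokens, and it ignores overlap.

```python
def recursive_split(content, max_len, separators=("\n\n", "\n", ". ", " ")):
    """Split text at the coarsest separator that keeps pieces within max_len."""
    if len(content) <= max_len:
        return [content] if content else []
    for index, sep in enumerate(separators):
        if sep not in content:
            continue
        chunks, current = [], ""
        for piece in content.split(sep):
            candidate = piece if not current else current + sep + piece
            if len(candidate) <= max_len:
                current = candidate  # keep packing pieces into this chunk
                continue
            if current:
                chunks.append(current)
            if len(piece) > max_len:
                # Piece is still too long: retry with the finer separators
                chunks.extend(recursive_split(piece, max_len, separators[index + 1:]))
                current = ""
            else:
                current = piece
        if current:
            chunks.append(current)
        return chunks
    # No separator left: fall back to a hard character split
    return [content[i:i + max_len] for i in range(0, len(content), max_len)]


sample = (
    "First paragraph here.\n\n"
    "Second paragraph is a bit longer than the first.\n\n"
    "Third."
)
chunks = recursive_split(sample, max_len=40)
```

Short paragraphs survive intact, and only the over-long middle paragraph is broken further, at word boundaries.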

Character-Based Chunking

python
# For simpler, fixed-size chunks
chunked_df = docs_df.select(
    col("file_path"),
    text.character_chunk(
        col("content"),
        chunk_size=1000,
        chunk_overlap_percentage=15
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)
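
To make the overlap parameter concrete, here is a minimal plain-Python sketch of fixed-size chunking with a percentage overlap. It is an illustration only: `chunk_with_overlap` is a hypothetical helper, and it assumes the overlap is computed as a percentage of the chunk size, which may not match how Fenic treats boundaries internally.

```python
def chunk_with_overlap(content, chunk_size, overlap_percentage):
    """Split text into fixed-size character windows that share an overlap."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap_percentage < 100:
        raise ValueError("overlap_percentage must be in [0, 100)")
    overlap = chunk_size * overlap_percentage // 100
    step = chunk_size - overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(content), step):
        chunks.append(content[start:start + chunk_size])
        if start + chunk_size >= len(content):
            break  # this window already reached the end of the text
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = chunk_with_overlap(sample, chunk_size=10, overlap_percentage=20)
```

With a chunk size of 10 and 20 percent overlap, each window advances by 8 characters, so neighbouring chunks share 2 characters. Under the same assumption, the settings above (chunk_size=1000, 15 percent) would advance 850 characters per chunk.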

Word-Based Chunking

python
# Chunk by word count with custom delimiters
chunked_df = docs_df.select(
    col("file_path"),
    text.recursive_word_chunk(
        col("content"),
        chunk_size=200,
        chunk_overlap_percentage=10,
        chunking_character_set_custom_characters=['\n\n', '\n', '.', ' ']
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

Generating Embeddings

Once text is chunked, generate embeddings for vector search:

python
from fenic.api.functions import semantic

# Generate embeddings for all chunks
embedded_df = chunked_df.select(
    col("file_path"),
    col("chunk_text"),
    semantic.embed(col("chunk_text")).alias("embedding")
)

# Token counting for monitoring
embedded_df = embedded_df.with_column(
    "token_count",
    text.count_tokens(col("chunk_text"))
)

Semantic Similarity Search

Fenic enables semantic search operations directly in the DataFrame API:

python
from fenic.api.functions import embedding

# Generate query embedding (this would typically come from your application)
query_text = "machine learning best practices"
query_df = session.create_dataframe([{"query": query_text}])
query_embedding = query_df.select(
    semantic.embed(col("query")).alias("query_emb")
).collect()[0]["query_emb"]

# Compute similarity scores
results_df = embedded_df.select(
    col("file_path"),
    col("chunk_text"),
    embedding.compute_similarity(
        col("embedding"),
        query_embedding,
        metric="cosine"
    ).alias("similarity_score")
)

# Get top matches
top_results = results_df.order_by(
    col("similarity_score").desc()
).limit(10)
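
For readers who want to see what the cosine metric computes, the sketch below reproduces the score-and-rank step in plain Python on toy two-dimensional vectors. The helper names `cosine_similarity` and `top_k` and the toy data are illustrative assumptions; real embeddings have hundreds or thousands of dimensions, and Fenic performs this computation inside the DataFrame engine.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat an all-zero vector as matching nothing
    return dot / (norm_a * norm_b)


def top_k(query, rows, k):
    """Score every (text, embedding) row against the query and keep the best k."""
    scored = [(cosine_similarity(query, emb), text) for text, emb in rows]
    return sorted(scored, key=lambda pair: pair[0], reverse=True)[:k]


# Toy rows standing in for (chunk_text, embedding) pairs
rows = [
    ("chunk a", [1.0, 0.0]),
    ("chunk b", [0.0, 1.0]),
    ("chunk c", [1.0, 1.0]),
]
best = top_k([1.0, 0.1], rows, k=2)
```

Because cosine similarity depends only on direction, not magnitude, a long and a short chunk that discuss the same topic can still score close to 1.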

Text Extraction and Structured Data

Extract structured information from unstructured text:

python
from pydantic import BaseModel, Field
from typing import List

# Define extraction schema
class Entity(BaseModel):
    name: str = Field(description="Entity name")
    type: str = Field(description="Entity type: person, organization, or location")

class ExtractedInfo(BaseModel):
    entities: List[Entity] = Field(description="Named entities found in text")
    summary: str = Field(description="Brief summary of the content")
    topics: List[str] = Field(description="Main topics discussed")

# Extract structured data
extracted_df = docs_df.select(
    col("file_path"),
    semantic.extract(
        col("content"),
        response_format=ExtractedInfo
    ).alias("extracted_data")
)

# Access nested fields
extracted_df = extracted_df.select(
    col("file_path"),
    col("extracted_data.summary").alias("summary"),
    col("extracted_data.entities").alias("entities"),
    col("extracted_data.topics").alias("topics")
)

Text Classification

Classify documents into categories:

python
from fenic.api.functions.semantic import ClassDefinition

# Define classification categories
categories = [
    ClassDefinition(
        label="Technical Documentation",
        description="API docs, architecture guides, technical specifications"
    ),
    ClassDefinition(
        label="Business Content",
        description="Marketing materials, business plans, proposals"
    ),
    ClassDefinition(
        label="Support Material",
        description="FAQs, troubleshooting guides, user support content"
    )
]

# Classify documents
classified_df = docs_df.select(
    col("file_path"),
    col("content"),
    semantic.classify(
        col("content"),
        classes=categories
    ).alias("category")
)

Building a RAG Data Pipeline

Combine these operations into a complete pipeline:

python
from fenic.api.functions import text, col, semantic, dt
from fenic.api.functions.builtin import md5

# 1. Load documents
raw_docs = session.read.docs("data/docs/**/*.md", content_type="markdown", recursive=True)

# 2. Clean and prepare text
cleaned_docs = raw_docs.select(
    col("file_path"),
    text.trim(col("content")).alias("content")
).filter(col("content") != "")

# 3. Chunk documents
chunked = cleaned_docs.select(
    col("file_path"),
    text.recursive_token_chunk(
        col("content"),
        chunk_size=400,
        chunk_overlap_percentage=10
    ).alias("chunks")
).explode("chunks").select(
    col("file_path"),
    col("chunks").alias("chunk_text")
)

# 4. Generate embeddings
embedded = chunked.select(
    col("file_path"),
    col("chunk_text"),
    semantic.embed(col("chunk_text")).alias("embedding"),
    text.count_tokens(col("chunk_text")).alias("tokens")
)

# 5. Add metadata
final_df = embedded.with_columns(
    md5(col("chunk_text")).alias("chunk_id"),
    dt.current_timestamp().alias("processed_at")
)

# 6. Export for use in LangChain or other frameworks
final_df.write.parquet("output/processed_chunks.parquet")

Integration with LangChain

Export Fenic-processed data for LangChain consumption:

python
# Export to formats LangChain can consume
final_df.write.csv("output/chunks_with_embeddings.csv")

# Or save as a table in Fenic's catalog for querying
final_df.write.save_as_table("processed_documents", mode="overwrite")

# Later, retrieve processed data
processed_data = session.table("processed_documents")

# Convert to pandas for LangChain integration
pandas_df = processed_data.to_pandas()

# Use in LangChain
from langchain_core.documents import Document

documents = [
    Document(
        page_content=row["chunk_text"],
        metadata={
            "file_path": row["file_path"],
            "chunk_id": row["chunk_id"],
            "tokens": row["tokens"]
        }
    )
    for row in pandas_df.to_dict('records')
]

Advanced Text Operations

Template-Based Extraction

python
# Extract structured data from formatted text
log_df = session.create_dataframe([
    {"log": "2024-01-15 ERROR Connection failed to server"},
    {"log": "2024-01-15 INFO Request processed successfully"}
])

parsed_logs = log_df.select(
    text.extract(
        col("log"),
        template="${date} ${level} ${message}"
    ).alias("parsed")
)

# Access parsed fields
parsed_logs = parsed_logs.select(
    col("parsed.date").alias("date"),
    col("parsed.level").alias("level"),
    col("parsed.message").alias("message")
)
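
One way to think about the template above is as a named-group regular expression. The plain-Python sketch below parses the same log lines with the standard re module. It assumes that the first two placeholders capture a run of non-space characters and that the last one captures the rest of the line; Fenic's actual matching rules may differ, and `parse_log` is a hypothetical helper.

```python
import re

# Rough regex counterpart of "${date} ${level} ${message}" (an assumption)
LOG_PATTERN = re.compile(r"^(?P<date>\S+) (?P<level>\S+) (?P<message>.*)$")


def parse_log(line):
    """Return the captured fields as a dict, or None if the line does not match."""
    match = LOG_PATTERN.match(line)
    return match.groupdict() if match else None


parsed = parse_log("2024-01-15 ERROR Connection failed to server")
unparsed = parse_log("")
```

Lines that do not fit the pattern come back as None, so malformed rows can be filtered out before further processing.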

Jinja Template Rendering

python
from fenic.api.functions.builtin import lit
# Generate prompts dynamically
prompt_df = chunked_df.select(
    text.jinja(
        """Answer the following question based on the context:

Context: {{ context }}

Question: {{ question }}

Provide a detailed answer based solely on the context provided.""",
        context=col("chunk_text"),
        question=lit("What are the main concepts discussed?")
    ).alias("prompt")
)

Semantic Join Operations

Join datasets based on semantic similarity:

python
# Product descriptions from two sources
products_a = session.create_dataframe([
    {"id": 1, "desc": "wireless bluetooth headphones"},
    {"id": 2, "desc": "laptop computer with touchscreen"}
])

products_b = session.create_dataframe([
    {"id": 101, "name": "Premium Audio Headset"},
    {"id": 102, "name": "Portable Touchscreen Notebook"}
])

# Semantic join to match similar products
from fenic.api.functions import col

matched = products_a.semantic.join(
    other=products_b,
    predicate="Does this product description match this product name? Description: {{left_on}} Name: {{right_on}}",
    left_on=col("desc"),
    right_on=col("name")
)

Performance Optimization

Batch Processing

python
# Process large datasets in batches
large_docs = session.read.docs(
    "data/large_corpus/**/*.md",
    content_type="markdown",
    recursive=True
)

# Process in chunks to manage memory
batch_size = 1000
total_rows = large_docs.count()

for offset in range(0, total_rows, batch_size):
    # Skip the rows already processed, then take the next batch
    batch = large_docs.offset(offset).limit(batch_size)

    # Process batch
    processed_batch = batch.select(
        col("file_path"),
        semantic.embed(col("content")).alias("embedding")
    )

    # Save batch results
    processed_batch.write.parquet(
        f"output/batch_{offset}.parquet",
        mode="overwrite"
    )
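
The offset arithmetic in the loop above can be checked in isolation. The sketch below generates the (offset, size) windows for a given row count in plain Python; `batch_windows` is a hypothetical helper, and the row counts are made-up values.

```python
def batch_windows(total_rows, batch_size):
    """Yield (offset, size) pairs that cover total_rows without gaps or overlap."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for offset in range(0, total_rows, batch_size):
        # The final window may be shorter than batch_size
        yield offset, min(batch_size, total_rows - offset)


windows = list(batch_windows(total_rows=2500, batch_size=1000))
```

The windows only partition the data reliably if the underlying rows come back in a stable order, so sorting on a key such as the file path before batching is a sensible precaution.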

Monitoring Token Usage

python
# Import the module under an alias so sum/count/avg do not shadow Python builtins
from fenic.api.functions import builtin as fc
# Track token consumption
metrics_df = embedded_df.select(
    fc.sum(col("token_count")).alias("total_tokens"),
    fc.count("*").alias("total_chunks"),
    fc.avg(col("token_count")).alias("avg_tokens_per_chunk")
)

# View metrics
metrics_df.show()

# Access session metrics for API calls
session.stop()  # Prints comprehensive session metrics

Exporting Data for Vector Databases

Prepare data for vector database ingestion:

python
from fenic.api.functions.builtin import struct

# Format for Pinecone, Weaviate, or other vector stores.
# final_df (from the RAG pipeline above) carries chunk_id and tokens.
export_df = final_df.select(
    col("chunk_id"),
    col("chunk_text"),
    col("embedding"),
    struct(
        col("file_path"),
        col("tokens")
    ).alias("metadata")
)

# Export to Parquet for vector DB ingestion
export_df.write.parquet("output/vector_db_ready.parquet")

# Or convert to dictionary format
vector_records = export_df.collect()
for record in vector_records:
    # Insert into the vector database; vector_db stands in for your store's client
    vector_db.upsert(
        id=record["chunk_id"],
        values=record["embedding"],
        metadata={
            "text": record["chunk_text"],
            "file_path": record["metadata"]["file_path"],
            "tokens": record["metadata"]["tokens"]
        }
    )

Working with SQL

For advanced queries, use Fenic's SQL interface:

python
# Complex filtering and aggregation with SQL.
# Assumes results_df also carries a tokens column alongside similarity_score.
results = session.sql("""
    SELECT
        file_path,
        COUNT(*) as chunk_count,
        AVG(tokens) as avg_tokens,
        MAX(similarity_score) as max_similarity
    FROM {chunks}
    WHERE tokens BETWEEN 100 AND 500
    GROUP BY file_path
    HAVING chunk_count > 5
    ORDER BY max_similarity DESC
""", chunks=results_df)

Best Practices

Chunk Size Selection

Choose chunk sizes based on your model's context window:

python
# For models with 8K context window
chunk_size = 400  # tokens
overlap = 10  # percent

# For models with 128K context window
chunk_size = 2000  # tokens
overlap = 5  # percent
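
A quick way to sanity-check a chunk size is to work out how many retrieved chunks fit in the context window once the prompt and the answer are accounted for. The sketch below does that arithmetic; `max_chunks_in_context` is a hypothetical helper, and the reserved token counts are illustrative assumptions rather than recommendations.

```python
def max_chunks_in_context(context_window, chunk_size, prompt_tokens, answer_tokens):
    """Number of whole chunks that fit after reserving prompt and answer space."""
    available = context_window - prompt_tokens - answer_tokens
    if available <= 0:
        return 0
    return available // chunk_size


# 8K window with 400-token chunks
small = max_chunks_in_context(8192, chunk_size=400, prompt_tokens=500, answer_tokens=1000)

# 128K window with 2000-token chunks
large = max_chunks_in_context(128000, chunk_size=2000, prompt_tokens=500, answer_tokens=1000)
```

Under these assumptions the 8K configuration leaves room for 16 chunks and the 128K configuration for 63, which gives an upper bound for the number of results to retrieve per query.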

Error Handling

python
from fenic.api.functions.builtin import when, lit

# Flag documents that loaded without an error
processed_df = docs_df.with_column(
    "is_valid",
    when(col("error").is_null(), lit(True)).otherwise(lit(False))
)

# Separate successful and failed processing
success_df = processed_df.filter(col("is_valid"))
failed_df = processed_df.filter(~col("is_valid"))

# Log failures
failed_df.select(col("file_path"), col("error")).show()

Incremental Processing

python
# Save processing state
processed_df.write.save_as_table("processed_documents", mode="append")

# Load existing processed files
existing = session.table("processed_documents")
existing_paths = existing.select(col("file_path")).distinct().collect()
processed_paths = {row["file_path"] for row in existing_paths}

# Filter new documents
new_docs = docs_df.filter(
    ~col("file_path").is_in(list(processed_paths))
)

Conclusion

Fenic provides a production-ready data layer for LLM applications, handling the challenges of unstructured text processing at scale. By leveraging Fenic's DataFrame API alongside LangChain's orchestration capabilities, you can build robust RAG pipelines with:

  • Efficient text chunking and tokenization
  • Native embedding generation and semantic search
  • Structured data extraction from unstructured text
  • Scalable batch processing
  • Comprehensive monitoring and metrics

The combination of Fenic's data processing capabilities with LangChain's agent framework enables you to build sophisticated AI applications without compromising on performance or maintainability.

For more information, visit the Fenic documentation and explore the Typedef platform.
