RAG Without a Framework: A Minimal pgvector Pipeline with Claude
Every RAG tutorial reaches for LangChain or LlamaIndex in the first five minutes. Those frameworks are useful. They are also opaque when something goes wrong, and something always goes wrong in production.
This post builds a complete RAG pipeline from primitives: chunking, embedding, pgvector storage and retrieval, and grounded generation with Claude. No abstraction layers. When you understand what the framework is doing for you, you can decide when it is worth the dependency and when it is not.
The code is adapted from JobPulse RAG, where this pipeline embeds resumes and retrieves relevant sections to ground cover letter generation.
The four stages
RAG has four stages, each with decisions that affect the quality of the final output:
- Chunking — splitting documents into retrievable units
- Embedding — turning chunks into vectors
- Retrieval — finding the right chunks at query time
- Generation — producing the answer, grounded in retrieved chunks
The framework abstracts all four. Understanding each one is how you debug bad retrieval quality.
Chunking
The naive approach is fixed-size chunks with overlap. It works badly on structured documents.
from dataclasses import dataclass
import re


@dataclass
class Chunk:
    text: str
    doc_id: str
    chunk_index: int
    start_char: int
    end_char: int
    metadata: dict


def chunk_document(
    text: str,
    doc_id: str,
    chunk_size: int = 512,
    overlap: int = 64,
) -> list[Chunk]:
    """
    Split a document into overlapping chunks for embedding.
    Uses sentence boundaries rather than hard character splits to avoid
    cutting mid-sentence, which degrades embedding quality.
    chunk_size and overlap are measured in characters (len), not tokens.
    """
    sentences = _split_sentences(text)
    chunks = []
    buffer: list[str] = []
    buffer_len = 0
    chunk_index = 0
    # Offsets track the re-joined, whitespace-normalized text, so treat
    # start_char and end_char as approximate positions in the raw input.
    start_char = 0

    for sentence in sentences:
        sentence_len = len(sentence)
        if buffer_len + sentence_len > chunk_size and buffer:
            chunk_text = " ".join(buffer)
            chunks.append(Chunk(
                text=chunk_text,
                doc_id=doc_id,
                chunk_index=chunk_index,
                start_char=start_char,
                end_char=start_char + len(chunk_text),
                metadata={},
            ))
            chunk_index += 1
            # Keep the tail of the previous chunk for overlap. This is a raw
            # character slice, so the tail may begin mid-word.
            overlap_text = chunk_text[-overlap:] if len(chunk_text) > overlap else chunk_text
            buffer = [overlap_text, sentence]
            buffer_len = len(overlap_text) + sentence_len
            start_char += len(chunk_text) - len(overlap_text)
        else:
            buffer.append(sentence)
            buffer_len += sentence_len

    # Flush whatever is left as the final chunk
    if buffer:
        chunk_text = " ".join(buffer)
        chunks.append(Chunk(
            text=chunk_text,
            doc_id=doc_id,
            chunk_index=chunk_index,
            start_char=start_char,
            end_char=start_char + len(chunk_text),
            metadata={},
        ))
    return chunks


def _split_sentences(text: str) -> list[str]:
    # Simple sentence splitter; replace with spaCy for better accuracy on
    # technical text with abbreviations like "Ph.D." or "e.g."
    return [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]

The overlap is not about preserving context for the reader. It ensures that a piece of information straddling a chunk boundary is retrievable from at least one chunk. Without overlap, a fact split across two chunks might score low on both.
For JobPulse I use 512-token chunks with 64-token overlap. The function above counts characters with len(), so swap in a tokenizer-based length if you want chunk_size and overlap to mean tokens. The right numbers depend on your document type. Resumes have dense, short sections. Legal contracts have long paragraphs. Adjust chunk size so that each chunk is semantically coherent: a complete thought, not a sentence fragment.
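The overlap argument can be checked with a toy example. The sketch below is not the chunker above: it uses plain fixed-size character windows so the boundary is easy to see, and window_chunks and is_retrievable are hypothetical helper names introduced only for this illustration.

```python
def window_chunks(text: str, size: int, overlap: int) -> list[str]:
    """Split text into fixed-size character windows sharing `overlap` characters."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    step = size - overlap
    # Stop once the remaining tail is already covered by the previous window.
    last_start = max(len(text) - overlap, 1)
    return [text[i : i + size] for i in range(0, last_start, step)]


def is_retrievable(phrase: str, chunks: list[str]) -> bool:
    """True if at least one chunk contains the whole phrase."""
    return any(phrase in chunk for chunk in chunks)


# "PYTHON" occupies characters 10-15, so it straddles the boundary at 13.
text = "x" * 10 + "PYTHON" + "y" * 10

without_overlap = window_chunks(text, size=13, overlap=0)
with_overlap = window_chunks(text, size=13, overlap=6)

print(is_retrievable("PYTHON", without_overlap))  # False
print(is_retrievable("PYTHON", with_overlap))     # True
```

With no overlap the phrase is split into "PYT" and "HON" and neither chunk contains it. With an overlap at least as long as the phrase, one window always holds it whole.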
Embedding
OpenAI's text-embedding-3-large is the model I use for English technical content. It produces 3072-dimensional vectors by default, but you can request 1536 dimensions for a 2× storage saving with a small accuracy penalty.
from openai import AsyncOpenAI

client = AsyncOpenAI()


async def embed_chunks(
    chunks: list[Chunk],
    model: str = "text-embedding-3-large",
    dimensions: int = 1536,
    batch_size: int = 100,
) -> list[tuple[Chunk, list[float]]]:
    """
    Embed a list of chunks, returning each chunk paired with its vector.
    Batches requests to stay within the API's per-request token limit.
    """
    results: list[tuple[Chunk, list[float]]] = []
    for batch_start in range(0, len(chunks), batch_size):
        batch = chunks[batch_start : batch_start + batch_size]
        texts = [c.text for c in batch]
        response = await client.embeddings.create(
            model=model,
            input=texts,
            dimensions=dimensions,
            encoding_format="float",
        )
        # Pair each chunk with the embedding at the same position in the batch
        for chunk, embedding_data in zip(batch, response.data):
            results.append((chunk, embedding_data.embedding))
    return results


async def embed_query(
    query: str,
    model: str = "text-embedding-3-large",
    dimensions: int = 1536,
) -> list[float]:
    """Embed a single query string for retrieval."""
    response = await client.embeddings.create(
        model=model,
        input=[query],
        dimensions=dimensions,
        encoding_format="float",
    )
    return response.data[0].embedding

One gotcha: the model and dimensions you use for embedding queries must match the ones you used for embedding documents. Mixing models produces vectors in different spaces and retrieval quality collapses. This sounds obvious, but it bites people when they switch models and forget to re-embed their existing documents.
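One way to guard against that mismatch is to record the embedding settings alongside the index and compare them before every query. The sketch below is an assumption about how you might do this, not part of JobPulse: EmbeddingConfig, EmbeddingMismatchError, and check_compatible are hypothetical names, and where you persist the indexed config (a settings table, a file) is left open.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EmbeddingConfig:
    """The settings that define which vector space an embedding lives in."""
    model: str
    dimensions: int


class EmbeddingMismatchError(ValueError):
    """Raised when query-time settings differ from index-time settings."""


def check_compatible(indexed: EmbeddingConfig, query: EmbeddingConfig) -> None:
    """Fail loudly instead of silently returning poor retrieval results."""
    if indexed != query:
        raise EmbeddingMismatchError(
            f"index was built with {indexed}, but the query uses {query}; "
            "re-embed the documents or switch the query settings back"
        )


# Settings recorded when the documents were embedded
indexed = EmbeddingConfig(model="text-embedding-3-large", dimensions=1536)

# Same settings at query time: passes without raising
check_compatible(indexed, EmbeddingConfig("text-embedding-3-large", 1536))

# Changed dimensions at query time: raises before any retrieval happens
try:
    check_compatible(indexed, EmbeddingConfig("text-embedding-3-large", 3072))
except EmbeddingMismatchError as exc:
    print(f"blocked: {exc}")
```

A dimension mismatch would also be rejected by the vector(1536) column, but a model swap at the same dimension count would not, which is the case this check is for.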
pgvector storage
pgvector is a PostgreSQL extension that adds a vector column type and distance operators, including cosine distance. The schema is straightforward:

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE document_chunks (
    id BIGSERIAL PRIMARY KEY,
    doc_id TEXT NOT NULL,
    chunk_index INTEGER NOT NULL,
    text TEXT NOT NULL,
    embedding vector(1536),
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- Required by the ON CONFLICT (doc_id, chunk_index) upsert
    UNIQUE (doc_id, chunk_index)
);

CREATE INDEX ON document_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

The HNSW index (hnsw) is what makes similarity search fast, in exchange for approximate rather than exact results. Without it, every query does an exact scan of all rows. For a few thousand chunks this is fine. At 100,000+ chunks it becomes unusable. Build the index before you need it.
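For sizing the table, the arithmetic behind the 1536-versus-3072 choice can be sketched as below. It relies on the figure given in pgvector's documentation of 4 bytes per dimension plus 8 bytes per vector, and it deliberately ignores row overhead, the text column, and the HNSW index itself, so treat the result as a lower bound. vector_storage_bytes is a hypothetical helper name.

```python
def vector_storage_bytes(dimensions: int, rows: int) -> int:
    """Raw storage for the embedding column only: 4 bytes per dimension + 8 per vector."""
    return rows * (4 * dimensions + 8)


rows = 100_000
for dims in (1536, 3072):
    megabytes = vector_storage_bytes(dims, rows) / 1_000_000
    print(f"{dims} dimensions x {rows:,} chunks = {megabytes:,.1f} MB")
```

pgvector's documentation also lists a 2,000-dimension limit for indexing the vector type, so it is worth checking for your version whether full 3072-dimensional vectors can use the HNSW index above at all.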
Storing chunks with asyncpg:
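import json

import asyncpg


def to_vector_literal(embedding: list[float]) -> str:
    """Format a list of floats in pgvector's text input form, e.g. "[0.1,0.2]"."""
    return "[" + ",".join(str(x) for x in embedding) + "]"


async def store_chunks(
    conn: asyncpg.Connection,
    chunk_embeddings: list[tuple[Chunk, list[float]]],
) -> None:
    """Upsert chunks and their embeddings into the database."""
    # asyncpg has no built-in codec for the vector type and expects JSONB
    # parameters as strings by default, so both are serialized explicitly.
    # Registering custom codecs (for example via the pgvector Python
    # package) is an alternative.
    records = [
        (
            chunk.doc_id,
            chunk.chunk_index,
            chunk.text,
            json.dumps(chunk.metadata),
            to_vector_literal(embedding),
        )
        for chunk, embedding in chunk_embeddings
    ]
    await conn.executemany(
        """
        INSERT INTO document_chunks (doc_id, chunk_index, text, metadata, embedding)
        VALUES ($1, $2, $3, $4::jsonb, $5::vector)
        ON CONFLICT (doc_id, chunk_index)
        DO UPDATE SET
            text = EXCLUDED.text,
            metadata = EXCLUDED.metadata,
            embedding = EXCLUDED.embedding
        """,
        records,
    )

The ON CONFLICT DO UPDATE makes this idempotent: re-indexing a document replaces its existing chunks rather than duplicating them. The conflict clause requires a unique constraint on (doc_id, chunk_index); without one, PostgreSQL rejects the statement. If a re-indexed document can yield fewer chunks than before, delete its existing rows first, because the upsert leaves stale trailing chunks in place.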
Retrieval
async def retrieve(
    conn: asyncpg.Connection,
    query_embedding: list[float],
    top_k: int = 10,
    doc_id_filter: str | None = None,
) -> list[dict]:
    """
    Retrieve the top-k most similar chunks by cosine similarity.
    Returns chunks ordered by relevance, most relevant first.
    """
    # Without a registered pgvector codec, asyncpg needs the vector as text.
    query_vector = "[" + ",".join(str(x) for x in query_embedding) + "]"

    if doc_id_filter:
        rows = await conn.fetch(
            """
            SELECT doc_id, chunk_index, text, metadata,
                   1 - (embedding <=> $1::vector) AS similarity
            FROM document_chunks
            WHERE doc_id = $2
            ORDER BY embedding <=> $1::vector
            LIMIT $3
            """,
            query_vector,
            doc_id_filter,
            top_k,
        )
    else:
        rows = await conn.fetch(
            """
            SELECT doc_id, chunk_index, text, metadata,
                   1 - (embedding <=> $1::vector) AS similarity
            FROM document_chunks
            ORDER BY embedding <=> $1::vector
            LIMIT $2
            """,
            query_vector,
            top_k,
        )
    # metadata comes back as a JSON string unless a JSONB codec is registered
    return [dict(row) for row in rows]

The <=> operator is pgvector's cosine distance operator. 1 - (embedding <=> query) converts distance to similarity: 1.0 means the vectors point in the same direction, 0.0 means they are orthogonal. Filter on similarity score if you want to exclude low-relevance results, for example WHERE 1 - (embedding <=> $1::vector) >= 0.7, but tune the threshold against your data before hardcoding it.
Generation with Claude
The final stage: pass the retrieved chunks to Claude as context, with the original query.
import anthropic

claude = anthropic.AsyncAnthropic()


async def generate_grounded_answer(
    query: str,
    chunks: list[dict],
    model: str = "claude-sonnet-4-20250514",
    max_tokens: int = 1024,
) -> str:
    """
    Generate an answer grounded in the retrieved chunks.
    The system prompt instructs Claude to use only the provided context
    and to cite sources when it uses them.
    """
    # Label each chunk so the model has a source identifier to cite
    context_blocks = "\n\n".join(
        f"[Source: {c['doc_id']}, chunk {c['chunk_index']}, "
        f"similarity {c['similarity']:.2f}]\n{c['text']}"
        for c in chunks
    )
    response = await claude.messages.create(
        model=model,
        max_tokens=max_tokens,
        system=(
            "You are a precise assistant. Answer the user's question using only "
            "the context provided below. If the context does not contain enough "
            "information to answer fully, say so explicitly. Cite the source "
            "identifier when you use a specific piece of information."
        ),
        messages=[
            {
                "role": "user",
                "content": f"Context:\n{context_blocks}\n\nQuestion: {query}",
            }
        ],
    )
    return response.content[0].text

The system prompt instruction to use only the provided context is important. Without it, Claude may supplement retrieved content with its parametric knowledge, producing answers that look grounded but are not. That might be fine for some applications, but for a pipeline where auditability matters (legal documents, compliance, medical) you want explicit retrieval grounding.
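One practical detail when assembling the context: with top_k=10 and large chunks, the prompt can grow quickly. The sketch below is a budgeted variant of the context_blocks join, assuming chunks arrive most relevant first as retrieve returns them. build_context and the max_chars budget are hypothetical, and a character budget is only a rough stand-in for a token budget.

```python
def build_context(chunks: list[dict], max_chars: int = 8000) -> str:
    """Join labeled chunks in order, stopping before the budget is exceeded."""
    blocks: list[str] = []
    used = 0
    for c in chunks:
        block = (
            f"[Source: {c['doc_id']}, chunk {c['chunk_index']}, "
            f"similarity {c['similarity']:.2f}]\n{c['text']}"
        )
        # Account for the "\n\n" separator between blocks
        cost = len(block) + (2 if blocks else 0)
        if used + cost > max_chars:
            break  # lower-ranked chunks are dropped first
        blocks.append(block)
        used += cost
    return "\n\n".join(blocks)


chunks = [
    {"doc_id": "a", "chunk_index": 0, "similarity": 0.90, "text": "hello"},
    {"doc_id": "a", "chunk_index": 1, "similarity": 0.50, "text": "world"},
]
print(build_context(chunks))                # both chunks fit
print(build_context(chunks, max_chars=50))  # only the top-ranked chunk fits
```

Dropping whole chunks from the bottom of the ranking keeps every remaining source label intact, which matters if you want the citations in the answer to map back to complete chunks.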
Putting it together
import asyncpg


async def rag_query(
    query: str,
    database_url: str,
    doc_id_filter: str | None = None,
    top_k: int = 10,
) -> str:
    conn = await asyncpg.connect(database_url)
    try:
        query_embedding = await embed_query(query)
        chunks = await retrieve(conn, query_embedding, top_k=top_k, doc_id_filter=doc_id_filter)
        if not chunks:
            return "No relevant content found for this query."
        return await generate_grounded_answer(query, chunks)
    finally:
        await conn.close()

In production, replace the single connection with an asyncpg.create_pool connection pool. Opening and closing a connection per query is fine for a script; it is too slow and too expensive for a service.
Where frameworks earn their keep
I said this pipeline does not need LangChain. That is true for a pipeline this size. At the point where you are managing multiple retrievers, hybrid search (BM25 + vector), re-ranking, query rewriting, and conversation history across multiple turns, the orchestration code grows fast. That is where a framework's abstractions start paying off.
The useful test: can you explain every step of your RAG pipeline to a colleague from the code alone? If yes, you do not need the framework. If the answer is "I think it's using cosine similarity but I'm not sure how the re-ranker is configured", you have a debugging problem waiting to happen.
Build it from primitives first. Add the framework when the primitives become too much to manage.
Al Amin Ahamed
Senior software engineer & AI practitioner. 5+ years shipping Laravel platforms, WordPress plugins, WooCommerce extensions, and AI-driven products.