Scalable AI LLM RAG Pipeline: Async Document Processing with LangChain, NestJS, BullMQ, and Qdrant - Andy Primawan
Learn how to build a robust, scalable pipeline for vectorizing PDFs, DOCX, and TXT files asynchronously. We'll use S3, NestJS, BullMQ, LangChain, OpenAI, and Qdrant to create a resilient ingestion system for your next AI LLM RAG application.

Introduction
In the world of AI and Large Language Models (LLMs), Retrieval-Augmented Generation (RAG) is a game-changer. It allows us to ground LLMs in our own private data, providing contextually aware and accurate responses. But before we can retrieve anything, we have to solve a fundamental problem: how do we efficiently and reliably process user-uploaded documents (like PDFs, DOCX files, etc.), turn them into vectors, and store them in a vector database?
Doing this synchronously during an HTTP request is a recipe for disaster. It's slow, error-prone, and doesn't scale. Today, I'm going to walk you through a production-ready, asynchronous architecture I've been working on, using a powerful stack: NestJS, BullMQ, Amazon S3, OpenAI, and Qdrant.
The Problem Statement
Imagine a user uploads a 50MB, 200-page PDF to your web application. A simple REST API endpoint that tries to handle this synchronously will face several issues:
- Request Timeouts: Processing a large document, splitting it, and making hundreds of API calls to an embedding service can take minutes, far exceeding typical HTTP request timeouts.
- Server Bandwidth Consumption: Forcing large files to be uploaded through your backend server consumes significant bandwidth and processing power, starving other requests.
- API Rate Limits: If 10 users upload documents at once, you might make thousands of calls to the OpenAI API in a short burst, triggering rate limit errors.
- Lack of Resilience: If the OpenAI API has a hiccup or your vector database is momentarily unavailable during the process, the entire operation fails. Retrying a failed multi-minute process is a poor user experience.
The Solution: An Asynchronous, Queue-Based Architecture
To solve these problems, we decouple the process. We accept the file upload acknowledgment and immediately hand off the heavy lifting to a background worker. This makes our system scalable, resilient, and efficient.
Here is a step-by-step look at the entire flow.
System Architecture Diagram
You can use a tool like Mermaid.js to render this diagram from the following syntax:
sequenceDiagram
    participant User
    participant FrontendApp
    participant BackendAPI as BackendAPI (NestJS)
    participant S3
    participant BullMQ as BullMQ (Redis)
    participant Worker as Worker (NestJS/BullMQ)
    participant OpenAI_API
    participant Qdrant

    User->>FrontendApp: Selects a file to upload
    FrontendApp->>BackendAPI: POST /api/uploads/presigned-url ( {fileName, fileType} )
    BackendAPI->>S3: Generate Presigned PUT URL
    S3-->>BackendAPI: Returns Presigned URL
    BackendAPI-->>FrontendApp: Sends Presigned URL to client
    Note over FrontendApp, S3: User's browser uploads file directly to S3
    FrontendApp->>S3: PUT [presignedUrl] (with file data)
    S3-->>FrontendApp: 200 OK (Upload Complete)
    FrontendApp->>BackendAPI: POST /api/uploads/acknowledge ( {s3ObjectKey} )
    BackendAPI->>BullMQ: Enqueue job: "vectorize-file"
    BackendAPI-->>FrontendApp: 202 Accepted (File queued for processing)
    Note over Worker, BullMQ: Worker process polls Redis for jobs
    Worker->>BullMQ: Dequeue job: "vectorize-file"
    Worker->>S3: Download file from s3ObjectKey to /tmp
    S3-->>Worker: Returns file stream
    Note over Worker: Use LangChain to load & split
    Worker->>Worker: Load document from /tmp using DocumentLoader
    Worker->>Worker: Split document into chunks using TextSplitter
    loop For each chunk
        Worker->>OpenAI_API: Get embedding for chunk text
        OpenAI_API-->>Worker: Return vector
        Worker->>Qdrant: Upsert vector with metadata (s3Key, chunkNum)
        Qdrant-->>Worker: Point upserted
    end
    Worker->>BullMQ: Mark job as complete
Step 1: The Upload (S3 Presigned URLs)
The user never uploads the file to our Node.js server. Instead, the frontend asks our backend for a special, one-time-use S3 Presigned URL. This URL grants temporary permission to upload a specific file directly to our S3 bucket.
This is brilliant because the multi-megabyte file transfer happens between the user's browser and Amazon's highly scalable infrastructure, completely bypassing our server's bandwidth.
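To make this concrete, here is a minimal sketch of the backend side using AWS SDK v3. The function name, bucket env var, key scheme, and 5-minute expiry are illustrative assumptions, not the project's exact code:
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { randomUUID } from 'node:crypto';

const s3 = new S3Client({ region: process.env.AWS_REGION });

// Hypothetical helper: returns a short-lived URL the browser can PUT the file to
export async function createPresignedUploadUrl(
  fileName: string,
  fileType: string,
): Promise<{ presignedUrl: string; s3ObjectKey: string }> {
  const s3ObjectKey = `uploads/${randomUUID()}-${fileName}`;
  const command = new PutObjectCommand({
    Bucket: process.env.S3_BUCKET, // assumed bucket env var
    Key: s3ObjectKey,
    ContentType: fileType,
  });
  // The URL expires after 5 minutes; tune for your use case
  const presignedUrl = await getSignedUrl(s3, command, { expiresIn: 300 });
  return { presignedUrl, s3ObjectKey };
}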
Step 2: Acknowledgment and Queuing (BullMQ)
Once the direct upload to S3 is complete, the frontend makes a second, lightweight API call to our backend. This call simply says, "Hey, the file is ready at this s3ObjectKey."
Our NestJS controller's only job is to take that s3ObjectKey and create a job in a message queue. We're using BullMQ, a fantastic Redis-based queue system for Node.js. It's robust and gives us fine-grained control over our jobs.
// In your NestJS service
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class KnowledgeFilesService {
  constructor(
    @InjectQueue('knowledge-files/vectorize') private vectorizeQueue: Queue,
  ) {}

  public async queueDocumentProcessing(
    knowledgeFileId: string,
    storageKey: string, // the S3 object key of the uploaded file
  ) {
    // Add job to the vectorization queue with automatic retries
    await this.vectorizeQueue.add(
      'vectorize-file',
      {
        knowledgeFileId,
        storageKey,
      },
      {
        attempts: 3, // retry failed jobs up to 3 times
        backoff: {
          type: 'exponential', // wait 5s, 10s, 20s between attempts
          delay: 5000,
        },
      },
    );
  }
}
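For completeness, the queue has to be registered with NestJS before it can be injected. A minimal sketch of the module wiring, assuming a local Redis instance:
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    // Point BullMQ at Redis (connection details are assumptions)
    BullModule.forRoot({
      connection: { host: 'localhost', port: 6379 },
    }),
    // Register the queue so @InjectQueue('knowledge-files/vectorize') resolves
    BullModule.registerQueue({ name: 'knowledge-files/vectorize' }),
  ],
})
export class KnowledgeFilesModule {}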
Step 3: The Worker Awakens (BullMQ Consumer)
This is where the magic happens. We run a separate NestJS process (or the same one, configured for hybrid mode) that acts as a BullMQ Consumer. This worker constantly listens for new jobs in the knowledge-files/vectorize queue.
When it picks up a vectorize-file job, it gets the storageKey and begins the real work.
// In your worker/processor file
import { Logger } from '@nestjs/common';
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

interface VectorizeJobData {
  knowledgeFileId: string;
  storageKey: string;
}

@Processor('knowledge-files/vectorize')
export class VectorizeKnowledgeFileConsumer extends WorkerHost {
  private readonly logger = new Logger(VectorizeKnowledgeFileConsumer.name);

  async process(job: Job<VectorizeJobData>): Promise<void> {
    const { knowledgeFileId, storageKey } = job.data;
    this.logger.log(`Processing document: ${knowledgeFileId}`);

    // 1. Download file from S3 to a local /tmp folder
    const localPath = await this.downloadFile(storageKey);
    try {
      // 2. Load document with LangChain
      const docs = await this.loadDocument(localPath);
      // 3. Split document into chunks
      const chunks = await this.splitIntoChunks(docs);
      // 4. Vectorize and store each chunk
      await this.vectorizeAndStore(chunks, { source: storageKey });
    } finally {
      // 5. Clean up the local file, even if processing fails
      await this.cleanup(localPath);
    }
  }

  // downloadFile, loadDocument, splitIntoChunks, vectorizeAndStore,
  // and cleanup are private helpers covered in the following steps.
}
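The downloadFile helper is not shown in the snippet above; here is one way it could look with AWS SDK v3, streaming the S3 object to the OS temp directory. The bucket env var and file naming are assumptions:
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import type { Readable } from 'node:stream';
import * as path from 'node:path';
import * as os from 'node:os';

// Hypothetical implementation of the worker's downloadFile step
async function downloadFile(storageKey: string): Promise<string> {
  const s3 = new S3Client({ region: process.env.AWS_REGION });
  const localPath = path.join(os.tmpdir(), path.basename(storageKey));

  // Stream the object straight to disk to keep memory usage flat
  const { Body } = await s3.send(
    new GetObjectCommand({
      Bucket: process.env.S3_BUCKET, // assumed bucket env var
      Key: storageKey,
    }),
  );
  await pipeline(Body as Readable, createWriteStream(localPath));
  return localPath;
}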
Step 4: Loading & Chunking (LangChain)
Inside our worker, we first download the file from S3 to a temporary directory like /tmp.
Then, we leverage LangChain.js to abstract away the complexity of file parsing. LangChain's document loaders (PDFLoader, DocxLoader, TextLoader) can read a file and extract its text content.
import { Document } from '@langchain/core/documents';
import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf';
import { DocxLoader } from '@langchain/community/document_loaders/fs/docx';
import { TextLoader } from 'langchain/document_loaders/fs/text';

// ... inside the worker

// Pick a loader based on the file extension (PDF/DOC/DOCX/TXT)
let loader: TextLoader | PDFLoader | DocxLoader;
switch (extension) {
  case '.txt':
    loader = new TextLoader(filePath);
    break;
  case '.pdf':
    loader = new PDFLoader(filePath);
    break;
  case '.doc':
  case '.docx':
    loader = new DocxLoader(filePath);
    break;
  default:
    throw new Error(`Unsupported file type: ${extension}`);
}
let documents: Document[] = await loader.load();

// Add file-level metadata to each loaded document
documents = documents.map((doc, index) => {
  return new Document({
    pageContent: doc.pageContent,
    metadata: {
      ...doc.metadata,
      knowledgeFileId: knowledgeFile.id,
      knowledgeId: knowledgeFile.knowledgeId,
      fileName: knowledgeFile.fileName,
      fileType: knowledgeFile.fileType,
      source: `${knowledgeFile.fileName}#page-${index}`,
      documentIndex: index,
    },
  });
});
Then we use LangChain's RecursiveCharacterTextSplitter to split each document into smaller, manageable chunks:
import { RecursiveCharacterTextSplitter } from 'langchain/text_splitter';

// ... inside the worker

// Chunk sizes here are illustrative; tune them for your content
const splitter = new RecursiveCharacterTextSplitter({
  chunkSize: 1000,
  chunkOverlap: 200,
});

const allChunks: Document[] = [];
for (const doc of documents) {
  const chunks = await splitter.splitDocuments([doc]);
  // Add chunk-specific metadata
  const chunksWithMetadata = chunks.map((chunk, index) => {
    return new Document({
      pageContent: chunk.pageContent,
      metadata: {
        ...chunk.metadata,
        chunkIndex: index,
        source: `${chunk.metadata.fileName}#doc-${chunk.metadata.documentIndex}-chunk-${index}`,
      },
    });
  });
  // Drop empty chunks before queuing them for embedding
  allChunks.push(
    ...chunksWithMetadata.filter(
      (chunk) => chunk.pageContent.trim().length > 0,
    ),
  );
}
Step 5: Vectorization & Storage (OpenAI & Qdrant)
Now we vectorize the chunks. For each chunk, we call the OpenAI Embeddings API (e.g., text-embedding-3-small) to get its vector representation; LangChain batches these calls under the hood.
This vector, along with important metadata (like the source document's S3 key and the chunk's position), is then "upserted" into our Qdrant vector database. Qdrant is a high-performance, open-source vector database that's perfect for this kind of application.
import { QdrantVectorStore } from '@langchain/qdrant';
import { OpenAIEmbeddings } from '@langchain/openai';

// ... inside the worker

// Initialize OpenAI embeddings
const embeddings = new OpenAIEmbeddings({
  openAIApiKey: this.configsService.openaiApiKey,
  modelName: this.configsService.vectorModel,
});

// Embed the chunks and upsert them into Qdrant in one step
await QdrantVectorStore.fromDocuments(allChunks, embeddings, {
  url: this.configsService.qdrantDatabaseUrl,
  collectionName,
  collectionConfig: {
    vectors: {
      size: 1536, // OpenAI text-embedding-3-small dimension
      distance: 'Cosine',
    },
  },
});
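Once the vectors are in Qdrant, retrieval at query time is just as simple. A minimal sketch, assuming the same collection name and embedding model used during ingestion:
import { QdrantVectorStore } from '@langchain/qdrant';
import { OpenAIEmbeddings } from '@langchain/openai';

// Connect to the collection created during ingestion
const vectorStore = await QdrantVectorStore.fromExistingCollection(
  new OpenAIEmbeddings({ modelName: 'text-embedding-3-small' }),
  {
    url: process.env.QDRANT_URL, // e.g., http://localhost:6333
    collectionName: 'knowledge-files', // assumed collection name
  },
);

// Fetch the 4 chunks most similar to the user's question
const results = await vectorStore.similaritySearch(
  'How do I configure the upload pipeline?',
  4,
);
for (const doc of results) {
  console.log(doc.metadata.source, doc.pageContent.slice(0, 80));
}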
[Screenshot: the Qdrant Dashboard running in a local Docker container, showing vector data produced by the OpenAI embedding output.]
Strengths of This Architecture
This design isn't just a workaround; it's a robust pattern with clear advantages:
- Durability and Decoupling: The original document is safe in S3, which is designed for 99.999999999% (11 nines) of durability. Our processing workflow is completely decoupled from the file store.
- Resilience and Retries: BullMQ has built-in support for automatic retries with exponential backoff. If the OpenAI API is down for a minute, BullMQ will simply try the job again later without any manual intervention.
- Scalability & Concurrency Control: This is a huge win. You can run multiple worker processes across different machines to process jobs in parallel. Even better, you can configure the BullMQ worker to only process a certain number of jobs concurrently (the concurrency option). This allows you to respect the OpenAI API rate limits and avoid overwhelming your vector database, all with a single line of configuration (see the sketch after this list).
- Responsiveness: The user's interface is never blocked. They get an immediate "your file is being processed" response, leading to a much better user experience.
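To make the concurrency point concrete, here is a minimal sketch of how the consumer from Step 3 could be throttled. The specific numbers are illustrative assumptions, not values from the project:
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

// Worker options go in the second argument of @Processor
@Processor('knowledge-files/vectorize', {
  concurrency: 2, // at most 2 vectorization jobs in parallel per worker
  limiter: {
    max: 10,        // at most 10 jobs...
    duration: 1000, // ...per second, to stay under API rate limits
  },
})
export class VectorizeKnowledgeFileConsumer extends WorkerHost {
  async process(job: Job): Promise<void> {
    // ... same processing logic as shown in Step 3
  }
}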
Conclusion
Building a production-grade AI LLM RAG application requires more than just a clever prompt. The data ingestion pipeline is the foundation upon which everything else is built. By adopting an asynchronous, queue-based architecture with NestJS, BullMQ, and S3, you create a system that is not only functional but also scalable, resilient, and efficient. It handles failures gracefully, respects external service limits, and provides a snappy experience for your users.
This pattern provides a solid, enterprise-ready blueprint for any application that needs to perform heavy background processing initiated by a user action.
Code Sample
Find an example implementation and code on GitHub:
https://github.com/dojotek/dojotek-ai-chatbot-backend
Code sample to generate an S3 presigned URL:
- function create in knowledge-files.controller.ts
- function create in knowledge-files.service.ts
- s3-storage.adapter.ts
Code sample of the BullMQ consumer (background worker): vectorize-knowledge-file.consumer.ts