2025-06-14 · 12 min read
The Async AI Processing Pipeline: Decoupling Storage from Intelligence
I keep seeing this exact same architectural car crash lately. Honestly, it works flawlessly in a slick demo video (which is probably why everyone builds it this way at first) but then it quietly self-destructs the minute you hit actual production traffic.
Picture this: A user uploads a PDF. Your backend parses the text. You hit the OpenAI API for embeddings. You dump those vectors into Pinecone. Finally, you spit back a 200 OK.
Seems clean, right? Simple. It’s also completely wrong at any meaningful scale.
The Demo That Lies to You
If we actually map out what that “simple” flow looks like under the hood, it’s terrifying:
sequenceDiagram
participant U as User
participant API as API Gateway
participant EMB as Embedding API
participant VDB as Vector DB
U->>API: POST /upload (PDF)
API->>API: Parse Document
API->>EMB: Generate Embeddings
Note over API,EMB: Wait... (2-10 seconds)
EMB-->>API: Vectors
API->>VDB: Upsert Vectors
Note over API,VDB: Wait...
VDB-->>API: OK
API-->>U: 200 OK
Note over U,API: User waited 10+ seconds
Every single arrow in that diagram is a loaded gun pointing straight at your user’s upload request. What if the embedding API decides to rate-limit you at 9:00 AM? The user’s file upload fails. The PDF parser chokes on some weirdly formatted, corrupted document from 2013? Upload fails. The vector database experiences a minor network blip? Yep, upload fails.
We’ve essentially glued together three totally separate things: getting a file safely onto a hard drive, extracting intelligent data from it, and updating a search index. Then we forced them all into one incredibly brittle HTTP request.
The Mental Model Shift
If there’s one thing to take away, it’s this:
Storage must be immediate. AI extraction can be eventually consistent.
These are two entirely different promises we’re making to our users, with completely different SLAs. Think about it. When someone drops a file into a Dropbox-style app, their only real concern in that split second is safety. They need to know the file is durable and accessible right now. They really don’t care if the full-text semantic search takes 30 seconds to catch up in the background. But they absolutely care if the upload button spins endlessly for ten seconds and then throws a random error code.
Look at how Google Drive or Notion handles this stuff. You upload a file, and boom, it’s confirmed instantly. Generating the thumbnail, running OCR, updating the search index: all of that heavy lifting happens off-stage. You see this pattern every single day; we just forget to build it that way ourselves.
The Event-Driven Escape Hatch
The way out of this trap is decoupling via an event-driven architecture. Let me sketch out the full picture before we dig into the weeds:
flowchart TD
U([User]) -->|Upload File| AG[API Gateway]
AG -->|1. Store Raw Bytes| S3[(AWS S3 / Object Storage)]
AG -->|2. Write Metadata| PG[(PostgreSQL / Relational DB)]
AG -->|3. Return 200 OK| U
S3 -->|S3 Event Notification| LMB[AWS Lambda]
LMB -->|Produce Message| KB[/Apache Kafka Message Broker/]
KB -->|Distribute Work| W1[Worker 1]
KB -->|Distribute Work| W2[Worker 2]
KB -->|Distribute Work| W3[Worker 3]
W1 & W2 & W3 -->|Upsert Vectors| VDB[(Vector DB)]
W1 & W2 & W3 -->|Update Status| PG
style AG fill:#4f46e5,color:#fff
style KB fill:#f59e0b,color:#000
style VDB fill:#10b981,color:#fff
style S3 fill:#3b82f6,color:#fff
style PG fill:#3b82f6,color:#fff
Let’s break down why this actually works in the real world.
Step 1 — Fast Storage, Happy Users
When someone hits upload, our API gateway does exactly two things:
- Shoves the raw bytes directly into S3.
- Writes a quick metadata row in PostgreSQL (who owns it, where it belongs, timestamp, etc.).
And we’re done. The user gets a 200 OK, the connection closes, and from their perspective, life is good.
# api/routes/upload.py
from fastapi import UploadFile
from sqlalchemy.orm import Session

async def handle_upload(file: UploadFile, user_id: str, db: Session):
    file_id = generate_ulid()

    # 1. Stream straight to S3: absolutely no buffering in app memory!
    # s3_client is an app-level async wrapper that is assumed to return the
    # object's s3:// path (boto3's own upload_fileobj is sync and returns None).
    s3_path = await s3_client.upload_fileobj(
        fileobj=file.file,
        bucket=settings.S3_BUCKET,
        key=f"uploads/{user_id}/{file_id}/{file.filename}",
    )

    # 2. Jot down the metadata
    db_file = FileRecord(
        id=file_id,
        owner_id=user_id,
        filename=file.filename,
        s3_path=s3_path,
        status=FileStatus.PENDING,  # <-- the crucial part: it's not indexed yet
        size_bytes=file.size,
    )
    db.add(db_file)
    db.commit()

    # 3. Bail out immediately. No AI, no parsing.
    return {"file_id": file_id, "status": "uploaded"}
Notice that FileStatus.PENDING flag? The client app knows the file is safe, but it’s not searchable yet. You can throw a subtle little “Indexing…” spinner on the UI. It’s honest, it’s accurate, and most importantly, it doesn’t freeze the user’s workflow.
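To make the status flag concrete, here is a minimal sketch of the lifecycle it implies. The four status names come from the code in this post; the string values, the transition table, and the UI labels are my own assumptions for illustration.

```python
from enum import Enum


class FileStatus(str, Enum):
    # Names match the post; the string values are assumed.
    PENDING = "pending"                      # stored safely, not processed yet
    INDEXED = "indexed"                      # searchable
    NOT_INDEXABLE = "not_indexable"          # nothing to extract (e.g. blank image)
    EXTRACTION_FAILED = "extraction_failed"  # parked in the dead letter queue


# Hypothetical rule: workers only ever move a file out of PENDING.
# A DLQ replay or a re-index resets the file to PENDING first.
ALLOWED_TRANSITIONS = {
    FileStatus.PENDING: {
        FileStatus.INDEXED,
        FileStatus.NOT_INDEXABLE,
        FileStatus.EXTRACTION_FAILED,
    },
    FileStatus.INDEXED: {FileStatus.PENDING},
    FileStatus.NOT_INDEXABLE: {FileStatus.PENDING},
    FileStatus.EXTRACTION_FAILED: {FileStatus.PENDING},
}


def can_transition(current: FileStatus, new: FileStatus) -> bool:
    """True if moving from `current` to `new` is allowed by the table above."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


def ui_label(status: FileStatus) -> str:
    """Text the client could show next to the file (labels are illustrative)."""
    labels = {
        FileStatus.PENDING: "Indexing...",
        FileStatus.INDEXED: "Searchable",
        FileStatus.NOT_INDEXABLE: "No searchable text",
        FileStatus.EXTRACTION_FAILED: "Indexing delayed",
    }
    return labels[status]
```

Guarding status updates with something like `can_transition` keeps a late or duplicated worker message from flipping an already indexed file back to a failure state.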
Step 2 — The Kickoff Event
The moment S3 gets the file, it emits an event notification that triggers a Lambda, and the Lambda tosses an event onto our message broker. When you’re dealing with serious volume, the payload needs to be lean:
{
"event": "file_uploaded",
"file_id": "01H8XK3M5P6Q7R8S9T0V",
"s3_path": "s3://my-bucket/uploads/user_123/01H8XK3.../report.pdf",
"file_type": "application/pdf",
"owner_id": "user_123",
"timestamp": "2025-06-14T09:00:00Z"
}
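Here is a rough sketch of how the Lambda could turn an S3 notification into that payload. The function name is hypothetical, and it assumes the key layout from the upload handler (`uploads/{user_id}/{file_id}/{filename}`) and a MIME type guessed from the file extension; the real Lambda may well do this differently.

```python
import mimetypes
from urllib.parse import unquote_plus


def build_file_uploaded_events(s3_event: dict) -> list[dict]:
    """Turn an S3 event notification into lean `file_uploaded` payloads."""
    events = []
    for record in s3_event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 notifications.
        key = unquote_plus(record["s3"]["object"]["key"])

        # Assumed key layout: uploads/{user_id}/{file_id}/{filename}
        parts = key.split("/", 3)
        if len(parts) != 4 or parts[0] != "uploads":
            continue  # not an upload we recognise; skip it
        _, owner_id, file_id, filename = parts

        # Assumption: guess the MIME type from the extension.
        mime_type, _ = mimetypes.guess_type(filename)
        events.append({
            "event": "file_uploaded",
            "file_id": file_id,
            "s3_path": f"s3://{bucket}/{key}",
            "file_type": mime_type or "application/octet-stream",
            "owner_id": owner_id,
            "timestamp": record["eventTime"],
        })
    return events
```

The Lambda handler would then serialize each payload to JSON and produce it to the `file.uploaded` topic. Keeping the transformation a pure function like this makes it easy to unit test without a broker.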
We push this into Apache Kafka. This broker right here? This is our shock absorber.
graph LR
subgraph S1 [9:00 AM Monday]
direction TB
F1[File 1] & F2[File 2] & F3[...] & FN[File 10,000]
end
subgraph S2 [Kafka Broker]
direction TB
T[Topic: file.uploaded / 10,000 messages queued]
end
subgraph S3 [Worker Pool]
direction TB
W1[Worker] & W2[Worker] & W3[Worker]
note[Processing at controlled rate]
end
F1 & F2 & F3 & FN -->|instant write| T
T -->|controlled consumption| W1 & W2 & W3
Imagine a whole department decides to dump 10,000 files into a shared workspace right at 9 AM. Kafka just absorbs the hit, buffering all 10,000 events. Your storage layer didn’t break a sweat because writing to S3 and Postgres is incredibly cheap. Meanwhile, the AI backend just chews through that queue at whatever pace it can naturally sustain without a meltdown.
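It helps to put a rough number on "whatever pace it can naturally sustain". The sketch below is plain arithmetic, not a benchmark: the 4 seconds per file is a made-up figure, and the estimate ignores retries and rate limiting.

```python
import math


def estimate_drain_seconds(backlog: int, workers: int, seconds_per_file: float) -> float:
    """Rough time for a worker pool to clear a backlog, ignoring retries."""
    if workers < 1:
        raise ValueError("need at least one worker")
    # Each worker processes its share of the files one after another.
    files_per_worker = math.ceil(backlog / workers)
    return files_per_worker * seconds_per_file


# 10,000 files, 3 workers, and a hypothetical 4 seconds per file:
hours = estimate_drain_seconds(10_000, 3, 4.0) / 3600
print(f"{hours:.1f} hours")
```

With those assumed numbers the backlog takes about 3.7 hours to clear; 30 workers would bring it down to roughly 22 minutes. One caveat: within a Kafka consumer group, consumers beyond the topic's partition count sit idle, so the partition count caps how far adding workers helps.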
The Worker Layer: Where the Magic Happens
This is the actual “AI” part. We use long-running worker processes (using Python’s confluent_kafka) that subscribe to our Kafka topics and pull messages one by one.
Here’s the lifecycle of a single message:
flowchart LR
A([Kafka Message]) --> B[1. Retrieve from S3]
B --> C[2. Extract Text OCR / Parser]
C --> D[3. Chunk Text]
D --> E[4. Generate Embeddings]
E --> F[5. Upsert to Vector DB]
F --> G[6. Update Status PostgreSQL]
G --> H([Done])
style A fill:#f59e0b,color:#000
style H fill:#10b981,color:#fff
style C fill:#8b5cf6,color:#fff
style E fill:#8b5cf6,color:#fff
Let’s look at how that translates to real, robust code. Notice how we strictly control when Kafka commits the offset.
# workers/document_processor.py
import json
from confluent_kafka import Consumer, KafkaError, KafkaException
from tenacity import retry, stop_after_attempt, wait_exponential

consumer = Consumer({
    "bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
    "group.id": "document-processor-group",
    "auto.offset.reset": "earliest",
    # We only commit the offset after successful processing
    "enable.auto.commit": False,
})
consumer.subscribe(["file.uploaded"])

def run_worker():
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                raise KafkaException(msg.error())

            event = json.loads(msg.value().decode("utf-8"))
            try:
                process_document(event)
                # Success! Tell Kafka we're officially done with this message.
                consumer.commit(message=msg)
            except UnrecoverableParsingError:
                send_to_dead_letter_queue(event, reason="corrupted_document")
                update_file_status(event["file_id"], FileStatus.EXTRACTION_FAILED)
                # Commit anyway so we don't infinitely retry garbage files
                consumer.commit(message=msg)
            except Exception as exc:
                # Don't commit here. Careful: the next successful commit on this
                # partition also moves past this offset, so the handler has to
                # re-queue the event durably (retry topic or DLQ) instead of
                # counting on Kafka to redeliver it.
                handle_retryable_failure(event, exc)
    finally:
        # Leave the consumer group cleanly so partitions get reassigned quickly.
        consumer.close()
def process_document(event: dict):
    file_id = event["file_id"]
    s3_path = event["s3_path"]

    # 1. Grab the file
    raw_bytes = s3_client.download_file(s3_path)

    # 2. Rip out the text
    extracted_text = document_parser.extract(
        content=raw_bytes,
        mime_type=event["file_type"],
    )
    if not extracted_text or not extracted_text.strip():
        # Not necessarily a failure. Maybe it's just a blank image
        update_file_status(file_id, FileStatus.NOT_INDEXABLE)
        return

    # 3. Slice it up
    chunks = text_chunker.chunk(
        text=extracted_text,
        chunk_size=512,
        overlap=50,  # gotta have that sliding window
    )

    # 4. Do the AI math (this handles its own API retries)
    embeddings = generate_embeddings(chunks)

    # 5. Shove it into the Vector DB
    vectors = [
        {
            "id": f"{file_id}_chunk_{i}",
            "values": embedding,
            "metadata": {
                "file_id": file_id,
                "owner_id": event["owner_id"],
                "chunk_index": i,
                "text": chunk,
            },
        }
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings))
    ]
    # Note: Using Pinecone's namespace for tenant isolation here.
    # If you're on Milvus, use Collections or a partition_key_field instead.
    vector_db.upsert(vectors=vectors, namespace=event["owner_id"])

    # 6. We're good to go
    update_file_status(file_id, FileStatus.INDEXED)
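One consequence of committing offsets manually is that a message can be delivered more than once, for example if a worker dies after the upsert but before the commit. The deterministic `{file_id}_chunk_{i}` IDs are what make that harmless. A small sketch, using an in-memory dictionary as a stand-in for the vector DB (the class and helper names are hypothetical):

```python
def chunk_vector_id(file_id: str, chunk_index: int) -> str:
    """The same file and chunk position always yield the same vector ID."""
    return f"{file_id}_chunk_{chunk_index}"


class InMemoryVectorStore:
    """Stand-in for a vector DB with upsert (insert-or-overwrite) semantics."""

    def __init__(self) -> None:
        self.rows: dict[str, list[float]] = {}

    def upsert(self, vectors: list[dict]) -> None:
        for vector in vectors:
            self.rows[vector["id"]] = vector["values"]


store = InMemoryVectorStore()
vectors = [
    {"id": chunk_vector_id("file_a", i), "values": [0.1 * i, 0.2]}
    for i in range(3)
]

store.upsert(vectors)
store.upsert(vectors)  # simulated redelivery of the same Kafka message
print(len(store.rows))  # 3, not 6
```

One gap to be aware of: if a reprocessed file produces fewer chunks than before, the old higher-numbered vectors stay behind. Deleting by `file_id` before the upsert would be one way to handle that.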
A Quick Detour: The Art of Chunking
I have to mention chunking, because it’s honestly one of the most critical decisions you’ll make for how well your retrieval actually performs. It’s not just an afterthought. Here’s a visual of how a 512-token chunk with a 50-token overlap behaves:
Full Document Text:
┌─────────────────────────────────────────────────────────────────┐
│ The quarterly revenue grew by 23%... [2,400 tokens of text] │
└─────────────────────────────────────────────────────────────────┘
After Chunking (512 tokens, 50-token overlap):
┌────────────────────┐
│ Chunk 1 │ tokens 0 → 512
└────────────────────┘
┌────────────────────┐
│ Chunk 2 │ tokens 462 → 974 (50-token overlap)
└────────────────────┘
┌────────────────────┐
│ Chunk 3 │ tokens 924 → 1436
└────────────────────┘
Why the overlap? Because without it, you’ll inevitably slice a crucial sentence right in half. Then when someone asks a question about that specific concept, neither half matches the query well, and retrieval misses the answer entirely. It’s a tiny detail that makes a massive difference.
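The sliding window in the diagram can be sketched in a few lines. This is a minimal illustration rather than what `text_chunker` does internally: it works on a pre-tokenized list and treats list items as tokens.

```python
def chunk_tokens(tokens: list, chunk_size: int = 512, overlap: int = 50) -> list[list]:
    """Split a token list into fixed-size windows that share `overlap` tokens."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")
    stride = chunk_size - overlap  # how far each window advances
    chunks = []
    for start in range(0, len(tokens), stride):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # this window already reached the end of the document
    return chunks


tokens = list(range(2400))  # stand-in for 2,400 real tokens
chunks = chunk_tokens(tokens)
print([(c[0], c[-1] + 1) for c in chunks[:3]])
# [(0, 512), (462, 974), (924, 1436)]
```

For the 2,400-token document this yields six chunks in total. The first three match the diagram, and the last one is a short 90-token tail.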
Handling the Inevitable Failures
This is the exact spot where 99% of AI tutorials end, and where the nightmare of running a real production system begins.
flowchart TD
W[Worker Picks Up Message] --> TRY{Attempt Processing}
TRY -->|Success| DONE([Mark INDEXED / Commit Offset])
TRY -->|Rate Limited 429| RL[Exponential Backoff]
RL -->|Retry < 5 attempts| TRY
RL -->|Retry limit hit| DLQ
TRY -->|Corrupted File| CF[Log Error]
CF --> DLQ
TRY -->|Network Timeout| NT[Retry with Jitter]
NT -->|Retry < 5 attempts| TRY
NT -->|Retry limit hit| DLQ
DLQ[/Dead Letter Queue/] --> MON[Alert Engineering Team]
MON --> FIX[Patch Parser / Config]
FIX --> REPLAY[Replay DLQ Messages]
REPLAY --> TRY
style DONE fill:#10b981,color:#fff
style DLQ fill:#ef4444,color:#fff
style RL fill:#f59e0b,color:#000
Stuff will break. You need to plan for it.
The Annoying Rate Limit (HTTP 429)
OpenAI, Cohere, they all have rate limits. If your workers are chewing through 10,000 files, you’re going to hit the wall. And when you do, you absolutely cannot just drop those jobs on the floor.
from openai import OpenAI, RateLimitError
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

openai_client = OpenAI()  # reads OPENAI_API_KEY from the environment

@retry(
    retry=retry_if_exception_type(RateLimitError),  # only retry on HTTP 429
    wait=wait_exponential(multiplier=1, min=2, max=60),
    stop=stop_after_attempt(5),
)
def generate_embeddings(chunks: list[str]) -> list[list[float]]:
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=chunks,
    )
    # One embedding per input chunk, in the same order
    return [item.embedding for item in response.data]
By using wait_exponential, your retries space out: the delay doubles with each failed attempt and is clamped to between 2 and 60 seconds. It gives the external API a chance to breathe instead of you just frantically hammering a locked door.
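To see what a doubling schedule looks like, and why the failure diagram mentions jitter, here is a small standalone sketch. It is my own formula for illustration, not tenacity's internals; tenacity ships `wait_random_exponential` if you want the jittered behaviour from the library itself.

```python
import random


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 60.0) -> float:
    """Capped exponential delay before retry number `attempt` (1-based)."""
    return min(cap, base * 2 ** (attempt - 1))


def backoff_delay_with_jitter(attempt: int, base: float = 2.0, cap: float = 60.0) -> float:
    """'Full jitter': a random delay between 0 and the capped exponential."""
    return random.uniform(0, backoff_delay(attempt, base, cap))


print([backoff_delay(n) for n in range(1, 7)])
# [2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

Without jitter, every worker that got rate-limited at the same moment retries at the same moment too, which tends to trigger the next 429. Randomizing the delay spreads those retries out.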
The Garbage File
Some files are just cursed. Corrupted PDFs, weirdly password-protected docs, zero-byte anomalies. Don’t retry these. It’s a waste of compute because they will fail every single time.
class UnrecoverableParsingError(Exception):
    """Raised for files that will fail the same way on every attempt."""

def extract_text_from_pdf(content: bytes) -> str:
    try:
        return pdf_parser.extract(content)
    # Catch whatever your PDF library raises for malformed or locked files
    # (pdfminer.six uses these two names).
    except (PDFSyntaxError, PDFPasswordIncorrect) as e:
        # Hard fail. Don't even try again.
        raise UnrecoverableParsingError(f"Document is garbage: {e}") from e

# Inside the worker...
    try:
        text = extract_text_from_pdf(raw_bytes)
    except UnrecoverableParsingError:
        send_to_dead_letter_queue(event, reason="corrupted_document")
        update_file_status(file_id, FileStatus.EXTRACTION_FAILED)
        return  # We're done here.
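Some of these cursed files can be caught before the parser even runs. The sketch below is a hypothetical pre-check, assuming that a missing `%PDF-` header means the file is not worth parsing. The exception class is redefined here only so the snippet stands alone.

```python
class UnrecoverableParsingError(Exception):
    """Raised for files that will fail the same way on every attempt."""


def precheck_pdf(content: bytes) -> None:
    """Reject obviously unparseable PDFs before spending parser time on them."""
    if len(content) == 0:
        raise UnrecoverableParsingError("zero-byte file")
    # PDFs start with a "%PDF-" header. Some writers prepend a few stray bytes,
    # so this looks in the first 1 KiB instead of only at offset 0.
    if b"%PDF-" not in content[:1024]:
        raise UnrecoverableParsingError("missing %PDF- header")


precheck_pdf(b"%PDF-1.7\n...")  # passes silently
```

A check like this is cheap, and it sends zero-byte uploads straight to the DLQ with a clear reason instead of a stack trace from deep inside the parser.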
The Safety Net: Dead Letter Queues
The DLQ is basically your system’s purgatory. If a message runs out of retries, it goes here instead of vanishing into the void.
# Pseudocode: a real implementation would use consumer.poll() in a loop to aggregate metrics
from collections import Counter

def get_dlq_summary() -> dict:
    messages = get_all_dlq_messages(topic="file.uploaded.dlq")
    failure_reasons = Counter(m["metadata"]["failure_reason"] for m in messages)
    return {
        "total_failed": len(messages),
        "by_reason": dict(failure_reasons),
        # Assumes messages come back oldest-first
        "oldest_message": messages[0]["timestamp"] if messages else None,
    }
When you finally figure out why a weird edge-case PDF was crashing your parser and deploy a fix, you just replay the DLQ. Every file gets a second chance. The best part? The user never saw an error message. To them, the file was always safe in storage; the search function just magically started working a bit later.
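A replay might look something like the sketch below. The envelope shape (`event`, `metadata.failure_reason`, `timestamp`) is an assumption chosen to line up with the summary function above, and `produce` is any callable that takes a topic and a bytes payload.

```python
import json
from datetime import datetime, timezone
from typing import Callable, Iterable, Optional


def to_dlq_message(event: dict, reason: str, attempts: int) -> dict:
    """Wrap the original event with failure metadata, leaving the event untouched."""
    return {
        "event": event,
        "metadata": {"failure_reason": reason, "attempts": attempts},
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


def replay_dlq(
    messages: Iterable[dict],
    produce: Callable[[str, bytes], None],
    topic: str = "file.uploaded",
    only_reason: Optional[str] = None,
) -> int:
    """Re-publish original events to the main topic; returns how many were sent."""
    replayed = 0
    for message in messages:
        if only_reason and message["metadata"]["failure_reason"] != only_reason:
            continue  # leave unrelated failures in the DLQ
        produce(topic, json.dumps(message["event"]).encode("utf-8"))
        replayed += 1
    return replayed
```

Filtering by reason lets you replay only the failures your fix actually addresses. With confluent_kafka you could pass a producer's `produce` method as the callable and call `flush()` afterwards.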
Zooming Out: The Big Picture
flowchart TD
subgraph Client ["Client Layer"]
U([User Browser / Mobile App])
end
subgraph Ingestion ["Ingestion Layer (Fast Path)"]
AG[API Gateway]
S3[(S3 Object Storage)]
PG[(PostgreSQL Metadata)]
end
subgraph Messaging ["Messaging Layer (Buffer)"]
K[/Kafka file.uploaded topic/]
DLQ[/Dead Letter Queue/]
end
subgraph Processing ["Processing Layer (Slow Path)"]
W1[Worker Pod]
W2[Worker Pod]
W3[Worker Pod]
EMB[Embedding API]
end
subgraph Storage ["Storage Layer (Query Path)"]
VDB[(Vector DB)]
end
subgraph Observability ["Observability"]
MON[Grafana Dashboard]
end
U -->|Upload| AG
AG -->|Store Bytes| S3
AG -->|Write Metadata| PG
AG -->|200 OK| U
S3 -->|S3 Event Notification| LMB[AWS Lambda]
LMB -->|Produce Message| K
K -->|Consume| W1 & W2 & W3
W1 & W2 & W3 <-->|Embeddings| EMB
W1 & W2 & W3 -->|Upsert Vectors| VDB
W1 & W2 & W3 -->|Update Status| PG
W1 & W2 & W3 -->|On Failure| DLQ
DLQ --> MON
style AG fill:#4f46e5,color:#fff
style K fill:#f59e0b,color:#000
style DLQ fill:#ef4444,color:#fff
style VDB fill:#10b981,color:#fff
style EMB fill:#8b5cf6,color:#fff
style MON fill:#64748b,color:#fff
style LMB fill:#f97316,color:#fff
The Brutal Reality of Tradeoffs
Look, I won’t pretend this is a free lunch. Whenever you introduce event-driven architectures, you’re signing a blood pact with complexity.
| Consideration | Synchronous Pipeline | Async Pipeline |
|---|---|---|
| Upload Latency | Agonizingly slow | Basically instant |
| Reliability | Super fragile | Highly resilient |
| Search Availability | Right away (if it doesn’t crash) | Whenever the workers catch up |
| Operational Headache | Very low | Pretty high |
| Scalability | Capped by the slowest dependency | Horizontal: add workers (up to the partition count) |
| Failure Recovery | A manual nightmare | DLQ + replay |
You’re taking on the burden of managing Kafka, monitoring a fleet of workers, and dealing with eventual consistency. For a weekend hackathon? Stick to the synchronous script. But if you’re building a product that actual human beings are going to rely on day in and day out, this is genuinely the only responsible way to architect it.
What Comes Next
This architecture handles the core pipeline, but production systems layer on a few more pieces. You’ll eventually need a re-indexing strategy for when you upgrade embedding models, and you’ll want priority queues for files where the user is actively waiting at the UI. If you process a lot of identical boilerplate contracts, adding an embedding cache will cut your API costs significantly. And once you move beyond PDFs, you’ll need multi-modal pipelines to handle image and audio extraction.
But the pipeline described here is the foundation. Every one of those extensions plugs neatly into the worker layer and the message broker, which is exactly why decoupling was worth the complexity in the first place.
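As one illustration of how such an extension slots into the worker layer, here is a minimal sketch of the embedding cache idea. Everything in it is an assumption: the class name, the in-memory dictionary (a shared store would be needed across workers), and the choice to key on a hash of the model name plus the chunk text.

```python
import hashlib
from typing import Callable


class EmbeddingCache:
    """Memoize embeddings by (model, chunk text) so repeated text is embedded once."""

    def __init__(self, embed_batch: Callable[[list[str]], list[list[float]]], model: str):
        self._embed_batch = embed_batch  # e.g. the generate_embeddings function
        self._model = model
        self._store: dict[str, list[float]] = {}

    def _key(self, text: str) -> str:
        # Include the model name: vectors from different models are not interchangeable.
        return hashlib.sha256(f"{self._model}\n{text}".encode("utf-8")).hexdigest()

    def embed(self, chunks: list[str]) -> list[list[float]]:
        # Collect cache misses, deduplicated, in their original order.
        missing = list(dict.fromkeys(c for c in chunks if self._key(c) not in self._store))
        if missing:
            for text, vector in zip(missing, self._embed_batch(missing)):
                self._store[self._key(text)] = vector
        return [self._store[self._key(c)] for c in chunks]
```

In the worker, step 4 would call `cache.embed(chunks)` instead of calling the embedding function directly. Because the model name is part of the key, switching embedding models invalidates old entries automatically.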