
2025-06-14 · 12 min read

The Async AI Processing Pipeline: Decoupling Storage from Intelligence

I keep seeing this exact same architectural car crash lately. Honestly, it works flawlessly in a slick demo video (which is probably why everyone builds it this way at first) but then it quietly self-destructs the minute you hit actual production traffic.

Picture this: A user uploads a PDF. Your backend parses the text. You hit the OpenAI API for embeddings. You dump those vectors into Pinecone. Finally, you spit back a 200 OK.

Seems clean, right? Simple. It’s also completely wrong at any meaningful scale.

The Demo That Lies to You

If we actually map out what that “simple” flow looks like under the hood, it’s terrifying:

sequenceDiagram
    participant U as User
    participant API as API Gateway
    participant EMB as Embedding API
    participant VDB as Vector DB

    U->>API: POST /upload (PDF)
    API->>API: Parse Document
    API->>EMB: Generate Embeddings
    Note over API,EMB: Wait... (2-10 seconds)
    EMB-->>API: Vectors
    API->>VDB: Upsert Vectors
    Note over API,VDB: Wait...
    VDB-->>API: OK
    API-->>U: 200 OK
    Note over U,API: User waited 10+ seconds

Every single arrow in that diagram is a loaded gun pointing straight at your user’s upload request. What if the embedding API decides to rate-limit you at 9:00 AM? The user’s file upload fails. The PDF parser chokes on some weirdly formatted, corrupted document from 2013? Upload fails. The vector database experiences a minor network blip? Yep, upload fails.

We’ve essentially glued together three totally separate things: getting a file safely onto a hard drive, extracting intelligent data from it, and updating a search index. Then we forced them all into one incredibly brittle HTTP request.
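To put rough numbers on that coupling: in a synchronous chain the upload only succeeds if every dependency does, so their success rates multiply. Here’s a tiny sketch of the arithmetic. The per-step rates are made-up assumptions for illustration, not measurements from any real service.

```python
from math import prod

# Hypothetical per-request success rates; illustrative numbers only.
SYNC_CHAIN = {"pdf_parser": 0.99, "embedding_api": 0.995, "vector_db": 0.999}
FAST_PATH = {"object_storage": 0.9999, "metadata_db": 0.9999}


def chain_success_rate(rates: dict[str, float]) -> float:
    """A request succeeds only if every step succeeds, so the rates multiply."""
    return prod(rates.values())


sync_rate = chain_success_rate(SYNC_CHAIN)
fast_rate = chain_success_rate(FAST_PATH)

print(f"upload success when coupled to the AI steps: {sync_rate:.4f}")
print(f"upload success when it only touches storage: {fast_rate:.4f}")
```

Whatever the real numbers are for your stack, the shape is the same: the coupled upload can never be more reliable than its flakiest dependency.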

The Mental Model Shift

If there’s one thing to take away, it’s this:

Storage must be immediate. AI extraction can be eventually consistent.

These are two entirely different promises we’re making to our users, with completely different SLAs. Think about it. When someone drops a file into a Dropbox-style app, their only real concern in that split second is safety. They need to know the file is durable and accessible right now. They really don’t care if the full-text semantic search takes 30 seconds to catch up in the background. But they absolutely care if the upload button spins endlessly for ten seconds and then throws a random error code.

Look at how Google Drive or Notion handles this stuff. You upload a file, and boom, it’s confirmed instantly. Generating the thumbnail, running OCR, updating the search index: all of that heavy lifting is happening off-stage. You see this pattern every single day; we just forget to build it that way ourselves.

The Event-Driven Escape Hatch

The way out of this trap is decoupling via an event-driven architecture. Let me sketch out the full picture before we dig into the weeds:

flowchart TD
    U([User]) -->|Upload File| AG[API Gateway]
    
    AG -->|1. Store Raw Bytes| S3[(AWS S3 / Object Storage)]
    AG -->|2. Write Metadata| PG[(PostgreSQL / Relational DB)]
    AG -->|3. Return 200 OK| U
    
    S3 -->|S3 Event Notification| LMB[AWS Lambda]
    
    LMB -->|Produce Message| KB[/Apache Kafka Message Broker/]
    
    KB -->|Distribute Work| W1[Worker 1]
    KB -->|Distribute Work| W2[Worker 2]
    KB -->|Distribute Work| W3[Worker 3]
    
    W1 & W2 & W3 -->|Upsert Vectors| VDB[(Vector DB)]
    W1 & W2 & W3 -->|Update Status| PG
    
    style AG fill:#4f46e5,color:#fff
    style KB fill:#f59e0b,color:#000
    style VDB fill:#10b981,color:#fff
    style S3 fill:#3b82f6,color:#fff
    style PG fill:#3b82f6,color:#fff

Let’s break down why this actually works in the real world.

Step 1 — Fast Storage, Happy Users

When someone hits upload, our API gateway does exactly two things:

  1. Shoves the raw bytes directly into S3.
  2. Writes a quick metadata row in PostgreSQL (who owns it, where it belongs, timestamp, etc.).

And we’re done. The user gets a 200 OK, the connection closes, and from their perspective, life is good.

# api/routes/upload.py

async def handle_upload(file: UploadFile, user_id: str, db: Session):
    file_id = generate_ulid()

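    # 1. Stream straight to S3: absolutely no buffering in app memory!
    #    s3_client is assumed to be an async wrapper. boto3's own
    #    upload_fileobj returns None, so we build the path ourselves.
    key = f"uploads/{user_id}/{file_id}/{file.filename}"
    await s3_client.upload_fileobj(
        fileobj=file.file,
        bucket=settings.S3_BUCKET,
        key=key,
    )
    s3_path = f"s3://{settings.S3_BUCKET}/{key}"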

    # 2. Jot down the metadata
    db_file = FileRecord(
        id=file_id,
        owner_id=user_id,
        filename=file.filename,
        s3_path=s3_path,
        status=FileStatus.PENDING,  # <-- the crucial part: it's not indexed yet
        size_bytes=file.size,
    )
    db.add(db_file)
    db.commit()

    # 3. Bail out immediately. No AI, no parsing.
    return {"file_id": file_id, "status": "uploaded"}

Notice that FileStatus.PENDING flag? The client app knows the file is safe, but it’s not searchable yet. You can throw a subtle little “Indexing…” spinner on the UI. It’s honest, it’s accurate, and most importantly, it doesn’t freeze the user’s workflow.
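The snippets in this post lean on FileStatus without ever defining it, so here’s a minimal sketch of what it might look like. The four states are the ones the code here uses; the string values, the UI labels, and the ui_label helper are my own assumptions.

```python
from enum import Enum


class FileStatus(str, Enum):
    """Lifecycle states referenced by the upload handler and the workers."""

    PENDING = "pending"                      # stored, waiting for a worker
    INDEXED = "indexed"                      # vectors written, searchable
    NOT_INDEXABLE = "not_indexable"          # nothing to extract (e.g. blank scan)
    EXTRACTION_FAILED = "extraction_failed"  # unrecoverable parse error


# Hypothetical UI copy; the wording is an assumption.
UI_LABELS = {
    FileStatus.PENDING: "Indexing…",
    FileStatus.INDEXED: "Searchable",
    FileStatus.NOT_INDEXABLE: "Stored (no searchable text)",
    FileStatus.EXTRACTION_FAILED: "Stored (could not be indexed)",
}


def ui_label(status: FileStatus) -> str:
    """Every state keeps the file available; only search readiness differs."""
    return UI_LABELS[status]
```

Note that none of these states means "your file is gone". That’s the whole point of splitting the two promises.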

Step 2 — The Kickoff Event

The moment S3 gets the file, it emits an event notification that triggers a Lambda, and the Lambda tosses an event onto our message broker. When you’re dealing with serious volume, the payload needs to be lean:

{
  "event": "file_uploaded",
  "file_id": "01H8XK3M5P6Q7R8S9T0V",
  "s3_path": "s3://my-bucket/uploads/user_123/01H8XK3.../report.pdf",
  "file_type": "application/pdf",
  "owner_id": "user_123",
  "timestamp": "2025-06-14T09:00:00Z"
}

We push this into Apache Kafka. This broker right here? This is our shock absorber.
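For the curious, here’s roughly what the Lambda in the middle could look like. This is a sketch under assumptions: it relies on the uploads/{owner_id}/{file_id}/{filename} key layout from the upload handler, guesses the MIME type from the filename, and takes a produce callable as a stand-in for a real Kafka producer. The build_file_event name is mine.

```python
import json
import mimetypes
from urllib.parse import unquote_plus


def build_file_event(record: dict) -> dict:
    """Turn one S3 event-notification record into the lean broker payload.

    Assumes keys follow the layout uploads/{owner_id}/{file_id}/{filename}.
    """
    bucket = record["s3"]["bucket"]["name"]
    # S3 URL-encodes object keys in notifications (spaces arrive as '+').
    key = unquote_plus(record["s3"]["object"]["key"])
    _prefix, owner_id, file_id, filename = key.split("/", 3)
    mime_type, _encoding = mimetypes.guess_type(filename)
    return {
        "event": "file_uploaded",
        "file_id": file_id,
        "s3_path": f"s3://{bucket}/{key}",
        "file_type": mime_type or "application/octet-stream",
        "owner_id": owner_id,
        "timestamp": record["eventTime"],
    }


def lambda_handler(event: dict, context: object, produce=print) -> int:
    """Entry point. `produce` is a stand-in for a real Kafka producer call."""
    records = event.get("Records", [])
    for record in records:
        produce(json.dumps(build_file_event(record)))
    return len(records)
```

Keeping the payload to identifiers and a path (never the file bytes) is what keeps the broker cheap; workers fetch the actual content from S3 when they get to it.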

graph LR
    subgraph S1 [9:00 AM Monday]
        direction TB
        F1[File 1] & F2[File 2] & F3[...] & FN[File 10,000]
    end
    
    subgraph S2 [Kafka Broker]
        direction TB
        T[Topic: file.uploaded / 10,000 messages queued]
    end
    
    subgraph S3 [Worker Pool]
        direction TB
        W1[Worker] & W2[Worker] & W3[Worker]
        note[Processing at controlled rate]
    end
    
    F1 & F2 & F3 & FN -->|instant write| T
    T -->|controlled consumption| W1 & W2 & W3

Imagine a whole department decides to dump 10,000 files into a shared workspace right at 9 AM. Kafka just absorbs the hit, buffering all 10,000 events. Your storage layer didn’t break a sweat because writing to S3 and Postgres is incredibly cheap. Meanwhile, the AI backend just chews through that queue at whatever pace it can naturally sustain without a meltdown.
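If you want a feel for what "whatever pace it can sustain" means, a back-of-the-envelope estimate goes a long way. The sketch below assumes every file costs about the same and workers stay busy; the 6-seconds-per-file figure is a made-up placeholder, so plug in your own measurements.

```python
import math


def estimate_drain_seconds(backlog: int, workers: int, seconds_per_file: float) -> float:
    """Rough time for a worker pool to clear a backlog, assuming every file
    costs about the same and the workers stay fully busy."""
    if workers < 1:
        raise ValueError("need at least one worker")
    files_per_worker = math.ceil(backlog / workers)
    return files_per_worker * seconds_per_file


# Hypothetical numbers: 10,000 queued files, ~6 seconds of parsing and
# embedding per file.
for workers in (3, 10, 30):
    minutes = estimate_drain_seconds(10_000, workers, 6.0) / 60
    print(f"{workers:>2} workers -> ~{minutes:.0f} min to drain")
```

The backlog only ever delays search, never the upload itself. One thing to keep in mind when adding workers: Kafka caps a consumer group’s parallelism at the topic’s partition count, so extra workers beyond that just sit idle.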

The Worker Layer: Where the Magic Happens

This is the actual “AI” part. We use long-running worker processes (using Python’s confluent_kafka) that subscribe to our Kafka topics and pull messages one by one.

Here’s the lifecycle of a single message:

flowchart LR
    A([Kafka Message]) --> B[1. Retrieve from S3]
    B --> C[2. Extract Text OCR / Parser]
    C --> D[3. Chunk Text]
    D --> E[4. Generate Embeddings]
    E --> F[5. Upsert to Vector DB]
    F --> G[6. Update Status PostgreSQL]
    G --> H([Done])

    style A fill:#f59e0b,color:#000
    style H fill:#10b981,color:#fff
    style C fill:#8b5cf6,color:#fff
    style E fill:#8b5cf6,color:#fff

Let’s look at how that translates to real, robust code. Notice how we strictly control when the consumer commits its offset back to Kafka.

# workers/document_processor.py

import json
from confluent_kafka import Consumer, KafkaError, KafkaException
from tenacity import retry, stop_after_attempt, wait_exponential

consumer = Consumer({
    "bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
    "group.id": "document-processor-group",
    "auto.offset.reset": "earliest",
    # We only commit the offset after successful processing
    "enable.auto.commit": False,
})

consumer.subscribe(["file.uploaded"])

def run_worker():
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise KafkaException(msg.error())

        event = json.loads(msg.value().decode("utf-8"))

        try:
            process_document(event)
            # Success! Tell Kafka we're officially done with this message.
            consumer.commit(message=msg)

        except UnrecoverableParsingError:
            send_to_dead_letter_queue(event, reason="corrupted_document")
            update_file_status(event["file_id"], FileStatus.EXTRACTION_FAILED)
            # Commit anyway so we don't infinitely retry garbage files
            consumer.commit(message=msg)

        except Exception as exc:
            # Skipping the commit is not enough on its own: poll() keeps moving
            # forward, and the next successful commit would cover this offset too.
            # handle_retryable_failure is expected to re-publish the event to a
            # retry topic (or the DLQ once attempts run out) before we commit.
            handle_retryable_failure(event, exc)
            consumer.commit(message=msg)

def process_document(event: dict):
    file_id = event["file_id"]
    s3_path = event["s3_path"]

    # 1. Grab the file
    raw_bytes = s3_client.download_file(s3_path)

    # 2. Rip out the text
    extracted_text = document_parser.extract(
        content=raw_bytes,
        mime_type=event["file_type"]
    )

    if not extracted_text or not extracted_text.strip():
        # Not necessarily a failure. Maybe it's just a blank image.
        update_file_status(file_id, FileStatus.NOT_INDEXABLE)
        return

    # 3. Slice it up
    chunks = text_chunker.chunk(
        text=extracted_text,
        chunk_size=512,       
        overlap=50,           # gotta have that sliding window
    )

    # 4. Do the AI math (this handles its own API retries)
    embeddings = generate_embeddings(chunks)

    # 5. Shove it into the Vector DB
    vectors = [
        {
            "id": f"{file_id}_chunk_{i}",
            "values": embedding,
            "metadata": {
                "file_id": file_id,
                "owner_id": event["owner_id"],
                "chunk_index": i,
                "text": chunk,
            },
        }
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings))
    ]
    
    # Note: Using Pinecone's namespace for tenant isolation here.
    # If you're on Milvus, use Collections or a partition_key_field instead.
    vector_db.upsert(vectors=vectors, namespace=event["owner_id"])

    # 6. We're good to go
    update_file_status(file_id, FileStatus.INDEXED)
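One consequence of committing only after processing: if a worker dies between the upsert and the commit, Kafka hands the same message out again. The deterministic IDs above ({file_id}_chunk_{i}) are what make that redelivery harmless. Here’s a small sketch of the idea, using an in-memory stand-in for the vector DB (InMemoryVectorStore and build_vectors are hypothetical names, not a real client).

```python
class InMemoryVectorStore:
    """Hypothetical stand-in for a vector DB with upsert-by-id semantics."""

    def __init__(self) -> None:
        self.records: dict[str, dict] = {}

    def upsert(self, vectors: list[dict]) -> None:
        for vector in vectors:
            # Same id overwrites the previous record instead of duplicating it.
            self.records[vector["id"]] = vector


def build_vectors(file_id: str, chunks: list[str], embeddings: list[list[float]]) -> list[dict]:
    """Deterministic ids: reprocessing the same file yields the same keys."""
    return [
        {"id": f"{file_id}_chunk_{i}", "values": emb, "metadata": {"text": chunk}}
        for i, (chunk, emb) in enumerate(zip(chunks, embeddings))
    ]


store = InMemoryVectorStore()
vectors = build_vectors("file_1", ["alpha", "beta"], [[0.1, 0.2], [0.3, 0.4]])

store.upsert(vectors)  # first delivery
store.upsert(vectors)  # redelivery after a crash before the offset commit

print(len(store.records))
```

One caveat: if a re-run produces fewer chunks than the first attempt did (say, after a parser fix), the higher-numbered chunks from the old run would linger. Deleting by file_id before upserting is one way to handle that.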

A Quick Detour: The Art of Chunking

I have to mention chunking, because it’s honestly one of the most critical decisions you’ll make for how well your retrieval actually performs. It’s not just an afterthought. Here’s a visual of how a 512-token chunk with a 50-token overlap behaves:

Full Document Text:
┌─────────────────────────────────────────────────────────────────┐
│ The quarterly revenue grew by 23%... [2,400 tokens of text]     │
└─────────────────────────────────────────────────────────────────┘

After Chunking (512 tokens, 50-token overlap):
┌────────────────────┐
│ Chunk 1            │  tokens 0   → 512
└────────────────────┘
              ┌────────────────────┐
              │ Chunk 2            │  tokens 462 → 974   (50-token overlap)
              └────────────────────┘
                            ┌────────────────────┐
                            │ Chunk 3            │  tokens 924 → 1436
                            └────────────────────┘

Why the overlap? Because if you don’t use it, you’ll inevitably slice a crucial sentence right in half, and then when someone asks the LLM a question about that specific concept, it misses the answer entirely. It’s a tiny detail that makes a massive difference.
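The sliding window itself is only a few lines. This is a minimal sketch that works on token offsets, assuming the document has already been tokenized; chunk_tokens is a name I made up, and a real pipeline would use the embedding model’s tokenizer rather than a list of integers.

```python
def chunk_tokens(tokens: list, chunk_size: int = 512, overlap: int = 50) -> list[tuple[int, int]]:
    """Return (start, end) token offsets for sliding-window chunks."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    stride = chunk_size - overlap  # how far the window advances each step
    spans = []
    start = 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        spans.append((start, end))
        if end == len(tokens):
            break  # the final window reached the end of the document
        start += stride
    return spans


# Stand-in for a tokenized 2,400-token document.
document_tokens = list(range(2400))
for start, end in chunk_tokens(document_tokens):
    print(f"tokens {start} -> {end}")
```

Each window starts 462 tokens after the previous one (512 minus the 50-token overlap), which is where the 462 and 924 offsets in the diagram come from.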

Handling the Inevitable Failures

This is the exact spot where 99% of AI tutorials end, and where the nightmare of running a real production system begins.

flowchart TD
    W[Worker Picks Up Message] --> TRY{Attempt Processing}
    
    TRY -->|Success| DONE([Mark INDEXED / Commit Offset])
    
    TRY -->|Rate Limited 429| RL[Exponential Backoff]
    RL -->|Retry < 5 attempts| TRY
    RL -->|Retry limit hit| DLQ
    
    TRY -->|Corrupted File| CF[Log Error]
    CF --> DLQ
    
    TRY -->|Network Timeout| NT[Retry with Jitter]
    NT -->|Retry < 5 attempts| TRY
    NT -->|Retry limit hit| DLQ
    
    DLQ[/Dead Letter Queue/] --> MON[Alert Engineering Team]
    MON --> FIX[Patch Parser / Config]
    FIX --> REPLAY[Replay DLQ Messages]
    REPLAY --> TRY

    style DONE fill:#10b981,color:#fff
    style DLQ fill:#ef4444,color:#fff
    style RL fill:#f59e0b,color:#000

Stuff will break. You need to plan for it.
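Strip the flowchart down and it’s one decision per failure: retry, or dead-letter. Here’s a sketch of that decision in isolation. The exception classes are hypothetical stand-ins for whatever your SDKs actually raise, and decide is my own name for the helper.

```python
from enum import Enum


class Action(Enum):
    RETRY = "retry"
    DEAD_LETTER = "dead_letter"


# Hypothetical exception types standing in for whatever your SDKs raise.
class RateLimited(Exception): ...
class NetworkTimeout(Exception): ...
class CorruptedFile(Exception): ...


RETRYABLE = (RateLimited, NetworkTimeout)
MAX_ATTEMPTS = 5  # matches the "Retry < 5 attempts" edges in the flowchart


def decide(exc: Exception, attempt: int) -> Action:
    """Pick the next step for a failed message.

    `attempt` is 1-based: the number of tries made so far.
    """
    if isinstance(exc, RETRYABLE) and attempt < MAX_ATTEMPTS:
        return Action.RETRY
    # Corrupted files, unknown errors, and exhausted retries all go to the DLQ.
    return Action.DEAD_LETTER
```

Defaulting unknown errors to the DLQ is a deliberate choice in this sketch: an unexpected exception that gets retried forever is much harder to notice than one sitting in a queue with an alert on it.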

The Annoying Rate Limit (HTTP 429)

OpenAI, Cohere, they all have rate limits. If your workers are chewing through 10,000 files, you’re going to hit the wall. And when you do, you absolutely cannot just drop those jobs on the floor.

from openai import OpenAI, RateLimitError
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

# Reads OPENAI_API_KEY from the environment.
openai_client = OpenAI()

@retry(
    retry=retry_if_exception_type(RateLimitError),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    stop=stop_after_attempt(5),
)
def generate_embeddings(chunks: list[str]) -> list[list[float]]:
    # One request for the whole batch of chunks; results come back in order.
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=chunks,
    )
    return [item.embedding for item in response.data]

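By using wait_exponential, your retries space out: the delay doubles between attempts, never dropping below 2 seconds and never exceeding 60. The exact first few delays depend on how your tenacity version counts attempts, so check the schedule on the release you pin. Either way, it gives the external API a chance to breathe instead of you just frantically hammering a locked door.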
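If you’d rather see the schedule than take it on faith, here’s a dependency-free sketch of a doubling backoff with a floor and a cap, plus a jittered variant. To be clear, this is a hand-rolled illustration and not tenacity’s internals; backoff_delay and jittered_delay are names I made up.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, floor: float = 2.0, cap: float = 60.0) -> float:
    """Deterministic exponential delay before retry number `attempt` (1-based)."""
    return max(floor, min(cap, base * 2 ** attempt))


def jittered_delay(attempt: int, rng: random.Random, **kwargs) -> float:
    """'Full jitter': pick uniformly between 0 and the exponential delay so a
    fleet of workers does not retry in lockstep."""
    return rng.uniform(0.0, backoff_delay(attempt, **kwargs))


schedule = [backoff_delay(n) for n in range(1, 8)]
print(schedule)
```

The jitter matters more than it looks. When a whole fleet of workers gets rate-limited at the same moment, a purely deterministic schedule has them all come back at the same moment too. tenacity ships wait_random_exponential for exactly this case.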

The Garbage File

Some files are just cursed. Corrupted PDFs, weirdly password-protected docs, zero-byte anomalies. Don’t retry these. It’s a waste of compute because they will fail every single time.

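# Assumption: the parser is pdfminer.six, where these two exceptions live.
from pdfminer.pdfdocument import PDFPasswordIncorrect
from pdfminer.pdfparser import PDFSyntaxError

class UnrecoverableParsingError(Exception):
    """Raised for files that will fail identically on every retry."""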

def extract_text_from_pdf(content: bytes) -> str:
    try:
        return pdf_parser.extract(content)
    except (PDFSyntaxError, PDFPasswordIncorrect) as e:
        # Hard fail. Don't even try again.
        raise UnrecoverableParsingError(f"Document is garbage: {e}") from e

# Inside the worker...
try:
    text = extract_text_from_pdf(raw_bytes)
except UnrecoverableParsingError:
    send_to_dead_letter_queue(event, reason="corrupted_document")
    update_file_status(file_id, FileStatus.EXTRACTION_FAILED)
    return  # We're done here.

The Safety Net: Dead Letter Queues

The DLQ is basically your system’s purgatory. If a message runs out of retries, it goes here instead of vanishing into the void.

# Pseudocode: a real implementation would use consumer.poll() in a loop to aggregate metrics
from collections import Counter

def get_dlq_summary() -> dict:
    messages = get_all_dlq_messages(topic="file.uploaded.dlq")

    failure_reasons = Counter(m["metadata"]["failure_reason"] for m in messages)

    return {
        "total_failed": len(messages),
        "by_reason": dict(failure_reasons),
        # Don't assume the list is sorted: take the earliest timestamp explicitly.
        "oldest_message": min((m["timestamp"] for m in messages), default=None),
    }

When you finally figure out why a weird edge-case PDF was crashing your parser and deploy a fix, you just replay the DLQ. Every file gets a second chance. The best part? The user never saw an error message. To them, the file was always safe in storage; the search function just magically started working a bit later.
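A replay can be as simple as reading the DLQ and re-publishing to the main topic. Here’s a broker-agnostic sketch: messages and produce are injected, so in a real system they’d wrap a Kafka consumer and producer. I’m assuming each dead-lettered message carries the original payload under an "event" key next to the metadata used above; that shape, and the replay_dlq name, are my assumptions.

```python
from typing import Callable, Iterable, Optional


def replay_dlq(
    messages: Iterable[dict],
    produce: Callable[[str, dict], None],
    target_topic: str = "file.uploaded",
    only_reason: Optional[str] = None,
) -> int:
    """Re-publish dead-lettered events to the main topic.

    Returns the number of events replayed.
    """
    replayed = 0
    for message in messages:
        reason = message["metadata"]["failure_reason"]
        if only_reason is not None and reason != only_reason:
            continue  # leave unrelated failures in the DLQ for now
        produce(target_topic, message["event"])
        replayed += 1
    return replayed
```

Filtering by reason lets you replay only the failures your fix actually addresses, instead of re-running every cursed file in the queue.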

Zooming Out: The Big Picture

flowchart TD
    subgraph Client ["Client Layer"]
        U([User Browser / Mobile App])
    end

    subgraph Ingestion ["Ingestion Layer (Fast Path)"]
        AG[API Gateway]
        S3[(S3 Object Storage)]
        PG[(PostgreSQL Metadata)]
    end

    subgraph Messaging ["Messaging Layer (Buffer)"]
        K[/Kafka file.uploaded topic/]
        DLQ[/Dead Letter Queue/]
    end

    subgraph Processing ["Processing Layer (Slow Path)"]
        W1[Worker Pod]
        W2[Worker Pod]
        W3[Worker Pod]
        EMB[Embedding API]
    end

    subgraph Storage ["Storage Layer (Query Path)"]
        VDB[(Vector DB)]
    end

    subgraph Observability ["Observability"]
        MON[Grafana Dashboard]
    end

    U -->|Upload| AG
    AG -->|Store Bytes| S3
    AG -->|Write Metadata| PG
    AG -->|200 OK| U

    S3 -->|S3 Event Notification| LMB[AWS Lambda]
    LMB -->|Produce Message| K
    
    K -->|Consume| W1 & W2 & W3
    W1 & W2 & W3 <-->|Embeddings| EMB
    W1 & W2 & W3 -->|Upsert Vectors| VDB
    W1 & W2 & W3 -->|Update Status| PG
    W1 & W2 & W3 -->|On Failure| DLQ
    DLQ --> MON

    style AG fill:#4f46e5,color:#fff
    style K fill:#f59e0b,color:#000
    style DLQ fill:#ef4444,color:#fff
    style VDB fill:#10b981,color:#fff
    style EMB fill:#8b5cf6,color:#fff
    style MON fill:#64748b,color:#fff
    style LMB fill:#f97316,color:#fff

The Brutal Reality of Tradeoffs

Look, I won’t pretend this is a free lunch. Whenever you introduce event-driven architectures, you’re signing a blood pact with complexity.

| Consideration        | Synchronous Pipeline               | Async Pipeline                |
|----------------------|------------------------------------|-------------------------------|
| Upload Latency       | Agonizingly slow                   | Basically instant             |
| Reliability          | Super fragile                      | Highly resilient              |
| Search Availability  | Right away (if it doesn’t crash)   | Whenever the workers catch up |
| Operational Headache | Very low                           | Pretty high                   |
| Scalability          | Non-existent                       | Practically infinite          |
| Failure Recovery     | A manual nightmare                 | DLQ + replay                  |

You’re taking on the burden of managing Kafka, monitoring a fleet of workers, and dealing with eventual consistency. For a weekend hackathon? Stick to the synchronous script. But if you’re building a product that actual human beings are going to rely on day in and day out, this is genuinely the only responsible way to architect it.

What Comes Next

This architecture handles the core pipeline, but production systems layer on a few more pieces. You’ll eventually need a re-indexing strategy for when you upgrade embedding models, and you’ll want priority queues for files where the user is actively waiting at the UI. If you process a lot of identical boilerplate contracts, adding an embedding cache will cut your API costs significantly. And once you move beyond PDFs, you’ll need multi-modal pipelines to handle image and audio extraction.

But the pipeline described here is the foundation. Every one of those extensions plugs neatly into the worker layer and the message broker, which is exactly why decoupling was worth the complexity in the first place.