Check out Mercury, a business operations management application on https://usemercury.app.

Scenrario

A large retailer with 50,000 SKUs decides to migrate their inventory to Mercury. They upload their carefully prepared CSV file, watch the progress bar crawl to 99%, and then... crash. The entire import fails with an unhelpful 500 Internal Server Error. Hours of work, gone.

The first few iterations of the import inventory failed woefully.

The Hidden Complexities

Data Integrity Challenges:

Performance Bottlenecks:

Concurrency Nightmares:

System Architecture: A Multi-Stage Pipeline

My solution is a sophisticated pipeline that isolates concerns, maximizes parallelism, and provides multiple layers of resilience. Built on FastAPI and Beanie ODM (asyncio-native MongoDB ODM), the system processes files through distinct, optimized stages.

image

Key Architectural Principles

  1. Asynchronous by Design: Every I/O operation is non-blocking, enabling high concurrency
  2. Batch Everything: Transform O(N) operations into O(1) or O(log N) operations
  3. Fail Fast, Recover Smart: Validate early, but handle failures gracefully
  4. Isolation of Concerns: Each stage has a single responsibility and clear interfaces
  5. Observable and Debuggable: Comprehensive logging and metrics at every stage

Intelligent File Processing and Parsing

The first stage demonstrates that even "simple" file parsing requires sophisticated engineering when done at scale. Mercury supports 7 different file formats through a pluggable parser architecture:

# From mercury/utilities/inventory_utils.py

def parse_document(filename: str, content: bytes) -> List[Dict[str, Any]]:
    """
    Parse documents with intelligent format detection and fallback strategies.
    Supports: CSV, Excel (XLSX/XLS), PDF, DOCX, TXT, JSON, XML
    """
    ext = os.path.splitext(filename)[1].lower()
    
    parser_mapping = {
        ".csv": parse_csv,
        ".xlsx": parse_excel, ".xls": parse_excel,
        ".pdf": parse_pdf,
        ".docx": parse_docx,
        ".txt": parse_txt,
        ".json": parse_json,
        ".xml": parse_xml
    }
    
    if ext in parser_mapping:
        return parser_mapping[ext](content)
    else:
        # Fallback: attempt regex-based parsing
        return parse_with_regex(content)

The Excel Parser: Handling Real-World Messiness

Excel files are particularly tricky because users create them in unpredictable ways. Our parser handles common issues like:

def parse_excel(content: bytes) -> List[Dict[str, Any]]:
    """
    Robust Excel parser handling:
    - BOM (Byte Order Mark) characters
    - Inconsistent header formats
    - Empty rows and columns
    - Auto-generated column names
    """
    # Skip the first row (often contains formatting) and use predefined headers
    df = pd.read_excel(BytesIO(content), skiprows=1, names=[
        "SKU", "Name", "Description", "Price", "Quantity In Stock",
        "Reorder Point", "Reorder Quantity", "Unit Of Measure", "Category"
    ])

    # Strip BOM characters that break column matching
    df.columns = [col.strip().lstrip('\ufeff') for col in df.columns]
    
    return df.to_dict(orient="records")

Memory-Efficient Pre-processing

Once parsed, we perform a critical single-pass analysis of the entire dataset. This step eliminates the need for multiple scans and sets up optimal batch operations:

# From mercury/services/inventory.py

async def load_inventory_from_file(self, user: User, has_sku: bool, file: UploadFile):
    # Parse entire file into memory (acceptable for files up to 100MB)
    inventory_list = parse_document(file.filename, await file.read())
    
    # Single pass extraction - crucial optimization
    unique_categories = set()
    provided_skus = set()
    category_counts = defaultdict(int)

    for row in inventory_list:
        # Normalize category names for consistency
        cat = row.get("Category", "general").lower()
        unique_categories.add(cat)
        category_counts[cat] += 1

        # Collect user-provided SKUs for validation
        if has_sku and row.get("SKU"):
            provided_skus.add(row["SKU"])

This single loop gives us complete knowledge about the import, allowing us to:

The SKU Collision Problem - A Tale of Two Strategies

This dealt with me a lot. Initially, I assumed there won't be any reoccurence with regards the SKU and it worked for the first few entries - 100, 500, 1000. As it grew, it started failing with a duplicate error with the SKU and it became frustrating. My naive solution was to do an if...else but that wasn't efficient enough.

Stock Keeping Units (SKUs) must be unique within a business's inventory. The has_sku boolean parameter creates a strategic fork in our processing logic, each optimized for different scenarios.

image ### Strategy 1: User-Provided SKUs - Trust but Verify

When users provide their own SKUs, we implement a fail-fast validation strategy:

# Parallel data fetching for optimal performance
if provided_skus:
    provided_sku_check_task = InventoryItem.find(
        In(InventoryItem.SKU, list(provided_skus)),
        InventoryItem.owned_by == user_business.id
    ).project(InventoryItemProj).to_list()

# Execute multiple database operations concurrently
category_task = InventoryCategory.find(
    In(InventoryCategory.name, list(unique_categories)),
    InventoryCategory.owned_by == user_business.id
).to_list()

all_existing_skus_task = InventoryItem.find(
    InventoryItem.owned_by == user_business.id
).project(InventoryItemProj).to_list()

# Await all operations simultaneously
existing_categories, all_existing_sku_docs = await asyncio.gather(
    category_task, all_existing_skus_task
)

if provided_sku_check_task:
    provided_conflicts = await provided_sku_check_task
    conflicting_skus = {doc.SKU for doc in provided_conflicts}
    
    if conflicting_skus:
        raise HTTPException(
            status_code=409,
            detail=f"The following SKUs already exist: {', '.join(conflicting_skus)}"
        )

Strategy 2: System-Generated SKUs - The Scaling Challenge

This is where the real engineering complexity lies. Generating unique identifiers at scale requires careful orchestration to avoid the dreaded N+1 query problem.

My Initial Naive Approach (Don't Do This):

# This will kill your database
for item in inventory_list:
    while True:
        candidate_sku = generate_sku(item.category)
        if not await check_sku_exists(candidate_sku):  # N database calls!
            item.sku = candidate_sku
            break

Improved Solution: Batched Pre-generation

async def _prepare_sku_batches(self, category_counts, existing_skus):
    """
    Pre-generate SKUs in batches with intelligent buffering.
    Transforms O(N) database calls into O(categories) calls.
    """
    sku_batches = {}
    tasks = []

    # Create concurrent generation tasks for each category
    for category, count in category_counts.items():
        # Buffer calculation: balance memory vs. collision risk
        buffer_size = max(count + 10, int(count * 1.2))
        task = self._generate_sku_batch(category, buffer_size)
        tasks.append((category, task))

    # Execute all generation tasks in parallel
    for category, task in tasks:
        generated_skus = await task
        # Filter against known existing SKUs
        unique_skus = [sku for sku in generated_skus if sku not in existing_skus]
        sku_batches[category] = unique_skus
        
    return sku_batches

Batched Asynchronous SKU Generation

The heart of Mercury is the sophisticated SKU generation pipeline that can create thousands of unique identifiers with minimal database load.

The Generation Algorithm

async def _generate_sku_batch(self, category: str, count: int) -> List[str]:
    """
    Generate a batch of SKUs with intelligent retry and validation.
    
    Performance characteristics:
    - Time: O(count + log(existing_skus))
    - Database queries: 1 per batch (not per SKU)
    - Memory: O(count) 
    """
    skus = []
    attempts = 0
    max_attempts = count * 2  # Prevent infinite loops

    while len(skus) < count and attempts < max_attempts:
        # Generate candidates in memory first
        batch_size = min(50, count - len(skus))
        candidate_skus = []
        
        for _ in range(batch_size):
            # SKU format: CATG-12345 (category prefix + 5 digits)
            prefix = category[:4].upper()
            number = ''.join(random.choices(string.digits, k=5))
            candidate_skus.append(f"{prefix}-{number}")

        # Single database query to check entire batch
        existing_check = await InventoryItem.find(
            In(InventoryItem.SKU, candidate_skus)
        ).project(InventoryItemProj).to_list()

        # Filter out existing SKUs
        existing_set = {doc.SKU for doc in existing_check}
        unique_skus = [sku for sku in candidate_skus if sku not in existing_set]
        
        skus.extend(unique_skus)
        attempts += batch_size

    return skus

Handling Edge Cases: Regeneration Logic

What happens when our initial batch doesn't generate enough unique SKUs? Mercury includes sophisticated regeneration logic:

# Check for insufficient batches and regenerate
regeneration_tasks = []
for category, count in category_counts.items():
    current_count = len(sku_batches[category])
    if current_count < count:
        shortage = count - current_count
        # Generate extra to handle potential duplicates
        regenerate_count = max(shortage + 5, int(shortage * 1.5))
        task = self._generate_sku_batch(category, regenerate_count)
        regeneration_tasks.append((category, shortage, task))

# Process regeneration results
for category, needed_count, task in regeneration_tasks:
    additional_skus = await task
    existing_category_skus = set(sku_batches[category]) | existing_skus

    # Add until we have enough, avoiding internal duplicates
    for sku in additional_skus:
        if sku not in existing_category_skus:
            sku_batches[category].append(sku)
            existing_category_skus.add(sku)
            if len(sku_batches[category]) >= category_counts[category]:
                break

SKU Distribution Strategy

During processing, we use a smart distribution mechanism to pull from our pre-generated batches:

async def _get_next_available_sku(self, category: str, sku_batches: Dict[str, List[str]],
                                  sku_counters: Dict[str, int], all_used_skus: Set[str]) -> str:
    """
    Efficiently distribute SKUs from pre-generated batches with fallback logic.
    """
    # Try to use pre-generated batch first
    if category in sku_batches and sku_counters[category] < len(sku_batches[category]):
        candidate_sku = sku_batches[category][sku_counters[category]]
        if candidate_sku not in all_used_skus:
            sku_counters[category] += 1
            return candidate_sku

    # Fallback: generate individual SKU if batch is exhausted
    max_attempts = 10
    for _ in range(max_attempts):
        candidate_sku = await generate_unique_sku(category)
        if candidate_sku not in all_used_skus:
            return candidate_sku

    raise ValueError(f"Failed to generate unique SKU for category {category}")

Resilient Database Insertion with Atomic Retry Logic

Even with perfect pre-generation, race conditions can still occur. Our final stage implements a sophisticated retry mechanism that handles partial failures gracefully.

The Challenge: Race Conditions in Distributed Systems

Consider this scenario: 1. Process A generates SKU "SHOE-12345" 2. Process B generates the same SKU "SHOE-12345" (due to random collision) 3. Process A successfully inserts its item 4. Process B attempts insertion and fails with duplicate key error

Our system must handle this elegantly without losing data or corrupting state.

Optimistic Insertion with Intelligent Retry

async def _insert_batch_with_retry(self, items_to_insert: List[InventoryItem], 
                                   business_id: PydanticObjectId,
                                   max_retries: int = 3) -> List[InventoryItem]:
    """
    Insert batches with atomic retry logic for handling race conditions.
    
    Key features:
    - Ordered=False for maximum throughput
    - Specific error code handling (11000 = duplicate key)
    - Partial success tracking
    - Atomic SKU regeneration for failed items
    """
    successful_items = []
    remaining_items = items_to_insert.copy()

    for attempt in range(max_retries + 1):
        if not remaining_items:
            break

        try:
            # Attempt optimistic batch insertion
            result = await InventoryItem.insert_many(remaining_items, ordered=False)
            
            # Success: update item IDs and track insertions
            for item, item_id in zip(remaining_items, result.inserted_ids):
                item.id = item_id
                successful_items.append(item)
                
            remaining_items = []  # All succeeded

        except BulkWriteError as e:
            # Parse detailed error information
            write_errors = e.details.get('writeErrors', [])
            inserted_count = e.details.get('nInserted', 0)
            
            # Handle partial success: some items were inserted
            if inserted_count > 0:
                error_indices = {error['index'] for error in write_errors}
                
                for idx, item in enumerate(remaining_items):
                    if idx not in error_indices and idx < inserted_count:
                        successful_items.append(item)

            # Isolate and handle duplicate key errors specifically
            duplicate_items = []
            for error in write_errors:
                if error.get('code') == 11000:  # MongoDB duplicate key error
                    error_index = error['index']
                    if error_index < len(remaining_items):
                        duplicate_item = remaining_items[error_index]
                        duplicate_items.append((error_index, duplicate_item))

            if not duplicate_items:
                # Non-duplicate errors are not retryable
                logger.error(f"Non-duplicate bulk write error: {e}")
                break

            # Regenerate SKUs for duplicate items atomically
            new_remaining_items = []
            for error_index, item in duplicate_items:
                if attempt < max_retries:
                    category_name = await self._get_category_name(item.category_id)
                    new_sku = await self._generate_guaranteed_unique_sku(
                        category_name, business_id
                    )
                    item.SKU = new_sku
                    new_remaining_items.append(item)
                    
                    logger.info(f"Reassigned SKU for duplicate item: {new_sku}")

            remaining_items = new_remaining_items

    return successful_items

Guaranteed Unique SKU Generation

When retries are needed, we use an even more robust generation method:

async def _generate_guaranteed_unique_sku(self, category_name: str, 
                                          business_id: PydanticObjectId,
                                          max_attempts: int = 50) -> str:
    """
    Generate a SKU with database verification, used for conflict resolution.
    
    This method trades performance for absolute guarantees.
    """
    for attempt in range(max_attempts):
        candidate_sku = await generate_unique_sku(category_name)

        # Verify uniqueness with direct database check
        existing = await InventoryItem.find_one({
            "SKU": candidate_sku,
            "owned_by": business_id
        })

        if not existing:
            return candidate_sku

        # Add small delay to prevent thundering herd
        if attempt > 10:
            await asyncio.sleep(0.01)

    # Ultimate fallback: timestamp-based uniqueness
    import time
    fallback_sku = f"{await generate_unique_sku(category_name)}-{int(time.time() * 1000) % 10000}"
    return fallback_sku

Key Optimizations

1. Batching Strategy

BATCH_SIZE = 200  # Optimal balance of memory vs. transaction overhead

for i in range(0, total_items, BATCH_SIZE):
    batch_items = inventory_list[i:i + BATCH_SIZE]
    # Process batch atomically

2. MongoDB Projection Optimization

# Only fetch required fields to reduce network overhead
.project(InventoryItemProj)  # Only SKU and ID fields

# Defined as:
class InventoryItemProj(BaseModel):
    SKU: str
    id: Optional[PydanticObjectId] = None

3. Concurrent Database Operations

# Execute multiple independent queries simultaneously
category_task, sku_task, conflict_task = await asyncio.gather(
    fetch_categories(),
    fetch_existing_skus(), 
    check_conflicts()
)

4. Memory Management

# Use generators for large datasets to prevent memory exhaustion
def process_in_chunks(items, chunk_size=1000):
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]

Configuration Tuning

# Production configuration values
BATCH_SIZE = 200              # Balance memory vs. transaction cost
MAX_RETRIES = 3               # Prevent infinite retry loops  
SKU_BUFFER_MULTIPLIER = 1.2   # Account for collisions
MAX_FILE_SIZE = 100_000_000   # 100MB limit
SKU_GENERATION_BATCH_SIZE = 50 # Optimal for MongoDB $in queries

Conclusion & Notes

Now, I wasn't particular about the processing speed but ensuring it worked. Now that it works and the collision of SKUs is eliminated, I'm exploring better ways to handle this -> handle uploads and processing in the background so users can focus on other things.

Thanks to Claude, I was able to combine the scattered pieces for this implementation.