Building Mercury has been fun. I have learned more about software development and MongoDB in particular. For instance, I had never had to make use of MongoDB aggregations until Mercury, and it has been fascinating.

This little article is just to talk about MongoDB's .watch(). In a few words, what the method does is to watch the changes going on in and around a collection. For example, it can log the streams of insert, update, and delete operations happening to the selected collection.

Okay, what was I building?

So, I wanted to stream the changes going on in the database and communicate it via a websocket channel to the frontend guys. There's an activity tab that should update the user on changes especially as the application is a multi-user-tied-to-one-account application.

So...

You need to enable pre-image and post-image. I enabled it for all my collections from my mongosh console by running:

const collections = db.getCollectionNames().filter(c => !c.startsWith('system.'));

collections.forEach(collectionName => {
  db.runCommand({
    "collMod": collectionName,
    "changeStreamPreAndPostImages": { "enabled": true }
  });
});

I started by building out the websocket connection manager:

import asyncio

from fastapi import WebSocket
from typing import Dict
from mercury.infra.logger import logger


class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        self.lock = asyncio.Lock()

    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        async with self.lock:
            self.active_connections[user_id] = websocket
        logger.info(f"Client {user_id} connected. Active connections: {list(self.active_connections.keys())}")

    async def disconnect(self, user_id: str):
        async with self.lock:
            if user_id in self.active_connections:
                del self.active_connections[user_id]
        logger.info(f"Client {user_id} disconnected. Active connections: {list(self.active_connections.keys())}")

    async def send_to_client(self, user_id: str, message: str):
        async with self.lock:
            if user_id in self.active_connections:
                websocket = self.active_connections[user_id]
                await websocket.send_text(message)
                logger.info(f"Message sent to client {user_id}")
            else:
                logger.info(f"No active connection found for user {user_id}")

It's pretty straightforward. It handles the connection, keeping track and sending back to the designated users.

Then, the main action - the websocket route. For changes to be listened to, a connection needs to be established for a user. I'm not going to go deep at all, I'm just going to illustrate it. Pardon me, these days, I have become lazy.

Let's go however. I started by connecting a valid user aka storing the user's credential for subsequent communication:

if not client_id and not token:
    await websocket.close(code=1008, reason="Client ID and token required")
    return

if app.get_client_type_from_client_id(client_id) is None:
    await websocket.close(code=1000, reason="Invalid client ID")

user = app.authentication_service.decode_jwt(
    token
)

db = await app.database.get_db()

valid_user = await db.users.find_one({
    "email": user["user_id"]
})

if valid_user is None:
    await websocket.close(code=1000, reason="User does not exist")
    return

await app.websocket.connect(str(valid_user["_id"]), websocket)

First, I try to ascertain the validity of the client's ID, token and user before proceeding to connect the user, I make use of the ID as it's unique for each of them.

Let the watch party begin

In a try..except block, I start the main party. I need to be up to date with all the operations but I don't need all the information being passed around as well. Therefore, I wrote an aggregation pipeline to return just the fields I'm interested in:

pipeline = [
    {'$match': {'operationType': {'$in': ['insert', 'update', 'delete']}}},
    {"$project": {
        "operationType": 1,
        "documentKey": 1,
        "ns": 1,
        "fullDocument.creator_id": 1,
        "fullDocument.owner._id": 1,
        "fullDocument.owned_by": 1,
        "fullDocument.quantity_in_stock": 1,
        "fullDocument.reorder_point": 1,
        "fullDocumentBeforeChange.owned_by": 1,
        "fullDocumentBeforeChange.creator_id": 1
    }}
]

In some cases, I need the full document before a change is effected. An example is a delete operation where I need to get the document details to update the stream. I defined them in an options dict:

options = {
    'full_document': 'updateLookup',
    'full_document_before_change': 'required'
}

Then I begin the watch operation and store the operation type operation_type, document key (ID) document_key, and the collection collection:

async with db.watch(pipeline, **options) as change_stream:
    async for change in change_stream:
        operation_type = change['operationType']
        document_key = change['documentKey']["_id"]
        collection = change["ns"]["coll"]

With all these in place, I defined the specific actions to be carried out (mainly websocket broadcast) for each action. Here's the one for the delete operation:

if operation_type == 'delete':
    full_doc = change.get('fullDocumentBeforeChange', {})

    match collection:
        case "inventory_items":
            business = await db.businesses.find_one({
                "_id": full_doc.get("owned_by")
            })
            recipient = business["owner"].id if business else None
        case "orders":
            business = await db.businesses.find_one({
                "_id": full_doc.get("creator_id")
            })
            recipient = business["owner"].id if business else None
        case _:
            logger.info(f"Unhandled collection for delete: {collection}")
            recipient = None

    if recipient:
        message = f"Change detected: {operation_type} on document {document_key} in collection {collection}"
        logger.info(f"Attempting to send message to recipient: {recipient}")
        await app.websocket.send_to_client(str(recipient), message)
    else:
        logger.info(f"No recipient found for change: {operation_type} in {collection}")

The action for insert and update are similar si they're grouped togehter:

elif operation_type in ['insert', 'update']:
    full_doc = change.get('fullDocument', {})

    match collection:
        case "orders":
            business = await db.businesses.find_one({
                "_id": full_doc.get("creator_id")
            }, {"owner": 1})
            recipient = business["owner"].id if business else None
        case "inventory_items":
            business = await db.businesses.find_one({
                "_id": full_doc.get("owned_by")
            }, {"owner": 1})
            recipient = business["owner"].id if business else None

            if full_doc.get("quantity_in_stock") <= full_doc.get("reorder_point"):
                await app.websocket.send_to_client(str(recipient),
                                                "Stock low, please restock the inventory to continue issuing orders.")

        case _:
            logger.info(f"Unhandled collection for {operation_type}: {collection}")
            recipient = None

    if recipient:
        message = f"Change detected: {operation_type} on document {document_key} in collection {collection}"
        logger.info(f"Attempting to send message to recipient: {recipient}")
        await app.websocket.send_to_client(str(recipient), message)
    else:
        logger.info(f"No recipient found for change: {operation_type} in {collection}")

    else:
        logger.info(f"Unhandled operation type: {operation_type}")

The end:

except asyncio.CancelledError:
    # Handle cancellation gracefully
    logger.info("Task was cancelled. Cleaning up...")
except WebSocketDisconnect:
    await app.websocket.disconnect(str(valid_user["_id"]))
    logger.info(f"User {valid_user["_id"]} is disconnected")

I connected an active session:

Client 67a9f47786c624adede7---- connected. Active connections: ['67a9f47786c624adede7----']
Change detected: delete on document 67aa00fce173bf9d4ba2---- in collection orders

If you were...

If you were to build a streaming endpoint like this, how would you approach it?