Market Data Integrator: Consuming Real-Time Data with Python, MongoDB, and Kafka

Hey folks!

Continuing the My Broker B3 series, today we'll build the first microservice in the ecosystem: the trading-broker-market-data.

This service has a simple and well-defined responsibility: fetch real quotes for Brazilian assets from the Brapi API and distribute them to the ecosystem — persisting the history in MongoDB and publishing updated prices to Kafka for other services to consume.


This Service's Role in the Ecosystem

[brapi.dev] ◀── HTTP ── [broker-market-data-api]
                                  │
                    ┌─────────────┴──────────────┐
                    ▼                             ▼
             MongoDB                           Kafka
        (price_history)          (trading-assets-market-data-v1)
                                               │
                                    [broker-asset-api] (consumer)

It's the entry point for real data into the system. Without it, the ecosystem operates on static prices.


MVP Focus

The goal here is clear: have real quotes available in Kafka for other services to react to. In this phase, I prioritized:

  • Scheduled ingestion of 50 assets (Blue Chips, REITs and ETFs)
  • Historical persistence in MongoDB for audit
  • Kafka event publishing for downstream consumers
  • Rate limit protection for the Brapi API (free plan)
  • Immediate execution on service startup

Tech Stack

Technology        Usage
Python 3.12       Service runtime
requests          HTTP client for Brapi API
pymongo           MongoDB persistence
confluent-kafka   Kafka producer
schedule          Periodic scheduling
python-dotenv     Environment variables

The Implementation

The Ingestion Loop

The heart of the service is the run_ingestion() function, which iterates over the watchlist, fetches each asset's price and distributes the data:

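import json
import os
import time

import requests
from confluent_kafka import Producer

# logger, WATCHLIST, TOPIC_NAME, MongoRepository and delivery_report
# are defined elsewhere in the service.


def run_ingestion():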
    """Fetches market data from Brapi, persists to MongoDB and publishes to Kafka."""
    logger.info("Market data ingestion job started")

    token = os.getenv("BRAPI_TOKEN")
    if not token:
        logger.error("BRAPI_TOKEN not set. Aborting.")
        return

    # Producer created per run to avoid crash on import if Kafka is unavailable
    producer = Producer({
        'bootstrap.servers': os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        'client.id': 'broker-market-data-api'
    })

    mongo = MongoRepository()
    success_count = 0
    error_count = 0

    for ticker in WATCHLIST:
        try:
            url = f"https://brapi.dev/api/quote/{ticker}?token={token}"
            response = requests.get(url, timeout=10)
            response.raise_for_status()

            result = response.json()["results"][0]
            payload = {
                "ticker": result.get("symbol"),
                "name": result.get("longName"),
                "short_name": result.get("shortName"),
                "price": result.get("regularMarketPrice"),
                "volume": result.get("regularMarketVolume"),
                "updated_at": result.get("regularMarketTime")
            }

            # 1. Persist historical record in MongoDB
            mongo.save(payload.copy())

            # 2. Publish event to Kafka
            producer.produce(
                topic=TOPIC_NAME,
                key=ticker,
                value=json.dumps(payload).encode('utf-8'),
                callback=delivery_report
            )
            producer.poll(0)

            success_count += 1

        except Exception as e:
            error_count += 1
            logger.error(f"Error processing ticker {ticker}: {e}")

        finally:
            # Rate limiting: pause after every request, including failed ones,
            # so a burst of errors does not hammer the Brapi free plan
            time.sleep(0.5)

    producer.flush()
    logger.info(f"Ingestion completed: {success_count} succeeded, {error_count} failed")

Important design decisions:

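Why create the Producer inside the function? Building it at module level would turn a plain import into a side effect: a bad Kafka setting would fail the script before any logic runs. Creating it inside the function keeps producer problems confined to the run that hit them. Note that confluent-kafka connects to the brokers lazily, so an unreachable cluster usually shows up as delivery errors reported to the callback or at flush(), not as an exception from the constructor.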
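The delivery_report callback passed to produce() isn't shown in the post. A minimal sketch of what it could look like, assuming the usual confluent-kafka callback shape (called with an error, or None on success, plus the message) and a logger name that I made up for illustration:

```python
import logging

# Assumption: the logger name is illustrative; the service defines its own logger.
logger = logging.getLogger("broker-market-data-api")


def delivery_report(err, msg):
    """Delivery callback, invoked once per message while poll() or flush() runs.

    The client ignores the return value; a bool is returned here only to make
    the function easy to exercise in a test.
    """
    if err is not None:
        logger.error("Delivery failed for key %s: %s", msg.key(), err)
        return False

    logger.debug(
        "Delivered key %s to %s [partition %s] at offset %s",
        msg.key(), msg.topic(), msg.partition(), msg.offset(),
    )
    return True
```

Because the callback only fires from poll() and flush(), the producer.poll(0) inside the loop and the final producer.flush() are what actually surface delivery failures.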

Why try/except per ticker? If one asset fails (timeout, invalid ticker), the loop moves on to the rest. The error_count logged at the end shows how many failed without stopping the work.
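One thing to watch when logging the raw exception: requests includes the request URL in its HTTP error messages, and here the URL carries the token as a query parameter. A small sketch of a redaction step that could run before logging; redact_token is a hypothetical helper, not part of the service:

```python
import re

# Matches "?token=<value>" or "&token=<value>" up to the next "&" or whitespace.
_TOKEN_PARAM = re.compile(r"([?&]token=)[^&\s]+")


def redact_token(message: str) -> str:
    """Replaces the value of any token query parameter with '***'."""
    return _TOKEN_PARAM.sub(r"\1***", message)
```

In the except block this would look like logger.error(f"Error processing ticker {ticker}: {redact_token(str(e))}").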

The time.sleep(0.5) between requests respects the rate limit of Brapi's free plan. For 50 assets, the pauses alone add about 25 seconds (50 × 0.5 s) to each run, on top of HTTP latency.
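A fixed sleep is added on top of each request's own latency. If the goal is only to keep at least 0.5 s between calls, a throttle that sleeps for the remainder is one option. This is a sketch, not what the service does: Throttle is a hypothetical helper, and the 0.5 s spacing comes from the loop above, not from a documented Brapi limit.

```python
import time


class Throttle:
    """Keeps at least `min_interval` seconds between consecutive wait() calls."""

    def __init__(self, min_interval, clock=time.monotonic, sleep=time.sleep):
        self._min_interval = min_interval
        self._clock = clock      # injectable so the class can be tested without waiting
        self._sleep = sleep
        self._last_call = None   # None means wait() has not been called yet

    def wait(self):
        now = self._clock()
        if self._last_call is not None:
            remaining = self._min_interval - (now - self._last_call)
            if remaining > 0:
                self._sleep(remaining)
                now = self._clock()
        self._last_call = now
```

Calling throttle.wait() right before each requests.get would space the calls evenly whether the previous request succeeded or failed.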

The Scheduling

if __name__ == "__main__":
    logger.info(f"Starting — sync every {SYNC_INTERVAL_MINUTES} minutes")

    # Run immediately on startup, then on schedule
    run_ingestion()

    schedule.every(SYNC_INTERVAL_MINUTES).minutes.do(run_ingestion)

    while True:
        schedule.run_pending()
        time.sleep(1)

Why run immediately on startup? If the service restarts, we don't want to wait a full sync interval (30 minutes here) for data in Kafka. The first execution is immediate; subsequent ones follow the schedule.
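The schedule library does not catch exceptions raised by a job, so anything that escapes run_ingestion (for example an error raised before the per-ticker try block) would end the while loop. A minimal sketch of a guard, assuming a hypothetical decorator name and logger name:

```python
import functools
import logging

# Assumption: the logger name is illustrative; the service defines its own logger.
logger = logging.getLogger("broker-market-data-api")


def keep_scheduler_alive(job):
    """Wraps a job so an unexpected exception is logged instead of propagating."""

    @functools.wraps(job)
    def wrapper(*args, **kwargs):
        try:
            return job(*args, **kwargs)
        except Exception:
            # logger.exception records the full traceback for later diagnosis
            logger.exception("Job %s failed; it will run again on the next interval", job.__name__)
            return None

    return wrapper
```

It could be applied to both the startup call and the scheduled one, e.g. schedule.every(SYNC_INTERVAL_MINUTES).minutes.do(keep_scheduler_alive(run_ingestion)).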

The Kafka Payload

{
  "ticker": "PETR4",
  "name": "Petróleo Brasileiro S.A. - Petrobras",
  "short_name": "PETROBRAS PN",
  "price": 35.50,
  "volume": 15234567,
  "updated_at": 1714234567
}

Topic: trading-assets-market-data-v1
Key: asset symbol (e.g. PETR4) — ensures events for the same asset go to the same partition
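On the consuming side, the message value is just UTF-8 JSON. A sketch of decoding it, assuming updated_at is Unix epoch seconds as in the sample above; parse_market_event is a hypothetical name, and the consumer itself is left out:

```python
import json
from datetime import datetime, timezone


def parse_market_event(raw_value: bytes) -> dict:
    """Decodes a message value produced by the ingestion loop."""
    event = json.loads(raw_value.decode("utf-8"))
    # Assumption: updated_at is Unix epoch seconds, as in the sample payload.
    event["updated_at"] = datetime.fromtimestamp(event["updated_at"], tz=timezone.utc)
    return event
```

Converting the timestamp at the edge keeps the rest of the consumer working with timezone-aware datetimes.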


The Watchlist

50 assets covering the main B3 segments:

WATCHLIST = [
    # Blue Chips
    "PETR4", "PETR3", "VALE3", "ITUB4", "BBDC4", "BBAS3", ...
    # REITs / FIIs
    "MXRF11", "HGLG11", "KNIP11", "XPLG11", ...
    # ETFs
    "BOVA11", "IVVB11"
]

Security: Token Out of Code

# .env (never committed)
BRAPI_TOKEN=your_token_here

# application: no fallback, required variable
token = os.getenv("BRAPI_TOKEN")
if not token:
    logger.error("BRAPI_TOKEN not set. Aborting.")
    return

Never expose credentials as default values in public repositories.
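If more required variables show up later, the same "no fallback" rule can live in one place. A minimal sketch, assuming a hypothetical require_env helper that raises instead of logging and returning:

```python
import os


def require_env(name: str) -> str:
    """Returns the variable's value, or raises if it is missing or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Required environment variable {name} is not set")
    return value
```

Raising makes a misconfiguration fail loudly at the call site; whether that is preferable to the log-and-return used in run_ingestion depends on whether the process should keep running without a token.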


✅ Validating the Execution

With the service running locally:

  • ✅ First run executes immediately on startup
  • ✅ MongoDB receives documents with created_at set automatically
  • ✅ Kafka receives messages with key = ticker
  • ✅ Scheduler waits 30 minutes for the next round
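The MongoRepository class isn't shown in the post, so here is a hypothetical sketch of how created_at could be stamped on save. It takes the collection as a constructor argument (the real class is created with no arguments), which keeps the sketch independent of connection settings:

```python
from datetime import datetime, timezone


class MongoRepository:
    """Hypothetical sketch; the service's real repository is not shown."""

    def __init__(self, collection):
        # `collection` is any object with insert_one(), e.g. a pymongo Collection
        self._collection = collection

    def save(self, document: dict) -> None:
        # Stamp the insertion time in UTC before writing
        document["created_at"] = datetime.now(timezone.utc)
        # pymongo's insert_one() also adds an "_id" key to the dict it receives
        self._collection.insert_one(document)
```

This also explains the payload.copy() in the ingestion loop: both created_at and the _id added by pymongo land on the copy, so the original payload stays JSON-serializable for the Kafka message.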

What's Next?

With broker-market-data-api publishing quotes to Kafka, the next step is b3-market-sync-api — the Java service that consumes these events and synchronizes prices to Redis, feeding the Matching Engine with real-time data.


About the Series

⬅️ Previous Post: Infrastructure as Code

➡️ Next Post: Syncing the Real Market: Consuming Brapi and Feeding Redis with Spring Boot

Series Index: Series Roadmap

