Hey folks!
Continuing the My Broker B3 series, today we'll build the first microservice in the ecosystem: broker-market-data-api.
This service has a simple and well-defined responsibility: fetch real quotes for Brazilian assets from the Brapi API and distribute them to the ecosystem — persisting the history in MongoDB and publishing updated prices to Kafka for other services to consume.
This Service's Role in the Ecosystem
```
[brapi.dev] ◀── HTTP ── [broker-market-data-api]
                              │
               ┌──────────────┴──────────────┐
               ▼                             ▼
            MongoDB                        Kafka
        (price_history)       (trading-assets-market-data-v1)
                                             │
                                             ▼
                              [broker-asset-api] (consumer)
```
It's the entry point for real data into the system. Without it, the ecosystem operates on static prices.
MVP Focus
The goal here is clear: have real quotes available in Kafka for other services to react to. In this phase, I prioritized:
- Scheduled ingestion of 50 assets (Blue Chips, REITs and ETFs)
- Historical persistence in MongoDB for audit
- Kafka event publishing for downstream consumers
- Rate limit protection for the Brapi API (free plan)
- Immediate execution on service startup
Tech Stack
| Technology | Usage |
|---|---|
| Python 3.12 | Service runtime |
| requests | HTTP client for the Brapi API |
| pymongo | MongoDB persistence |
| confluent-kafka | Kafka producer |
| schedule | Periodic job scheduling |
| python-dotenv | Environment variable loading |
The Implementation
The Ingestion Loop
The heart of the service is the run_ingestion() function, which iterates over the watchlist, fetches each asset's price and distributes the data:
```python
import json
import logging
import os
import time

import requests
from confluent_kafka import Producer


def run_ingestion():
    """Fetches market data from Brapi, persists to MongoDB and publishes to Kafka."""
    logger.info("Market data ingestion job started")

    token = os.getenv("BRAPI_TOKEN")
    if not token:
        logger.error("BRAPI_TOKEN not set. Aborting.")
        return

    # Producer created per run to avoid a crash at import time if Kafka is unavailable
    producer = Producer({
        'bootstrap.servers': os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        'client.id': 'broker-market-data-api'
    })
    mongo = MongoRepository()

    success_count = 0
    error_count = 0

    for ticker in WATCHLIST:
        try:
            url = f"https://brapi.dev/api/quote/{ticker}?token={token}"
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            result = response.json()["results"][0]

            payload = {
                "ticker": result.get("symbol"),
                "name": result.get("longName"),
                "short_name": result.get("shortName"),
                "price": result.get("regularMarketPrice"),
                "volume": result.get("regularMarketVolume"),
                "updated_at": result.get("regularMarketTime")
            }

            # 1. Persist historical record in MongoDB
            mongo.save(payload.copy())

            # 2. Publish event to Kafka
            producer.produce(
                topic=TOPIC_NAME,
                key=ticker,
                value=json.dumps(payload).encode('utf-8'),
                callback=delivery_report
            )
            producer.poll(0)

            success_count += 1
            time.sleep(0.5)  # Rate limiting: respect Brapi's free plan
        except Exception as e:
            error_count += 1
            logger.error(f"Error processing ticker {ticker}: {e}")

    producer.flush()
    logger.info(f"Ingestion completed: {success_count} succeeded, {error_count} failed")
```
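The produce() call references a delivery_report callback that the post doesn't show. A minimal sketch, assuming it does nothing beyond logging the broker's response (the confluent-kafka callback signature is `(err, msg)`):

```python
import logging

logger = logging.getLogger("broker-market-data-api")


def delivery_report(err, msg):
    """Delivery callback: invoked by poll()/flush() once the broker acks (or rejects) a message."""
    if err is not None:
        logger.error(f"Delivery failed for key {msg.key()}: {err}")
    else:
        logger.info(f"Delivered to {msg.topic()} [partition {msg.partition()}]")
```

Because delivery is asynchronous, the callback only fires inside producer.poll() or producer.flush(), which is why the loop calls poll(0) after every produce().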
Important design decisions:
Why create the Producer inside the function? If it were at module level, an unavailable Kafka at import time would crash the script before executing any logic. Creating it inside the function means a Kafka failure doesn't prevent execution — only that run fails.
Why try/except per ticker? If one asset fails (timeout, invalid ticker), the loop continues with the rest. The error_count logged at the end shows how many tickers failed without stopping the run.
Why time.sleep(0.5)? It respects the rate limit of Brapi's free plan. At half a second per asset, a full run over 50 tickers takes roughly 25 seconds.
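MongoRepository isn't shown in the post either. A minimal sketch, assuming a price_history collection and the automatic created_at stamp mentioned in the validation section (database name, collection name, and MONGO_URI variable are all guesses):

```python
import os
from datetime import datetime, timezone


def with_created_at(document: dict) -> dict:
    """Return a copy of the document stamped with the ingestion timestamp."""
    stamped = dict(document)
    stamped["created_at"] = datetime.now(timezone.utc)
    return stamped


class MongoRepository:
    """Hypothetical sketch of the repository used by run_ingestion()."""

    def __init__(self):
        # pymongo imported lazily so the module still loads without the driver installed
        from pymongo import MongoClient

        uri = os.getenv("MONGO_URI", "mongodb://localhost:27017")
        self._collection = MongoClient(uri)["market_data"]["price_history"]

    def save(self, document: dict) -> None:
        # insert_one mutates its argument (adds _id), hence payload.copy() at the call site
        self._collection.insert_one(with_created_at(document))
```

Stamping created_at in the repository keeps the Kafka payload free of persistence-only fields.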
The Scheduling
```python
if __name__ == "__main__":
    logger.info(f"Starting — sync every {SYNC_INTERVAL_MINUTES} minutes")

    # Run immediately on startup, then on schedule
    run_ingestion()
    schedule.every(SYNC_INTERVAL_MINUTES).minutes.do(run_ingestion)

    while True:
        schedule.run_pending()
        time.sleep(1)
```
Why run immediately on startup? If the service restarts, we don't want to wait 30 minutes for data in Kafka. The first execution is immediate, subsequent ones follow the schedule.
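SYNC_INTERVAL_MINUTES isn't defined in the snippet. A plausible sketch, assuming it is read from the environment like the other settings (the variable name is a guess; 30 matches the interval mentioned in the validation section):

```python
import os


def sync_interval_minutes(default: int = 30) -> int:
    """Read the schedule interval from the environment, falling back to a default."""
    return int(os.getenv("SYNC_INTERVAL_MINUTES", str(default)))


SYNC_INTERVAL_MINUTES = sync_interval_minutes()
```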
The Kafka Payload
```json
{
  "ticker": "PETR4",
  "name": "Petróleo Brasileiro S.A. - Petrobras",
  "short_name": "PETROBRAS PN",
  "price": 35.50,
  "volume": 15234567,
  "updated_at": 1714234567
}
```
Topic: trading-assets-market-data-v1
Key: asset symbol (e.g. PETR4) — ensures events for the same asset go to the same partition
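To see why a fixed key preserves per-asset ordering: the partition is a deterministic hash of the key, so the same key always lands on the same partition. CRC32 below is for illustration only; real clients use their own partitioners (librdkafka, which backs confluent-kafka, defaults to consistent-random hashing):

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # Illustration of the property, not the exact algorithm any client uses:
    # a deterministic hash of the key, modulo the partition count.
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

All PETR4 events therefore go to one partition, and a consumer of that partition sees PETR4 price updates in publish order.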
The Watchlist
50 assets covering the main B3 segments:
```python
WATCHLIST = [
    # Blue Chips
    "PETR4", "PETR3", "VALE3", "ITUB4", "BBDC4", "BBAS3", ...
    # REITs / FIIs
    "MXRF11", "HGLG11", "KNIP11", "XPLG11", ...
    # ETFs
    "BOVA11", "IVVB11"
]
```
Security: Token Out of Code
```bash
# .env — never committed
BRAPI_TOKEN=your_token_here
```

```python
# application code: no fallback, the variable is required
token = os.getenv("BRAPI_TOKEN")
if not token:
    logger.error("BRAPI_TOKEN not set. Aborting.")
    return
```
Never expose credentials as default values in public repositories.
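python-dotenv appears in the stack but its wiring isn't shown. A minimal sketch of how startup loading might look, with a stricter variant of the required-variable check (the require_env helper is my addition, not the post's code, which logs and returns instead of raising):

```python
import os

try:
    # python-dotenv: copies variables from a local .env file into os.environ
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # in production the variables come from the real environment


def require_env(name: str) -> str:
    """Return a required environment variable, or fail loudly if it is missing."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"{name} not set")
    return value
```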
✅ Validating the Execution
With the service running locally:
- ✅ First run executes immediately on startup
- ✅ MongoDB receives documents with created_at added automatically
- ✅ Kafka receives messages with key = ticker
- ✅ Scheduler waits 30 minutes for the next round
What's Next?
With broker-market-data-api publishing quotes to Kafka, the next step is b3-market-sync-api — the Java service that consumes these events and synchronizes prices to Redis, feeding the Matching Engine with real-time data.
About the Series
⬅️ Previous Post: Infrastructure as Code
➡️ Next Post: Syncing the Real Market: Consuming Brapi and Feeding Redis with Spring Boot
Series Index: Series Roadmap
Links: