Hey folks!
Continuing the My Broker B3 series, today we build trading-broker-asset — the Java service that consumes the quotes published to Kafka by trading-broker-market-data and transforms them into structured data in MySQL, with high-performance caching in Redis.
If the Python service is the collector, this one is the processor.
This Service's Role in the Ecosystem
```
[trading-broker-market-data]
             │
             │  Kafka: trading-assets-market-data-v1
             ▼
   [trading-broker-asset]
             │
        ┌────┴────┐
        ▼         ▼
      MySQL     Redis
     (assets)   market:price:{TICKER}
             │
   [b3-matching-engine]    (reads from here)
   [trading-broker-order]  (validates ticker from here)
```
Two critical consumers depend on the data this service maintains: the Matching Engine (for price decisions) and the Order API (to validate whether a ticker exists and is active).
MVP Focus
In this phase, I prioritized:
- Robust Kafka consumer with error handling
- Asset upsert: creates on first event, updates on subsequent ones
- Redis cache with 10-minute TTL
- REST API for catalog queries
- Status filter: only `ACTIVE` assets are returned
Tech Stack
| Technology | Usage |
| --- | --- |
| Java 21 + Spring Boot 3.5.11 | Service core |
| Spring Kafka | Quote event consumption |
| MySQL + Flyway | Versioned asset catalog |
| Spring Data Redis | Real-time price cache |
| SpringDoc OpenAPI | Swagger UI documentation |
Implementation Pillars
1. The Database Schema (Flyway)
```sql
CREATE TABLE assets (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    ticker VARCHAR(20) NOT NULL UNIQUE,
    name VARCHAR(200),
    current_price DECIMAL(19, 4),
    last_update DATETIME,
    status ENUM('ACTIVE', 'INACTIVE') NOT NULL DEFAULT 'ACTIVE',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
Why Flyway? Schema versioning ensures that any environment (local, CI, production) has exactly the same database structure, without surprises.
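With Flyway, this script typically lives in src/main/resources/db/migration under a versioned name such as V1__create_assets_table.sql. The post doesn't show the JPA side, but a minimal sketch of the entity could look like this, assuming Lombok for the builder used later (field and class names are inferred from the service snippets, so treat them as illustrative):

```java
import java.math.BigDecimal;
import java.time.LocalDateTime;

import jakarta.persistence.*;
import lombok.*;

@Entity
@Table(name = "assets")
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Asset {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true, length = 20)
    private String ticker;

    @Column(length = 200)
    private String name;

    // DECIMAL(19, 4) in MySQL maps cleanly to BigDecimal
    @Column(name = "current_price", precision = 19, scale = 4)
    private BigDecimal currentPrice;

    @Column(name = "last_update")
    private LocalDateTime lastUpdate;

    // Stored as a string ('ACTIVE' / 'INACTIVE') to match the ENUM column
    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private AssetStatus status;

    // created_at is filled by the database default and not mapped here
}
```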
2. The Kafka Consumer
```java
@KafkaListener(
    topics = "trading-assets-market-data-v1",
    groupId = "trading-broker-asset"
)
public void consume(AssetMarketDataDTO dto) {
    log.info("Market data received for ticker: {}", dto.getTicker());
    try {
        assetService.updateAsset(dto);
    } catch (Exception e) {
        log.error("Failed to process market data for ticker {}: {}",
                dto.getTicker(), e.getMessage(), e);
        // Rethrow so Kafka can apply the retry policy instead of silently discarding
        throw e;
    }
}
```
Why rethrow the exception? If processing fails silently, the offset is committed and the message is discarded — the asset is never updated. Rethrowing lets Kafka apply its retry policy.
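The retry policy itself isn't shown in the post. With Spring Kafka, one common way to define it is a DefaultErrorHandler bean, which Spring Boot wires into the auto-configured listener container factory. A sketch, with illustrative backoff values:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConsumerConfig {

    // Retries a failed record 3 more times, 1 second apart.
    // Once retries are exhausted the record is logged and skipped;
    // a DeadLetterPublishingRecoverer could send it to a DLQ instead.
    @Bean
    public DefaultErrorHandler kafkaErrorHandler() {
        return new DefaultErrorHandler(new FixedBackOff(1000L, 3L));
    }
}
```

Because the listener rethrows, the same record is redelivered until the backoff is exhausted, and the offset is only committed once processing succeeds or the recoverer gives up.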
3. The Asset Upsert
```java
@Transactional
public void updateAsset(AssetMarketDataDTO dto) {
    Asset asset = assetRepository.findByTicker(dto.getTicker())
            .orElse(Asset.builder()
                    .ticker(dto.getTicker())
                    .status(AssetStatus.ACTIVE)
                    .build());

    asset.setName(dto.getName());
    asset.setCurrentPrice(dto.getPrice());
    // Use the real market timestamp from the event, not LocalDateTime.now()
    asset.setLastUpdate(dto.getUpdatedAt() != null ? dto.getUpdatedAt() : LocalDateTime.now());
    assetRepository.save(asset);

    // Update the Redis cache after persisting
    marketPriceCacheService.updatePrice(dto.getTicker(), dto.getPrice());
}
```
Important decision: we use the real updatedAt from the Kafka event, not LocalDateTime.now(). This ensures lastUpdate in the database reflects when the price was generated by the market, not when it was processed by the service.
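The repository behind findByTicker (and the status queries used in pillar 5) can be plain Spring Data JPA derived queries. A sketch, inferred from the method calls in the snippets:

```java
import java.util.List;
import java.util.Optional;

import org.springframework.data.jpa.repository.JpaRepository;

public interface AssetRepository extends JpaRepository<Asset, Long> {

    // Spring Data derives the SQL from the method names,
    // so the WHERE clauses run in the database, not in memory
    Optional<Asset> findByTicker(String ticker);

    List<Asset> findAllByStatus(AssetStatus status);

    Optional<Asset> findByTickerAndStatus(String ticker, AssetStatus status);
}
```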
4. Redis Cache with TTL
```java
private static final Duration CACHE_TTL = Duration.ofMinutes(10);

public void updatePrice(String ticker, BigDecimal price) {
    String key = "market:price:" + ticker;
    // 10-minute TTL: prevents stale prices for inactive or delisted assets
    redisTemplate.opsForValue().set(key, price.toString(), CACHE_TTL);
}
```
Why TTL? If an asset is deactivated or delisted, its price will expire from Redis in 10 minutes. Without TTL, the Matching Engine could use prices for assets that no longer exist.
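The read side isn't shown in the post, but it's symmetric. A minimal sketch, assuming a StringRedisTemplate (the price is stored as a string) and a hypothetical getPrice method:

```java
public Optional<BigDecimal> getPrice(String ticker) {
    // Empty when the key expired (TTL) or was never written;
    // the caller decides whether to fall back to MySQL
    String value = redisTemplate.opsForValue().get("market:price:" + ticker);
    return Optional.ofNullable(value).map(BigDecimal::new);
}
```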
5. Status Filter in Queries
```java
public List<AssetDTO> findAllActive() {
    // Query directly by status, not findAll() + filter in memory
    return assetRepository.findAllByStatus(AssetStatus.ACTIVE).stream()
            .map(AssetDTO::fromEntity)
            .toList();
}

public Optional<AssetDTO> findByTicker(String ticker) {
    // Only returns ACTIVE assets; an inactive asset yields a 404
    return assetRepository.findByTickerAndStatus(ticker.toUpperCase(), AssetStatus.ACTIVE)
            .map(AssetDTO::fromEntity);
}
```
When trading-broker-order validates a ticker before creating an order, it calls GET /api/v1/assets/{ticker}. If the asset is inactive, it gets a 404 and the order is rejected. Correct and predictable.
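The controller isn't shown in the post, but the empty-Optional-becomes-404 contract is a one-liner with ResponseEntity.of. A sketch (class and method names are assumptions):

```java
import java.util.List;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/assets")
public class AssetController {

    private final AssetService assetService;

    public AssetController(AssetService assetService) {
        this.assetService = assetService;
    }

    @GetMapping
    public List<AssetDTO> findAllActive() {
        return assetService.findAllActive();
    }

    // An empty Optional (unknown or inactive ticker) becomes a 404 automatically
    @GetMapping("/{ticker}")
    public ResponseEntity<AssetDTO> findByTicker(@PathVariable String ticker) {
        return ResponseEntity.of(assetService.findByTicker(ticker));
    }
}
```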
REST API
| Method | Endpoint | Description |
| --- | --- | --- |
| GET | /api/v1/assets | List all active assets |
| GET | /api/v1/assets/{ticker} | Find asset by ticker (404 if inactive) |
Swagger UI: http://localhost:8083/swagger-ui.html
✅ Validating the Execution
With the application running and trading-broker-market-data publishing to Kafka:
- ✅ Consumer connected to the `trading-assets-market-data-v1` topic
- ✅ Assets being created/updated in MySQL in real time
- ✅ Redis being updated with TTL after each event
- ✅ Logs showing `Updating ticker PETR4 with price R$ 47.37`
What's Next?
With the asset catalog working, the next step is b3-market-sync-api, which synchronizes prices directly into B3's Redis, followed by b3-matching-engine-api, which uses those prices to execute orders.
About the Series
⬅️ Previous Post: Tooling Tips: MongoDB and Kafka
➡️ Next Post: Syncing the Real Market: Consuming Brapi and Feeding Redis with Spring Boot
Series Index: Series Roadmap
Links: