From Stream to Database: Processing Market Data with Spring Boot, Redis, and Flyway

posted 3 min read

Hey folks!

Continuing the My Broker B3 series, today we build trading-broker-asset — the Java service that consumes the quotes published to Kafka by trading-broker-market-data and transforms them into structured data in MySQL, with high-performance caching in Redis.

If the Python service is the collector, this one is the processor.


️ This Service's Role in the Ecosystem

[trading-broker-market-data]
         │
         │ Kafka: trading-assets-market-data-v1
         ▼
[trading-broker-asset]
         │
    ┌────┴────┐
    ▼         ▼
  MySQL     Redis
(assets)  market:price:{TICKER}
              │
    [b3-matching-engine]   (reads from here)
    [trading-broker-order] (validates ticker from here)

Two critical consumers depend on the data this service maintains: the Matching Engine (for price decisions) and the Order API (to validate whether a ticker exists and is active).


MVP Focus

In this phase, I prioritized:

  • Robust Kafka consumer with error handling
  • Asset upsert: creates on first event, updates on subsequent ones
  • Redis cache with 10-minute TTL
  • REST API for catalog queries
  • Status filter: only ACTIVE assets are returned

️ Tech Stack

Technology Usage
Java 21 + Spring Boot 3.5.11 Service core
Spring Kafka Quote event consumption
MySQL + Flyway Versioned asset catalog
Spring Data Redis Real-time price cache
SpringDoc OpenAPI Swagger UI documentation

️ Implementation Pillars

1. The Database Schema (Flyway)

CREATE TABLE assets (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    ticker VARCHAR(20) NOT NULL UNIQUE,
    name VARCHAR(200),
    current_price DECIMAL(19, 4),
    last_update DATETIME,
    status ENUM('ACTIVE', 'INACTIVE') NOT NULL DEFAULT 'ACTIVE',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Why Flyway? Schema versioning ensures that any environment (local, CI, production) has exactly the same database structure, without surprises.

2. The Kafka Consumer

@KafkaListener(
        topics = "trading-assets-market-data-v1",
        groupId = "trading-broker-asset"
)
public void consume(AssetMarketDataDTO dto) {
    log.info("Market data received for ticker: {}", dto.getTicker());
    try {
        assetService.updateAsset(dto);
    } catch (Exception e) {
        log.error("Failed to process market data for ticker {}: {}",
                dto.getTicker(), e.getMessage(), e);
        // Rethrow so Kafka can apply retry policy instead of silently discarding
        throw e;
    }
}

Why rethrow the exception? If processing fails silently, the offset is committed and the message is discarded — the asset is never updated. Rethrowing lets Kafka apply its retry policy.

3. The Asset Upsert

@Transactional
public void updateAsset(AssetMarketDataDTO dto) {
    Asset asset = assetRepository.findByTicker(dto.getTicker())
            .orElse(Asset.builder()
                    .ticker(dto.getTicker())
                    .status(AssetStatus.ACTIVE)
                    .build());

    asset.setName(dto.getName());
    asset.setCurrentPrice(dto.getPrice());
    // Uses real market timestamp from event — not LocalDateTime.now()
    asset.setLastUpdate(dto.getUpdatedAt() != null ? dto.getUpdatedAt() : LocalDateTime.now());

    assetRepository.save(asset);

    // Update Redis cache after persisting
    marketPriceCacheService.updatePrice(dto.getTicker(), dto.getPrice());
}

Important decision: we use the real updatedAt from the Kafka event, not LocalDateTime.now(). This ensures lastUpdate in the database reflects when the price was generated by the market, not when it was processed by the service.

4. Redis Cache with TTL

private static final Duration CACHE_TTL = Duration.ofMinutes(10);

public void updatePrice(String ticker, BigDecimal price) {
    String key = "market:price:" + ticker;
    // TTL of 10 minutes — prevents stale prices for inactive or delisted assets
    redisTemplate.opsForValue().set(key, price.toString(), CACHE_TTL);
}

Why TTL? If an asset is deactivated or delisted, its price will expire from Redis in 10 minutes. Without TTL, the Matching Engine could use prices for assets that no longer exist.

5. Status Filter in Queries

public List<AssetDTO> findAllActive() {
    // Query directly by status — not findAll() + filter in memory
    return assetRepository.findAllByStatus(AssetStatus.ACTIVE).stream()
            .map(AssetDTO::fromEntity)
            .toList();
}

public Optional<AssetDTO> findByTicker(String ticker) {
    // Only returns ACTIVE assets — inactive returns 404
    return assetRepository.findByTickerAndStatus(ticker.toUpperCase(), AssetStatus.ACTIVE)
            .map(AssetDTO::fromEntity);
}

When trading-broker-order validates a ticker before creating an order, it calls GET /api/v1/assets/{ticker}. If the asset is inactive, it gets a 404 and the order is rejected. Correct and predictable.


REST API

Method Endpoint Description
GET /api/v1/assets List all active assets
GET /api/v1/assets/{ticker} Find asset by ticker (404 if inactive)

Swagger UI: http://localhost:8083/swagger-ui.html


✅ Validating the Execution

With the application running and trading-broker-market-data publishing to Kafka:

  • ✅ Consumer connected to the trading-assets-market-data-v1 topic
  • ✅ Assets being created/updated in MySQL in real time
  • ✅ Redis being updated with TTL after each event
  • ✅ Logs showing Updating ticker PETR4 with price R$ 47.37

What's Next?

With the asset catalog working, the next step is b3-market-sync-api — which synchronizes prices directly to the B3's Redis — followed by b3-matching-engine-api, which uses those prices to execute orders.


About the Series

⬅️ Previous Post: Tooling Tips: MongoDB and Kafka

➡️ Next Post: Syncing the Real Market: Consuming Brapi and Feeding Redis with Spring Boot

Series Index: Series Roadmap


Links:

More Posts

I’m a Senior Dev and I’ve Forgotten How to Think Without a Prompt

Karol Modelskiverified - Mar 19

From Subjective Narratives to Objective Data: Re-engineering the Elderly Care Communication Loop

Huifer - Jan 28

Optimizing the Clinical Interface: Data Management for Efficient Medical Outcomes

Huifer - Jan 26

Breaking the AI Data Bottleneck: How Hammerspace's AI Data Platform Eliminates Migration Nightmares

Tom Smithverified - Mar 16

From Anxiety to Analytics: Using Data Trends to Transform Family Healthcare Communication

Huifer - Feb 1
chevron_left

Related Jobs

View all jobs →

Commenters (This Week)

1 comment
1 comment
1 comment

Contribute meaningful comments to climb the leaderboard and earn badges!