Building a Pub/Sub System in .NET: MassTransit, Reactive Extensions, and BlockingCollection

Leader posted 5 min read

Modern applications often need to broadcast events across multiple services.
Think of an order being placed in an e-commerce system. The order service publishes an event, and multiple subscribers react independently:

  1. the billing service charges the customer,

  2. the email service sends a receipt,

  3. the analytics service tracks the order.

This is the essence of the Publish/Subscribe (Pub/Sub) pattern.

In this post we’ll explore what it is, why it matters, and three implementations in .NET:

MassTransit with RabbitMQ (for distributed systems)

Reactive Extensions (Rx) (for in-memory event streams)

BlockingCollection (for producer/consumer pipelines)

What is the Pub/Sub Pattern

  • The Pub/Sub pattern decouples senders from receivers.
  • The Publisher creates a message (e.g., OrderSubmitted) and sends it
    to a broker or channel.
  • The Broker delivers that message to all interested Subscribers.

Each Subscriber receives a copy of the message and processes it independently.

Benefits

  • Decoupling — publishers do not know subscribers
  • Scalability — multiple subscribers can process in parallel
  • Flexibility — add or remove subscribers without changing the
    publisher

Why it matters

This pattern has been around for decades in middleware like IBM MQ, JMS, RabbitMQ, and Kafka.
It is fundamental in:

Event-driven microservices

  • Real-time systems (chat apps, trading platforms)
  • Notification and alerting systems
  • Data pipelines and analytics

IoT device communication

At its core, Pub/Sub enables loose coupling and asynchronous communication.

⚙️ Hosted Services in ASP.NET Core

A hosted service is a background task that runs with your app host.
In .NET you typically inherit from BackgroundService and implement ExecuteAsync.

public class SampleWorker : BackgroundService
{
    private readonly ILogger<SampleWorker> _log;
    public SampleWorker(ILogger<SampleWorker> log) => _log = log;

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        _log.LogInformation("Worker started");
        while (!ct.IsCancellationRequested)
        {
            // do work here
            await Task.Delay(TimeSpan.FromSeconds(5), ct);
        }
        _log.LogInformation("Worker stopping");
    }
}

// Register in Program.cs
builder.Services.AddHostedService<SampleWorker>();

In the MassTransit demo below, a PublisherService is implemented as a hosted service to periodically publish messages.

1️⃣ Pub/Sub with MassTransit and RabbitMQ

appsettings.json
{
  "RabbitMq": { "Host": "localhost", "Username": "guest", "Password": "guest" },
  "PublishIntervalSeconds": 5
}

OrderSubmitted.cs

namespace MassTransitDemo;

public record OrderSubmitted(Guid OrderId, string CustomerEmail, decimal Total);

OrderSubmittedConsumer.cs

using MassTransit;
using Microsoft.Extensions.Logging;

namespace MassTransitDemo;

public class OrderSubmittedConsumer(ILogger<OrderSubmittedConsumer> logger) : IConsumer<OrderSubmitted>
{
    public Task Consume(ConsumeContext<OrderSubmitted> context)
    {
        var m = context.Message;
        logger.LogInformation("Consumed OrderSubmitted, OrderId {OrderId}, Email {Email}, Total {Total}",
            m.OrderId, m.CustomerEmail, m.Total);
        return Task.CompletedTask;
    }
}

PublisherService.cs

using MassTransit;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace MassTransitDemo;

public class PublisherOptions
{
    public int PublishIntervalSeconds { get; set; } = 5;
}

public class PublisherService(
    ILogger<PublisherService> logger,
    IPublishEndpoint publishEndpoint,
    IOptions<PublisherOptions> options) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var delay = TimeSpan.FromSeconds(options.Value.PublishIntervalSeconds);
        logger.LogInformation("Publisher started, interval {Seconds}s", options.Value.PublishIntervalSeconds);

        while (!stoppingToken.IsCancellationRequested)
        {
            var msg = new OrderSubmitted(Guid.NewGuid(), "*Emails are not allowed*", Random.Shared.Next(50, 300));
            await publishEndpoint.Publish(msg, stoppingToken);
            logger.LogInformation("Published OrderSubmitted, OrderId {OrderId}", msg.OrderId);
            await Task.Delay(delay, stoppingToken);
        }

        logger.LogInformation("Publisher stopping");
    }
}

Program.cs

    using MassTransit;
    using MassTransitDemo;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    
    var builder = Host.CreateApplicationBuilder(args);
    
    builder.Configuration.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
    builder.Services.Configure<PublisherOptions>(builder.Configuration);
    
    builder.Services.AddMassTransit(mt =>
    {
        mt.AddConsumer<OrderSubmittedConsumer>();
    
        mt.UsingRabbitMq((ctx, cfg) =>
        {
            var section = builder.Configuration.GetSection("RabbitMq");
            cfg.Host(section["Host"], h =>
            {
                h.Username(section["Username"]);
                h.Password(section["Password"]);
            });
    
            cfg.ReceiveEndpoint("masstransitdemo-order-submitted", e =>
            {
                e.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
            });
    
            cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
            cfg.UseInMemoryOutbox();
        });
    });
    
    builder.Services.AddHostedService<PublisherService>();
    
    builder.Logging.ClearProviders();
    builder.Logging.AddSimpleConsole(o =>
    {
        o.SingleLine = true;
        o.TimestampFormat = "HH:mm:ss ";
    });
    
    await builder.Build().RunAsync();

Run RabbitMQ

docker run -it --rm -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# UI at http://localhost:15672  user guest  pass guest

MassTransit is a distributed application framework for message brokers like RabbitMQ and Azure Service Bus. It takes care of serialization, consumer lifecycle, retries, and endpoint setup.

2️⃣ Pub/Sub with Reactive Extensions (Rx)

Reactive Extensions (Rx)
is a library for composing asynchronous and event based programs with observable sequences.

Uses IObservable (event source) and IObserver (subscriber)

Provides LINQ-style operators (Where, Select, Buffer) for event streams

Perfect for in-memory event pipelines and UI scenarios

⚠️ Limitation: Rx is in-memory only. Events vanish when the app stops.

Program.cs

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public record OrderSubmitted(Guid OrderId, string CustomerEmail, decimal Total);

class Program
{
    static void Main()
    {
        var bus = new Subject<OrderSubmitted>();

        // Subscriber A
        var subA = bus.Subscribe(order =>
            Console.WriteLine($"[Email] Send receipt to {order.CustomerEmail}"));

        // Subscriber B with filter and transform
        var subB = bus
            .Where(o => o.Total >= 100)
            .Select(o => new { o.OrderId, Vat = o.Total * 0.24m })
            .Subscribe(x =>
                Console.WriteLine($"[Analytics] Order {x.OrderId}, VAT {x.Vat:F2}"));

        // Publish a few messages
        bus.OnNext(new OrderSubmitted(Guid.NewGuid(), "*Emails are not allowed*", 79.90m));
        bus.OnNext(new OrderSubmitted(Guid.NewGuid(), "*Emails are not allowed*", 149.00m));
        bus.OnNext(new OrderSubmitted(Guid.NewGuid(), "*Emails are not allowed*", 220.00m));

        bus.OnCompleted();
        subA.Dispose();
        subB.Dispose();
    }
}

3️⃣ Pub/Sub with BlockingCollection

BlockingCollection is a thread safe producer and consumer collection in .NET.

Supports blocking (consumers wait until data is available)

Supports bounding (limit capacity to avoid flooding)

Useful for background workers, pipelines, or batch processors

Program.cs

using System.Collections.Concurrent;

public record OrderSubmitted(Guid OrderId, string CustomerEmail, decimal Total);

class Program
{
    static async Task Main()
    {
        using var queue = new BlockingCollection<OrderSubmitted>(boundedCapacity: 100);

        // Subscriber A
        var emailTask = Task.Run(() =>
        {
            foreach (var order in queue.GetConsumingEnumerable())
                Console.WriteLine($"[Email] Send receipt to {order.CustomerEmail}");
        });

        // Subscriber B
        var analyticsTask = Task.Run(() =>
        {
            foreach (var order in queue.GetConsumingEnumerable())
                Console.WriteLine($"[Analytics] Track order {order.OrderId}, total {order.Total}");
        });

        // Publisher
        foreach (var price in new[] { 49.50m, 129.00m, 250.00m, 15.00m })
        {
            queue.Add(new OrderSubmitted(Guid.NewGuid(), $"user{price}@example.com", price));
        }

        queue.CompleteAdding();
        await Task.WhenAll(emailTask, analyticsTask);
    }
}

⚠️ Limitation: Only works inside a single process. No distribution.

References

MassTransit Documentation

MassTransit GitHub

RabbitMQ Tutorials

Microsoft Docs: Worker Services

Microsoft Docs: Publisher-Subscriber Pattern

System.Reactive GitHub

BlockingCollection Overview

✅ With these three approaches, you now have the full spectrum of Pub/Sub in .NET:

Lightweight in-memory streams with Rx and BlockingCollection

Robust distributed messaging with MassTransit and RabbitMQ

The Pub/Sub pattern has lasted decades because it makes systems more modular, more scalable, and more resilient to change.

If you read this far, tweet to the author to show them you care. Tweet a Thanks

Great article, really clear comparison of different Pub/Sub approaches in .NET

I’m glad you liked the article!
You can find the source code here: https://github.com/stevsharp/DotNet-PubSub-Examples
These examples are based on my own experience.

Good examples
RabbitMQ has mqtt implementation also.
I found that that the hierarchial topics in mqtt are quite flexible and can be used in many situations.
especially the subscriber can even subscribe to the receive messages in an entire tree.
Thanks for this article

Thanks for your kind words! RabbitMQ is indeed a great tool, similar in many ways to Azure Service Bus. On top of that, MassTransit adds a lot of value as a generic gateway and abstraction layer for working with multiple messaging systems.

It is not about RabbitMQ, as much as about the various pub/sub protocols.
I feel pub/sub is a great thing.
I had used emqx, which is also a great server for mqtt, and could build real flexile system.

More Posts

MassTransit in ASP.NET Core: A Practical Guide to Event-Driven .NET

Spyros - Sep 29

Building an Order Processing Saga with MassTransit

Spyros - Aug 12

Understanding MediatR Assembly Registration in .NET

Moses Korir - Jul 4

Understanding the Observer Pattern in C# with IObservable and IObserver

Spyros - Mar 11

Supercharging EF Core Specifications with EF.CompileQuery

Spyros - Aug 5
chevron_left