Building a Resilient Real-Time WebSocket Stream in Flutter (RxDart + Clean Architecture + BLoC)


Most tutorials show how to connect to a WebSocket.

Very few show what happens when:

  • The network drops
  • DNS fails
  • The server stops sending data
  • The stream silently hangs

This article focuses on handling those cases properly.


The Problem

A basic WebSocket implementation:

final channel = WebSocketChannel.connect(uri);
channel.stream.listen(...);

Works… until real-world conditions appear:

  • Network loss produces no error
  • The stream freezes silently
  • The app never reconnects
  • Users see stale data

The Goal

We want a stream that:

  • reconnects automatically
  • handles both errors and silent failures
  • retries with exponential backoff
  • does not crash the app
  • integrates cleanly with BLoC

Step 1: Deferred Connection (Rx.defer)

Instead of creating the connection immediately, we defer it:

return Rx.defer(() async* {
  final channel = _websocketService.connect();
  await channel.ready;
  yield* channel.stream;
});

This ensures that each retry creates a fresh connection.
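The effect of deferring can be seen without a real socket. The sketch below is a simplified stand-in for the idea, not RxDart's implementation; `fakeConnect`, `connectionsOpened`, and `defer` are hypothetical names used only for illustration.

```dart
import 'dart:async';

/// Hypothetical counter standing in for real socket connections.
int connectionsOpened = 0;

/// Hypothetical stand-in for connecting and exposing `channel.stream`.
Stream<int> fakeConnect() {
  connectionsOpened++;
  return Stream.fromIterable([1, 2, 3]);
}

/// Simplified deferral: runs [factory] once per listener, never earlier.
Stream<T> defer<T>(Stream<T> Function() factory) {
  return Stream<T>.multi((controller) {
    controller.addStream(factory()).whenComplete(controller.close);
  });
}

Future<void> main() async {
  final deferred = defer(fakeConnect);
  print(connectionsOpened); // 0: building the stream opens nothing

  await deferred.toList(); // first subscription
  await deferred.toList(); // a "retry" is just another subscription
  print(connectionsOpened); // 2: one fresh connection per subscription
}
```

Nothing is opened until someone listens, and every new listener gets its own connection. That is the property the retry logic relies on.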


Step 2: Handle Connection Errors

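If the connection fails, the error typically surfaces while awaiting channel.ready, so keep that call inside the try block:

try {
  final channel = _websocketService.connect();
  await channel.ready; // throws if the handshake fails
  yield* channel.stream;
} catch (e) {
  // Re-emit as a stream error so the retry logic sees it.
  yield* Stream.error(e);
}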

Convert failures into stream errors so retry logic can handle them.
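A small plain-Dart sketch of what "failure as a stream error" looks like to a listener. `connectAndListen` and `observe` are hypothetical helpers, and the `reachable` flag simulates a handshake that fails.

```dart
import 'dart:async';

/// Hypothetical source: throws before emitting when the host is unreachable.
Stream<String> connectAndListen({required bool reachable}) async* {
  if (!reachable) {
    // An exception thrown inside async* is delivered as a stream error.
    throw StateError('connection refused');
  }
  yield 'tick';
}

/// Records everything a listener would see until the stream closes.
Future<List<String>> observe(Stream<String> stream) async {
  final log = <String>[];
  final done = Completer<void>();
  stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: ${error.runtimeType}'),
    onDone: done.complete,
  );
  await done.future;
  return log;
}

Future<void> main() async {
  print(await observe(connectAndListen(reachable: true))); // [data: tick]
  print(await observe(connectAndListen(reachable: false))); // [error: StateError]
}
```

The listener never sees an uncaught exception. It receives an error event, which is what a retry operator reacts to.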


Step 3: Handle Silent Failures (Timeout)

WebSockets may stop emitting without throwing errors.

.timeout(
  const Duration(seconds: 5),
  onTimeout: (sink) {
    sink.addError("Timeout");
  },
)

This turns silence into an error, so the retry logic kicks in when no data arrives for 5 seconds.
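The behaviour is easy to reproduce with a controller that emits once and then goes quiet. A minimal sketch, assuming a hypothetical helper `observeSilentStream` and a 50 ms limit to keep the demo fast. It omits onTimeout, in which case Stream.timeout emits a TimeoutException, a typed error that is easier to match than a string.

```dart
import 'dart:async';

Future<List<String>> observeSilentStream() async {
  final controller = StreamController<String>();
  final log = <String>[];
  final timedOut = Completer<void>();

  final subscription = controller.stream
      .timeout(const Duration(milliseconds: 50))
      .listen(
        (message) => log.add('data: $message'),
        onError: (Object error) {
          log.add(error is TimeoutException ? 'timeout' : 'error: $error');
          if (!timedOut.isCompleted) timedOut.complete();
        },
      );

  controller.add('first message'); // after this, the source goes quiet
  await timedOut.future;
  await subscription.cancel();
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await observeSilentStream()); // [data: first message, timeout]
}
```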


Step 4: Control Update Frequency

Real-time streams can overwhelm the UI.

.sampleTime(const Duration(seconds: 1))

This emits only the most recent value once per second and drops the updates in between.
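To make the sampling idea concrete, here is a simplified plain-Dart stand-in. It is not RxDart's implementation. `sampleLatest` is a hypothetical name, and flushing the pending value when the source completes is a choice made for this sketch.

```dart
import 'dart:async';

/// Forwards only the newest value seen during each [interval].
Stream<T> sampleLatest<T>(Stream<T> source, Duration interval) {
  late StreamController<T> controller;
  StreamSubscription<T>? subscription;
  Timer? timer;
  T? latest;
  var hasPending = false;

  void flush() {
    if (!hasPending) return;
    hasPending = false;
    controller.add(latest as T);
  }

  controller = StreamController<T>(
    onListen: () {
      timer = Timer.periodic(interval, (_) => flush());
      subscription = source.listen(
        (value) {
          latest = value; // overwrite: older values are dropped
          hasPending = true;
        },
        onError: controller.addError,
        onDone: () {
          timer?.cancel();
          flush();
          controller.close();
        },
      );
    },
    onCancel: () async {
      timer?.cancel();
      await subscription?.cancel();
    },
  );
  return controller.stream;
}

Future<void> main() async {
  final ticks =
      Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i).take(30);
  final sampled =
      await sampleLatest(ticks, const Duration(milliseconds: 100)).toList();
  print(sampled); // far fewer than 30 values; exact contents depend on timing
}
```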


Step 5: Retry Logic (RetryWhenStream)

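var attempt = 0; // consecutive failures so far

RetryWhenStream(
  streamFactory,
  (error, _) {
    // Clamp the exponent so the shift cannot overflow during long outages.
    final delay = Duration(seconds: (1 << attempt.clamp(0, 6)).clamp(1, 60));
    attempt++;
    return Rx.timer(null, delay);
  },
);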

This implements exponential backoff:

Retry after 1s → 2s → 4s → 8s → ... (capped at 60s)
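The delay schedule can be checked in isolation. A minimal sketch, assuming a hypothetical helper `backoffDelay`. It caps the exponent before shifting, because a very large shift no longer produces a useful value.

```dart
import 'dart:math';

/// Delay before retry number [attempt] (0-based), capped at [maxSeconds].
Duration backoffDelay(int attempt, {int maxSeconds = 60}) {
  // Bound the exponent first so the shift stays well inside the int range.
  final exponent = min(max(attempt, 0), 30);
  return Duration(seconds: min(1 << exponent, maxSeconds));
}

void main() {
  final delays = [for (var i = 0; i < 8; i++) backoffDelay(i).inSeconds];
  print(delays); // [1, 2, 4, 8, 16, 32, 60, 60]
}
```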

Final Remote Data Source

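import 'dart:convert';

import 'package:rxdart/rxdart.dart';

Stream<AllMidsModel> connectToStream() {
  var attempt = 0; // consecutive failed attempts; reset when data arrives

  return RetryWhenStream(
    () {
      // Runs on every (re)subscription, so each retry opens a new socket.
      return Rx.defer(() async* {
        try {
          final channel = _websocketService.connect();
          await channel.ready;

          yield* channel.stream
              .map((event) {
                final json = jsonDecode(event as String);

                if (json['channel'] == 'allMids') {
                  return AllMidsModel.fromJson(json['data']);
                }
                return null;
              })
              // Drop messages from other channels.
              .whereType<AllMidsModel>()
              // Treat 5 seconds of silence as a failure.
              .timeout(
                const Duration(seconds: 5),
                onTimeout: (sink) => sink.addError("Timeout"),
              )
              .sampleTime(const Duration(seconds: 1))
              .doOnData((_) => attempt = 0);
        } catch (e) {
          yield* Stream.error(e);
        }
      });
    },
    (error, _) {
      // 1s, 2s, 4s, ... capped at 60s; the exponent is clamped so the
      // shift cannot overflow during long outages.
      final delay = Duration(seconds: (1 << attempt.clamp(0, 6)).clamp(1, 60));
      attempt++;
      return Rx.timer(null, delay);
    },
  );
}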
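The retry behaviour can be exercised without RxDart or a server. The sketch below is a simplified stand-in for the resubscribe-on-error idea, not RetryWhenStream itself. `retryWithDelay`, `flakyPrices`, and `connectCalls` are hypothetical names.

```dart
import 'dart:async';

/// Resubscribes to [connect] after every error, waiting [delayFor] first.
Stream<T> retryWithDelay<T>(
  Stream<T> Function() connect,
  Duration Function(int attempt) delayFor,
) async* {
  var attempt = 0;
  while (true) {
    try {
      await for (final value in connect()) {
        attempt = 0; // healthy data resets the backoff
        yield value;
      }
      return; // the source completed normally
    } catch (_) {
      await Future<void>.delayed(delayFor(attempt));
      attempt++;
    }
  }
}

int connectCalls = 0;

/// Hypothetical flaky source: fails twice, then delivers two prices.
Stream<double> flakyPrices() async* {
  connectCalls++;
  if (connectCalls <= 2) throw StateError('connection refused');
  yield 101.5;
  yield 101.7;
}

Future<void> main() async {
  final prices = await retryWithDelay(
    flakyPrices,
    (attempt) => Duration(milliseconds: 10 * (1 << attempt)),
  ).toList();
  print(prices); // [101.5, 101.7]
  print(connectCalls); // 3
}
```

The consumer only ever sees the two prices. Both failed attempts are absorbed by the retry loop.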

Clean Architecture Integration

Data Layer

Handles WebSocket connection, retry logic, and parsing. Returns a stream of models.


Domain Layer

abstract interface class PriceRepository {
  Either<Failure, Stream<AllMids>> subscribe();
}

Its implementation is responsible for:

  • mapping models to entities
  • converting exceptions into failures
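Both responsibilities can be sketched as a single stream transformation. The class shapes below are assumptions (the article does not show `AllMidsModel`, `AllMids`, or `Failure`), and `toDomain` is a hypothetical helper.

```dart
import 'dart:async';

// Hypothetical shapes, kept to a single field for illustration.
class AllMidsModel {
  const AllMidsModel(this.price);
  final double price;
}

class AllMids {
  const AllMids(this.price);
  final double price;
}

class Failure {
  const Failure(this.message);
  final String message;
}

/// Maps data-layer models to entities and raw errors to [Failure]s.
Stream<AllMids> toDomain(Stream<AllMidsModel> models) {
  return models.map((model) => AllMids(model.price)).transform(
        StreamTransformer<AllMids, AllMids>.fromHandlers(
          handleError: (error, stackTrace, sink) {
            sink.addError(Failure(error.toString()), stackTrace);
          },
        ),
      );
}

/// Hypothetical source: one valid model, then a parsing error.
Stream<AllMidsModel> sampleModels() async* {
  yield const AllMidsModel(101.5);
  throw const FormatException('bad json');
}

Future<void> main() async {
  final done = Completer<void>();
  toDomain(sampleModels()).listen(
    (entity) => print('entity: ${entity.price}'),
    onError: (Object e) => print('failure: ${e is Failure}'),
    onDone: done.complete,
  );
  await done.future;
}
```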

Use Case

class SubscribeToStream implements StreamUseCase<AllMids, NoParams> {
  const SubscribeToStream(this.repo);

  final PriceRepository repo;

  @override
  call(_) => repo.subscribe();
}

Presentation Layer (BLoC)

Instead of passing streams to the UI, BLoC consumes the stream:

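// Registered in the BLoC constructor:
on<SubscribeToPriceStream>(
  _onSubscribe,
  transformer: restartable(),
);

// Inside _onSubscribe, once the stream has been obtained from the use case:
await emit.forEach(
  stream,
  onData: (data) => PriceUpdated(data),
  onError: (e, _) => PriceStreamError(e.toString()),
);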

Why restartable()?

It ensures:

  • new events cancel previous subscriptions
  • no duplicate streams remain active
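The underlying idea, stripped of any BLoC machinery, is "cancel the old subscription before starting a new one". A plain-Dart sketch of that idea only; `RestartableSubscription` is a hypothetical class and not how bloc_concurrency is implemented.

```dart
import 'dart:async';

/// Owns at most one active subscription at a time.
class RestartableSubscription<T> {
  StreamSubscription<T>? _active;

  Future<void> restart(Stream<T> stream, void Function(T) onData) async {
    await _active?.cancel(); // drop the previous stream first
    _active = stream.listen(onData);
  }

  Future<void> dispose() async {
    await _active?.cancel();
    _active = null;
  }
}

Future<void> main() async {
  final first = StreamController<int>();
  final second = StreamController<int>();
  final received = <int>[];
  final owner = RestartableSubscription<int>();

  await owner.restart(first.stream, received.add);
  await owner.restart(second.stream, received.add);

  print(first.hasListener); // false: the old stream was cancelled
  print(second.hasListener); // true

  second.add(2);
  await Future<void>.delayed(Duration.zero);
  print(received); // [2]

  await owner.dispose();
}
```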

UI Layer

switch (state) {
  case PriceUpdated(:final data):
    return Text("${data.price}");

  case PriceStreamError(:final msg):
    return Text(msg);

  case StartingPriceStream():
    return CircularProgressIndicator();
}

The UI reacts to states only. It has no knowledge of streams.
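A switch like this is only checked for exhaustiveness when the states form a sealed hierarchy. A minimal sketch in Dart 3, assuming a hypothetical base class `PriceState` and simplified fields; it returns strings instead of widgets so it runs without Flutter.

```dart
// Hypothetical state hierarchy; field names are simplified for illustration.
sealed class PriceState {
  const PriceState();
}

class StartingPriceStream extends PriceState {
  const StartingPriceStream();
}

class PriceUpdated extends PriceState {
  const PriceUpdated(this.price);
  final double price;
}

class PriceStreamError extends PriceState {
  const PriceStreamError(this.msg);
  final String msg;
}

/// The compiler rejects this switch if a subclass is left unhandled.
String describe(PriceState state) => switch (state) {
      StartingPriceStream() => 'loading',
      PriceUpdated(:final price) => 'price: $price',
      PriceStreamError(:final msg) => 'error: $msg',
    };

void main() {
  print(describe(const StartingPriceStream())); // loading
  print(describe(const PriceUpdated(101.5))); // price: 101.5
  print(describe(const PriceStreamError('Timeout'))); // error: Timeout
}
```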


What This Solves

  • automatic reconnection
  • silent failure handling
  • controlled UI updates
  • crash prevention
  • clean separation of concerns

Key Takeaways

  • WebSockets do not always fail with errors
  • You must handle both errors and inactivity
  • Retry logic must recreate the connection
  • Streams should be owned by BLoC, not UI
  • RxDart is powerful when used with intent

Final Thought

Real-time systems are not about receiving data.

They are about handling failure conditions reliably, so users never notice them.
