Most tutorials show how to connect to a WebSocket.
Very few show what happens when:
- The network drops
- DNS fails
- The server stops sending data
- The stream silently hangs
This article focuses on handling those cases properly.
The Problem
A basic WebSocket implementation:
final channel = WebSocketChannel.connect(uri);
channel.stream.listen(...);
This works until real-world conditions appear:
- network loss produces no error
- the stream freezes silently
- the app never reconnects
- users see stale data
The Goal
We want a stream that:
- reconnects automatically
- handles both errors and silent failures
- retries with exponential backoff
- does not crash the app
- integrates cleanly with BLoC
Step 1: Deferred Connection (Rx.defer)
Instead of creating the connection immediately, we defer it:
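return Rx.defer(() async* {
  // Runs when the stream is listened to, not when it is created.
  final channel = _websocketService.connect();
  await channel.ready; // completes once the connection is established
  yield* channel.stream;
});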
This ensures that each retry creates a fresh connection.
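To see why deferral matters, the following sketch counts how often a connection is opened. `fakeConnect` is a hypothetical stand-in for `_websocketService.connect()`, and `reusable: true` is passed only so that the same deferred stream can be listened to twice in this demo.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

int connectionsOpened = 0;

/// Hypothetical stand-in for `_websocketService.connect()`.
Stream<String> fakeConnect() {
  connectionsOpened++;
  return Stream.value('tick from connection #$connectionsOpened');
}

/// Nothing is opened here; the factory runs on each listen.
Stream<String> deferredTicks() => Rx.defer(fakeConnect, reusable: true);

/// Listens twice and returns the first event of each subscription.
Future<List<String>> collectTwice() async {
  final stream = deferredTicks();
  final first = await stream.first;
  final second = await stream.first;
  return [first, second];
}
```

Each subscription triggers its own call to `fakeConnect`, so the two events come from two different connections.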
Step 2: Handle Connection Errors
If the connection fails:
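try {
  final channel = _websocketService.connect();
  await channel.ready; // connection failures surface here
  yield* channel.stream;
} catch (e) {
  // Emit the failure as a stream error instead of an unhandled exception.
  yield* Stream.error(e);
}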
Convert failures into stream errors so retry logic can handle them.
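The conversion can be checked without a server. This is a minimal sketch using only `dart:async`; `failingConnect` is a hypothetical connector that always fails, standing in for a handshake error.

```dart
import 'dart:async';

/// Hypothetical connector that always fails, like an unreachable host.
Future<Stream<String>> failingConnect() async {
  throw StateError('handshake failed');
}

/// Turns a failed connection into an error event on the returned stream.
Stream<String> messages() async* {
  try {
    final source = await failingConnect();
    yield* source;
  } catch (e, stackTrace) {
    yield* Stream.error(e, stackTrace);
  }
}

/// Returns the first event, or a description of the first error.
Future<String> describeFirstEvent() async {
  try {
    return await messages().first;
  } catch (e) {
    return 'error: $e';
  }
}
```

The caller never sees an uncaught exception. The failure travels down the stream, which is where the retry operator listens for it.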
Step 3: Handle Silent Failures (Timeout)
A WebSocket can stop delivering messages without raising an error, for example when the network path dies without the connection being closed.
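.timeout(
  const Duration(seconds: 5),
  onTimeout: (sink) {
    // A typed exception is easier to match on than a bare string.
    sink.addError(TimeoutException('No data received for 5 seconds'));
  },
)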
If no event arrives within 5 seconds, the stream emits an error, which hands control to the retry logic.
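A hung stream can be simulated with a controller that emits once and then stays open. The sketch below uses only `dart:async`; `hangingStream` and the 50 ms limit are illustrative choices, not part of the original data source.

```dart
import 'dart:async';

/// Emits one value and then stays open forever, like a hung socket.
Stream<int> hangingStream() {
  final controller = StreamController<int>();
  controller.add(1);
  return controller.stream;
}

/// Records what a listener observes when the stream goes quiet.
Future<List<String>> observe() async {
  final log = <String>[];
  try {
    final guarded =
        hangingStream().timeout(const Duration(milliseconds: 50));
    await for (final value in guarded) {
      log.add('data $value');
    }
  } on TimeoutException {
    // Without timeout() this loop would wait forever.
    log.add('timeout');
  }
  return log;
}
```

When `onTimeout` is omitted, as here, `timeout` emits a `TimeoutException` by default.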
Step 4: Control Update Frequency
Real-time streams can overwhelm the UI.
.sampleTime(const Duration(seconds: 1))
This emits only the most recent value from each one-second window and drops the rest.
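As a rough illustration of the effect, the sketch below samples a fast periodic stream. The tick rate and window size are arbitrary values chosen for the demo, and the exact number of sampled values depends on timer scheduling.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

/// Collects what a slow consumer would see from a fast source.
Future<List<int>> sampledTicks() {
  // 20 ticks, one every 10 ms, standing in for a busy price feed.
  final fast = Stream<int>.periodic(
    const Duration(milliseconds: 10),
    (i) => i,
  ).take(20);
  // Keep at most one value per 50 ms window.
  return fast.sampleTime(const Duration(milliseconds: 50)).toList();
}
```

The consumer receives far fewer events than the source produced, and each one is the newest value available at sampling time.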
Step 5: Retry Logic (RetryWhenStream)
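var attempt = 0; // consecutive failures, shared with the retry callback

RetryWhenStream(
  streamFactory,
  (error, _) {
    // Cap the exponent so the shift cannot overflow after many retries.
    final delay = Duration(seconds: (1 << attempt.clamp(0, 6)).clamp(1, 60));
    attempt++;
    return Rx.timer(null, delay); // resubscribe once the delay elapses
  },
);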
This implements exponential backoff:
Retry after 1s → 2s → 4s → 8s → ... up to a cap of 60s
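The delay schedule can be isolated into a small pure function, which makes it easy to test. This is a sketch; `backoffDelay` and its `capSeconds` parameter are hypothetical names, not part of the article's data source.

```dart
import 'dart:math' as math;

/// Delay before retry number [attempt] (0-based), doubling up to a cap.
Duration backoffDelay(int attempt, {int capSeconds = 60}) {
  // Bound the exponent so the shift stays well inside the integer range.
  final exponent = math.min(math.max(attempt, 0), 30);
  return Duration(seconds: math.min(1 << exponent, capSeconds));
}
```

For attempts 0 through 6 this yields 1, 2, 4, 8, 16, 32 and 60 seconds, and it stays at 60 seconds for every later attempt.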
Final Remote Data Source
Stream<AllMidsModel> connectToStream() {
  var attempt = 0; // consecutive failures; reset when data arrives
  return RetryWhenStream(
    () {
      // Called for every attempt, so each retry opens a new channel.
      return Rx.defer(() async* {
        try {
          final channel = _websocketService.connect();
          await channel.ready;
          yield* channel.stream
              .map((event) {
                final json = jsonDecode(event as String);
                if (json['channel'] == 'allMids') {
                  return AllMidsModel.fromJson(json['data']);
                }
                return null; // ignore messages from other channels
              })
              .whereType<AllMidsModel>()
              .timeout(
                const Duration(seconds: 5),
                onTimeout: (sink) => sink.addError(
                  TimeoutException('No data received for 5 seconds'),
                ),
              )
              .sampleTime(const Duration(seconds: 1))
              .doOnData((_) => attempt = 0);
        } catch (e) {
          yield* Stream.error(e);
        }
      });
    },
    (error, _) {
      // Cap the exponent so the shift cannot overflow after many retries.
      final delay = Duration(seconds: (1 << attempt.clamp(0, 6)).clamp(1, 60));
      attempt++;
      return Rx.timer(null, delay);
    },
  );
}
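The retry behaviour can be exercised without a real socket. In the sketch below, `flakySource` is a hypothetical source that fails twice before succeeding, and the 10 ms delay replaces the exponential backoff to keep the demo fast.

```dart
import 'dart:async';
import 'package:rxdart/rxdart.dart';

int attempts = 0;

/// Hypothetical source: the first two attempts fail, the third emits 42.
Stream<int> flakySource() async* {
  attempts++;
  if (attempts < 3) {
    throw StateError('connect failed');
  }
  yield 42;
}

/// Resubscribes after each error until a value arrives.
Future<int> firstValueWithRetry() {
  return RetryWhenStream<int>(
    () => Rx.defer(flakySource),
    // Emitting from this stream triggers the next attempt.
    (error, stackTrace) => Rx.timer(null, const Duration(milliseconds: 10)),
  ).first;
}
```

The listener only ever sees the successful value. The two failures are absorbed by the retry callback.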
Clean Architecture Integration
Data Layer
Handles WebSocket connection, retry logic, and parsing. Returns a stream of models.
Domain Layer
abstract interface class PriceRepository {
  Either<Failure, Stream<AllMids>> subscribe();
}
The repository implementation is responsible for:
- mapping models to entities
- converting exceptions into failures
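The two responsibilities above could be sketched as a single stream transformation. The class shapes here are hypothetical, since the real `AllMidsModel`, `AllMids` and `Failure` are not shown, and the `Either` wrapper is left out to keep the example free of extra packages.

```dart
import 'dart:async';

// Hypothetical shapes, for illustration only.
class AllMidsModel {
  const AllMidsModel(this.mids);
  final Map<String, String> mids; // raw prices as strings
}

class AllMids {
  const AllMids(this.mids);
  final Map<String, double> mids; // parsed prices
}

class Failure {
  const Failure(this.message);
  final String message;
}

/// Maps models to entities and converts any stream error into a Failure.
Stream<AllMids> toEntities(Stream<AllMidsModel> models) {
  return models
      .map((m) => AllMids(
            m.mids.map((k, v) => MapEntry(k, double.parse(v))),
          ))
      .transform(StreamTransformer<AllMids, AllMids>.fromHandlers(
        handleError: (error, stackTrace, sink) =>
            sink.addError(Failure(error.toString()), stackTrace),
      ));
}
```

With this in place, the presentation layer only has to deal with `AllMids` values and `Failure` errors, never with transport exceptions.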
Use Case
class SubscribeToStream implements StreamUseCase<AllMids, NoParams> {
  const SubscribeToStream(this.repo);

  final PriceRepository repo;

  @override
  call(_) => repo.subscribe();
}
Presentation Layer (BLoC)
Instead of passing streams to the UI, BLoC consumes the stream:
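// In the BLoC constructor:
on<SubscribeToPriceStream>(
  _onSubscribe,
  transformer: restartable(), // from package:bloc_concurrency
);

// Inside _onSubscribe, once the stream has been obtained from the use case:
await emit.forEach(
  stream,
  onData: (data) => PriceUpdated(data),
  onError: (e, _) => PriceStreamError(e.toString()),
);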
Why restartable()?
It ensures:
- a new event cancels the handler, and its stream subscription, that is still running for the previous event
- no duplicate streams remain active
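Put together, a minimal BLoC along these lines might look as follows. This is a sketch under assumptions: the state classes are simplified (here `PriceUpdated` carries a plain `double`), and the stream is injected as a hypothetical factory function instead of coming from the use case.

```dart
import 'dart:async';
import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

sealed class PriceEvent {}

final class SubscribeToPriceStream extends PriceEvent {}

sealed class PriceState {}

final class StartingPriceStream extends PriceState {}

final class PriceUpdated extends PriceState {
  PriceUpdated(this.data);
  final double data;
}

final class PriceStreamError extends PriceState {
  PriceStreamError(this.msg);
  final String msg;
}

class PriceBloc extends Bloc<PriceEvent, PriceState> {
  PriceBloc(this._prices) : super(StartingPriceStream()) {
    // restartable() cancels the running handler when a new event arrives.
    on<SubscribeToPriceStream>(_onSubscribe, transformer: restartable());
  }

  /// Hypothetical stream factory standing in for the use case.
  final Stream<double> Function() _prices;

  Future<void> _onSubscribe(
    SubscribeToPriceStream event,
    Emitter<PriceState> emit,
  ) async {
    emit(StartingPriceStream());
    // forEach subscribes, maps events to states, and unsubscribes on cancel.
    await emit.forEach<double>(
      _prices(),
      onData: PriceUpdated.new,
      onError: (e, _) => PriceStreamError(e.toString()),
    );
  }
}
```

Adding `SubscribeToPriceStream` a second time tears down the first `emit.forEach` before the new one starts, so only one subscription is active at any time.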
UI Layer
switch (state) {
  case PriceUpdated(:final data):
    return Text("${data.price}");
  case PriceStreamError(:final msg):
    return Text(msg);
  case StartingPriceStream():
    return CircularProgressIndicator();
}
The UI reacts to states only. It has no knowledge of streams.
What This Solves
- automatic reconnection
- silent failure handling
- controlled UI updates
- crash prevention
- clean separation of concerns
Key Takeaways
- WebSockets do not always fail with errors
- You must handle both errors and inactivity
- Retry logic must recreate the connection
- Streams should be owned by BLoC, not UI
- RxDart is powerful when used with intent
Final Thought
Real-time systems are not about receiving data.
They are about handling failure conditions reliably, so users never notice them.