
Real‑Time NgRx for Angular 20+: WebSocket Streams, Optimistic Updates, and Typed Actions for Telemetry Dashboards
From Charter ad analytics to United kiosks, here’s the NgRx pattern I ship for real‑time streams without jitter, dropped frames, or double‑writes.
If your dashboard jitters, your state graph is lying to you.
I’ve shipped real-time dashboards where a jittery chart can cost money or trust. At a leading telecom provider, we pushed ad-delivery events at volume; at a major airline, kiosks needed offline-tolerant flows with hardware in the loop. The pattern below is how I stabilize NgRx for WebSocket telemetry in Angular 20+ without blowing up UX or SSR.
As companies plan 2025 Angular roadmaps (Angular 21 beta on deck), teams ask me to rescue WebSocket-driven dashboards that leak subscriptions, drop events, or double-write state. If you need an Angular expert to harden your stream pipeline, this is the playbook I run.
Why Real-Time Angular Needs Typed NgRx in 2025
Where real-time breaks first
I see three pain points when I’m brought in as an Angular consultant: streams without backpressure, optimistic updates without ACK correlation, and selectors that force full-list re-renders. Add multi-tenant filtering and you’ve got duplicate data and jittering charts.
- Unbounded streams flood change detection
- ACK handling isn’t idempotent
- Selectors thrash lists/charts
NgRx + Signals is the sweet spot
Keep your stream logic in NgRx Effects (Observables are perfect here) and expose state to components with store.selectSignal. Use SignalStore locally for view-only flags (panel open, filters) while NgRx owns cross-app telemetry.
- Effects handle streams
- Signals power views
Architecture: WebSocket Effects, Entity Upserts, ACK Correlation, and Signals
// telemetry.models.ts
export type TelemetryEvent =
  | { type: 'metric'; id: string; ts: number; name: string; value: number; tenantId: string }
  | { type: 'status'; id: string; ts: number; state: 'online' | 'offline' | 'error'; tenantId: string }
  | { type: 'ack'; correlationId: string; ok: boolean; error?: string };

export interface Command {
  kind: 'restart' | 'calibrate' | 'throttle';
  targetId: string;
}

Data flow at a glance
- Inbound telemetry is parsed and routed by Effects into typed NgRx actions. EntityAdapter performs upsertOne to keep lists stable.
- Outbound commands are optimistic: we write intent to state, send to the socket with a correlationId, and confirm/rollback on ACK.
- Components consume with selectSignal and PrimeNG virtualization to avoid reflow storms.
WebSocket → Effect → typed action → reducer (Entity upsert) → selectSignal
Typed event schema
Use discriminated unions for message types. Your effects become a simple switch on event.type and TypeScript enforces exhaustiveness.
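To make the exhaustiveness point concrete, here is a minimal sketch of a switch over the event union. The union is copied from telemetry.models.ts so the snippet stands alone; `assertNever` and `describeEvent` are hypothetical helpers for illustration, not part of the code in this post.

```typescript
// Event union copied from telemetry.models.ts so this sketch is self-contained.
type TelemetryEvent =
  | { type: 'metric'; id: string; ts: number; name: string; value: number; tenantId: string }
  | { type: 'status'; id: string; ts: number; state: 'online' | 'offline' | 'error'; tenantId: string }
  | { type: 'ack'; correlationId: string; ok: boolean; error?: string };

// Hypothetical helper: the `never` parameter makes the compiler reject
// any switch that leaves a union member unhandled.
function assertNever(value: never): never {
  throw new Error(`Unhandled telemetry event: ${JSON.stringify(value)}`);
}

// Hypothetical router: one branch per discriminant, narrowed by TypeScript.
function describeEvent(event: TelemetryEvent): string {
  switch (event.type) {
    case 'metric':
      return `metric ${event.name}=${event.value} from ${event.id}`;
    case 'status':
      return `status ${event.state} from ${event.id}`;
    case 'ack':
      return event.ok
        ? `ack ${event.correlationId}`
        : `nack ${event.correlationId}: ${event.error ?? 'unknown'}`;
    default:
      // Adding a fourth member to TelemetryEvent turns this line into a compile error.
      return assertNever(event);
  }
}
```

If a new message type is added to the union later, the `default` branch stops compiling until the new case is handled, which is the safety net the paragraph above describes.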
WebSocket Service with Exponential Backoff (Reset on Success)
// telemetry.socket.ts
import { Injectable, inject, PLATFORM_ID } from '@angular/core';
import { isPlatformBrowser } from '@angular/common';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable, EMPTY, timer } from 'rxjs';
import { shareReplay, retry } from 'rxjs/operators';
import { TelemetryEvent } from './telemetry.models';

@Injectable({ providedIn: 'root' })
export class TelemetrySocket {
  private socket$?: WebSocketSubject<TelemetryEvent>;
  private platformId = inject(PLATFORM_ID);

  connect(url: string): Observable<TelemetryEvent> {
    // SSR guard: never open a socket on the server
    if (!isPlatformBrowser(this.platformId)) return EMPTY;
    this.socket$ = webSocket<TelemetryEvent>({
      url,
      deserializer: e => JSON.parse(e.data),
    });
    return this.socket$.pipe(
      // retry with exponential backoff (500 ms, doubling, capped at 10 s);
      // the retry count resets after any successful message
      retry({
        delay: (_err, retryCount) => timer(Math.min(500 * 2 ** (retryCount - 1), 10_000)),
        resetOnSuccess: true,
      }),
      // share one connection among subscribers of this observable;
      // refCount closes it when the last subscriber leaves
      shareReplay({ bufferSize: 1, refCount: true })
    );
  }

  // Outbound commands are not TelemetryEvents, hence the cast
  send(msg: unknown) { this.socket$?.next(msg as any); }
  close() { this.socket$?.complete(); }
}

Stable reconnects beat infinite retries
At a broadcast media network, a naive retry caused connection storms during a broker restart. Backoff with reset-on-success fixed it.
- Backoff 0.5s → 10s
- Reset on any successful message
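The delay schedule is easier to reason about as a pure function. The sketch below mirrors the 500 ms doubling and 10 s cap from the service above; the `jitterRatio` and `random` parameters are my own illustrative additions (the service as written applies no jitter), and `backoffDelay` is a hypothetical name.

```typescript
// Hypothetical helper: 500 ms doubling per attempt, capped at 10 s.
// jitterRatio and random are illustrative parameters, not part of the service above.
function backoffDelay(
  retryCount: number,                  // 1-based, as RxJS passes it to retry's delay callback
  jitterRatio = 0,                     // 0 disables jitter; 0.2 spreads the delay by up to 20% either way
  random: () => number = Math.random,  // injectable so the schedule can be tested deterministically
  baseMs = 500,
  capMs = 10_000,
): number {
  const exponential = Math.min(baseMs * 2 ** (retryCount - 1), capMs);
  const spread = exponential * jitterRatio;
  // random() in [0, 1) maps to an offset in [-spread, +spread)
  const offset = (random() * 2 - 1) * spread;
  // Clamp so jitter never produces a negative delay or exceeds the cap
  return Math.round(Math.min(Math.max(exponential + offset, 0), capMs));
}
```

With jitter disabled this yields 500, 1000, 2000, 4000, 8000, then 10000 for every later attempt. One way to use it would be `delay: (_err, n) => timer(backoffDelay(n, 0.2))`, so that many clients reconnecting after a broker restart do not all retry on the same tick.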
Browser-only guards
Guard connects with isPlatformBrowser and feature flags (Firebase Remote Config) so SSR passes don’t attempt sockets.
- Avoid SSR connects
- Feature-flag URL
Typed Actions and Effects for Telemetry (NgRx 17+)
// telemetry.actions.ts
import { createActionGroup, props, emptyProps } from '@ngrx/store';
import { TelemetryEvent, Command } from './telemetry.models';

export const TelemetryActions = createActionGroup({
  source: 'Telemetry',
  events: {
    'Connect': props<{ url: string }>(),
    'Connected': emptyProps(),
    'Disconnect': emptyProps(),
    'Message Received': props<{ event: TelemetryEvent }>(),
    'Send Command': props<{ command: Command; correlationId: string }>(),
    'Ack Received': props<{ correlationId: string; ok: boolean; error?: string }>(),
    'Connection Error': props<{ error: unknown }>(),
  },
});

// telemetry.effects.ts
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Injectable, inject } from '@angular/core';
import { map, switchMap, takeUntil, filter, tap, mergeMap, timer, take } from 'rxjs';
import { TelemetryActions } from './telemetry.actions';
import { TelemetrySocket } from './telemetry.socket';

@Injectable()
export class TelemetryEffects {
  private actions$ = inject(Actions);
  private socket = inject(TelemetrySocket);

  // One connection at a time: a new Connect replaces the previous stream,
  // and Disconnect unsubscribes (which closes the socket via refCount).
  connect$ = createEffect(() =>
    this.actions$.pipe(
      ofType(TelemetryActions.connect),
      switchMap(({ url }) =>
        this.socket.connect(url).pipe(
          tap(() => {/* connection is alive */}),
          map(event => TelemetryActions.messageReceived({ event })),
          takeUntil(this.actions$.pipe(ofType(TelemetryActions.disconnect)))
        )
      )
    )
  );

  routeInbound$ = createEffect(() =>
    this.actions$.pipe(
      ofType(TelemetryActions.messageReceived),
      map(({ event }) => {
        if (event.type === 'ack') {
          return TelemetryActions.ackReceived({ correlationId: event.correlationId, ok: event.ok, error: event.error });
        }
        // fall-through: handle metric/status via reducer upserts;
        // the reducer matches on this exact type string
        return { type: '[Telemetry] Upsert From Event', event } as const;
      })
    )
  );

  sendCommand$ = createEffect(() =>
    this.actions$.pipe(
      ofType(TelemetryActions.sendCommand),
      tap(({ command, correlationId }) => this.socket.send({ ...command, correlationId })),
      mergeMap(({ correlationId }) =>
        // timeout in 5s if no ACK; rollback path
        timer(5000).pipe(
          // a matching ACK cancels the timer before it fires
          takeUntil(
            this.actions$.pipe(
              ofType(TelemetryActions.ackReceived),
              filter(a => a.correlationId === correlationId),
              take(1)
            )
          ),
          map(() => TelemetryActions.ackReceived({ correlationId, ok: false, error: 'timeout' }))
        )
      )
    )
  );
}

Action groups keep intent clear
Create an action group for connection lifecycle, inbound messages, optimistic commands, and ACKs.
Effect routing by event.type
Use a single connection effect to map inbound JSON to strongly-typed actions; branch by discriminant.
Reducers: Entity Upserts and Optimistic Pending Map
// telemetry.reducer.ts
import { createAction, createFeatureSelector, createReducer, on, props } from '@ngrx/store';
import { createEntityAdapter, EntityState } from '@ngrx/entity';
import { TelemetryActions } from './telemetry.actions';
import { TelemetryEvent, Command } from './telemetry.models';

export interface DeviceMetric { id: string; name: string; value: number; tenantId: string; ts: number; }
export interface PendingCommand { command: Command; ts: number }
export interface TelemetryState extends EntityState<DeviceMetric> {
  pending: Record<string, PendingCommand>;
}

export const adapter = createEntityAdapter<DeviceMetric>({ selectId: m => m.id });
const initialState: TelemetryState = adapter.getInitialState({ pending: {} });

// Typed creator for the action routeInbound$ emits; the type string must match exactly.
const upsertFromEvent = createAction('[Telemetry] Upsert From Event', props<{ event: TelemetryEvent }>());

function eventToMetric(e: TelemetryEvent): DeviceMetric | null {
  if (e.type !== 'metric') return null;
  return { id: e.id, name: e.name, value: e.value, tenantId: e.tenantId, ts: e.ts };
}

export const telemetryReducer = createReducer(
  initialState,
  on(upsertFromEvent, (state, { event }) => {
    const metric = eventToMetric(event);
    return metric ? adapter.upsertOne(metric, state) : state;
  }),
  on(TelemetryActions.sendCommand, (state, { correlationId, command }) => ({
    ...state,
    pending: { ...state.pending, [correlationId]: { command, ts: Date.now() } },
  })),
  on(TelemetryActions.ackReceived, (state, { correlationId }) => {
    // Duplicate or unknown ACK: nothing pending under this id, so keep the same state reference
    if (!(correlationId in state.pending)) return state;
    const { [correlationId]: _drop, ...pending } = state.pending;
    // Optionally apply server-confirmed state changes when ok===true
    return { ...state, pending };
  })
);

// Assumes the reducer is registered under the 'telemetry' feature key;
// without a feature selector, selectAll would expect the entity slice rather than root state.
export const selectTelemetryState = createFeatureSelector<TelemetryState>('telemetry');
export const { selectAll: selectAllMetrics } = adapter.getSelectors(selectTelemetryState);

EntityAdapter keeps lists stable
Use EntityAdapter to upsert incoming metrics; reference stability removes table/chart jitter.
- No array churn
- O(1) upserts
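Reference stability is the property doing the work here, so it helps to see it in isolation. The sketch below is a simplified stand-in for an id-keyed upsert, not how @ngrx/entity is implemented; `MetricState` and `upsertMetric` are hypothetical names, and this version copies the entity map on each write for brevity.

```typescript
interface DeviceMetric { id: string; name: string; value: number; tenantId: string; ts: number }
interface MetricState { ids: string[]; entities: Record<string, DeviceMetric> }

// Hypothetical stand-in for an entity upsert: only the touched entity gets a new object.
function upsertMetric(state: MetricState, metric: DeviceMetric): MetricState {
  const exists = Object.prototype.hasOwnProperty.call(state.entities, metric.id);
  return {
    // The ids array keeps its reference unless a new id is appended
    ids: exists ? state.ids : [...state.ids, metric.id],
    entities: {
      ...state.entities,
      [metric.id]: exists ? { ...state.entities[metric.id], ...metric } : metric,
    },
  };
}

const before: MetricState = {
  ids: ['a', 'b'],
  entities: {
    a: { id: 'a', name: 'cpu', value: 1, tenantId: 'acme', ts: 1 },
    b: { id: 'b', name: 'cpu', value: 2, tenantId: 'acme', ts: 1 },
  },
};
const after = upsertMetric(before, { id: 'b', name: 'cpu', value: 3, tenantId: 'acme', ts: 2 });
```

After the update, `after.entities.a` is the same object as `before.entities.a` and `after.ids` is the same array, so a row or chart series keyed on device `a` has no reason to re-render.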
Pending map by correlationId
Store pending commands keyed by correlationId; drop them on ACK. If a duplicate ACK arrives, it’s a no-op.
- Rollback on NACK/timeout
- Idempotent ACKs
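The pending-map idea can be sketched without NgRx at all. In this minimal example, `addPending` and `resolveAck` are hypothetical pure helpers; the `Command` shape is copied from telemetry.models.ts, and the timestamp is passed in rather than read from the clock so the functions stay deterministic.

```typescript
type Command = { kind: 'restart' | 'calibrate' | 'throttle'; targetId: string };
type PendingCommand = { command: Command; ts: number };
type PendingMap = Record<string, PendingCommand>;

// Record optimistic intent under its correlationId.
function addPending(pending: PendingMap, correlationId: string, command: Command, now: number): PendingMap {
  return { ...pending, [correlationId]: { command, ts: now } };
}

// Clear the entry on ACK, NACK, or timeout. A second ACK for the same id
// finds nothing to remove and returns the same object, so nothing re-renders.
function resolveAck(pending: PendingMap, correlationId: string): PendingMap {
  if (!Object.prototype.hasOwnProperty.call(pending, correlationId)) return pending;
  const next = { ...pending };
  delete next[correlationId];
  return next;
}

let pendingCommands: PendingMap = {};
pendingCommands = addPending(pendingCommands, 'c1', { kind: 'restart', targetId: 'kiosk-7' }, 1000);
const afterFirstAck = resolveAck(pendingCommands, 'c1');
const afterDuplicateAck = resolveAck(afterFirstAck, 'c1');
```

Here `afterDuplicateAck` is the very same object as `afterFirstAck`, which is what "idempotent" needs to mean for a store: a repeated ACK changes neither the data nor the reference.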
Bridging NgRx to Signals and PrimeNG Virtual Tables
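// telemetry.selectors.ts
import { createSelector } from '@ngrx/store';
import { selectAllMetrics } from './telemetry.reducer';

// Factory: each call builds a new memoized selector for one tenant
export const selectMetricsByTenant = (tenantId: string) =>
  createSelector(selectAllMetrics, list => list.filter(m => m.tenantId === tenantId));

// telemetry.component.ts
import { Component, computed, inject, signal } from '@angular/core';
import { DatePipe, DecimalPipe } from '@angular/common';
import { Store } from '@ngrx/store';
import { TableModule } from 'primeng/table';
import { DeviceMetric } from './telemetry.reducer';
import { selectMetricsByTenant } from './telemetry.selectors';

@Component({
  selector: 'telemetry-table',
  imports: [TableModule, DecimalPipe, DatePipe],
  templateUrl: './telemetry.component.html',
})
export class TelemetryComponent {
  private store = inject(Store);
  tenantId = signal('acme');
  // Rebuild the tenant selector only when tenantId changes, then unwrap its signal.
  // Reading tenantId() once at construction would pin the table to the initial tenant.
  private metricsForTenant = computed(() => this.store.selectSignal(selectMetricsByTenant(this.tenantId())));
  metrics = computed(() => this.metricsForTenant()());
  count = computed(() => this.metrics().length);
  // Keep the trackBy function on the class and reference it from the template
  trackById = (_index: number, row: DeviceMetric) => row.id;
}

<!-- telemetry.component.html -->
<!-- virtualScrollItemSize must match the rendered row height in px; 46 is a placeholder -->
<p-table
  [value]="metrics()"
  [scrollable]="true"
  [virtualScroll]="true"
  [virtualScrollItemSize]="46"
  [rows]="50"
  [scrollHeight]="'60vh'"
  [rowTrackBy]="trackById"
  styleClass="p-datatable-sm"
>
  <ng-template pTemplate="header">
    <tr><th>Device</th><th>Metric</th><th>Value</th><th>Time</th></tr>
  </ng-template>
  <ng-template pTemplate="body" let-row>
    <tr>
      <td>{{ row.id }}</td>
      <td>{{ row.name }}</td>
      <td>{{ row.value | number:'1.0-2' }}</td>
      <td>{{ row.ts | date:'shortTime' }}</td>
    </tr>
  </ng-template>
</p-table>

Signal selectors for components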
Expose derived state with store.selectSignal; compute lightweight counts with computed to keep CD minimal.
PrimeNG data virtualization
Virtual scrolling + trackBy eliminates full-list re-renders during bursts.
- Prevent reflow
- TrackBy id
Multi‑Tenant Guards and Offline‑Tolerant UX
Tenant isolation
At a global entertainment company and Charter, we isolate selectors by tenantId and load slices on demand with feature keys.
- Per-tenant selectors
- No cross-tenant leaks
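One detail worth sketching is that a per-tenant selector only helps if the same tenant gets the same memoized selector back each time. The example below is a framework-free illustration of that idea under my own assumptions; `metricsForTenant` and `selectorCache` are hypothetical names, and a real app would more likely cache NgRx selectors built with createSelector.

```typescript
interface DeviceMetric { id: string; name: string; value: number; tenantId: string; ts: number }
type MetricSelector = (all: DeviceMetric[]) => DeviceMetric[];

// One selector per tenant, created on first use and reused afterwards.
const selectorCache = new Map<string, MetricSelector>();

function metricsForTenant(tenantId: string): MetricSelector {
  let selector = selectorCache.get(tenantId);
  if (!selector) {
    let lastInput: DeviceMetric[] | undefined;
    let lastOutput: DeviceMetric[] = [];
    // Memoize on the input array reference: an unchanged list returns the same filtered array.
    selector = (all: DeviceMetric[]) => {
      if (all !== lastInput) {
        lastInput = all;
        lastOutput = all.filter(m => m.tenantId === tenantId);
      }
      return lastOutput;
    };
    selectorCache.set(tenantId, selector);
  }
  return selector;
}

const allMetrics: DeviceMetric[] = [
  { id: 'a', name: 'cpu', value: 1, tenantId: 'acme', ts: 1 },
  { id: 'b', name: 'cpu', value: 2, tenantId: 'globex', ts: 1 },
];
const acmeOnly = metricsForTenant('acme')(allMetrics);
```

Calling `metricsForTenant('acme')(allMetrics)` a second time returns the same array instance, and rows for `globex` never appear in the `acme` result.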
Kiosk-friendly flows
For a major airline’s airport kiosks, we queued commands locally when the broker flapped, replayed them on reconnect, and simulated peripherals (card readers, printers, scanners) via Docker to test flows without hardware.
- Outbox queue
- Hardware simulation
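The outbox idea reduces to a small queue that holds commands while the link is down and replays them in order once it returns. This is a minimal sketch under my own assumptions, not the kiosk implementation: `CommandOutbox`, `Envelope`, and `setOnline` are hypothetical names, and persistence across reloads is deliberately left out.

```typescript
type Command = { kind: 'restart' | 'calibrate' | 'throttle'; targetId: string };
type Envelope = { command: Command; correlationId: string };

// Hypothetical outbox: queues while offline, flushes in FIFO order on reconnect.
class CommandOutbox {
  private queue: Envelope[] = [];
  private online = false;
  private readonly send: (msg: Envelope) => void;

  constructor(send: (msg: Envelope) => void) {
    this.send = send;
  }

  // Send immediately when online, otherwise hold the message for later.
  enqueue(msg: Envelope): void {
    if (this.online) this.send(msg);
    else this.queue.push(msg);
  }

  // Call with true on socket open and false on close or error.
  setOnline(online: boolean): void {
    this.online = online;
    if (!online) return;
    const backlog = this.queue;
    this.queue = [];
    for (const msg of backlog) this.send(msg);
  }

  get size(): number {
    return this.queue.length;
  }
}
```

Because each envelope carries its correlationId, a replayed command still lines up with the pending map, and the server can use the id to drop a command it has already applied.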
Testing and CI Guardrails for Streams
# .github/workflows/ci.yml
name: ci
on: [pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: pnpm/action-setup@v3
        with: { version: 9 }
      - run: pnpm install --frozen-lockfile
      - run: pnpm nx run-many -t lint,test --configuration=ci
      - run: pnpm nx e2e dashboard-e2e --configuration=ci

Marble tests for effects
Use jasmine-marbles to assert timeout rollbacks and reconnection delays. Deterministic tests catch flapping behavior before prod.
- ACK timeout path
- Reconnect logic
CI that fails fast
In Nx workspaces, block merges on effect tests and a WebSocket smoke e2e. Capture metrics with Firebase Analytics/GA4 and Sentry/OpenTelemetry.
- Unit + e2e on PR
- Telemetry smoke
When to Hire an Angular Developer for Real‑Time Dashboards
Bring in help if you see
I typically stabilize stream-heavy apps in 2–4 weeks: typed actions/effects, optimistic ACK correlation, Signal-based selectors, and CI guardrails. If you need a remote Angular contractor with experience at a global entertainment company, United, and Charter, I’m available for targeted engagements.
- Charts jitter >1/sec
- Duplicate or out‑of‑order events
- ACKs never clearing pending
- Memory growth over time
Example Instrumentation and Performance Tuning
Batch UI updates
For high-frequency metrics (telematics), batch chart updates inside requestAnimationFrame: update Entity state on every message, but render charts at most once per frame (roughly every 16 ms on a 60 Hz display). D3/Highcharts handle windowed series efficiently.
- rAF throttling
- Windowed charts
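A frame batcher captures the split between "accept every message" and "render once per frame". The sketch below is one possible shape, with hypothetical names (`FrameBatcher`, `Schedule`); the scheduler is injected so the logic can be tested without a browser.

```typescript
// The scheduler decides when a flush runs; in a browser this would wrap requestAnimationFrame.
type Schedule = (flush: () => void) => void;

// Hypothetical batcher: keeps only the latest value per key and renders once per scheduled flush.
class FrameBatcher<T> {
  private latest = new Map<string, T>();
  private scheduled = false;
  private readonly render: (batch: Map<string, T>) => void;
  private readonly schedule: Schedule;

  constructor(render: (batch: Map<string, T>) => void, schedule: Schedule) {
    this.render = render;
    this.schedule = schedule;
  }

  push(key: string, value: T): void {
    // Later values for the same key overwrite earlier ones within a frame
    this.latest.set(key, value);
    if (this.scheduled) return;
    this.scheduled = true;
    this.schedule(() => {
      this.scheduled = false;
      const batch = this.latest;
      this.latest = new Map<string, T>();
      this.render(batch);
    });
  }
}
```

In a component this might be constructed as `new FrameBatcher(updateChart, cb => requestAnimationFrame(cb))`, where `updateChart` is whatever function applies points to the chart. A burst of messages inside one frame then costs a single chart update.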
Measure what matters
Track frame stability and memory; instrument event throughput and ACK latency. In IntegrityLens (12k+ interviews processed), we keep 99.98% uptime by watching throughput + error budgets.
- Angular DevTools flame charts
- Core Web Vitals
Key takeaways
- Model WebSocket messages with discriminated unions and typed actions/effects for compile-time safety.
- Use NgRx Entity + correlationIds for optimistic updates and idempotent ACK handling.
- Bridge NgRx to Angular Signals via selectSignal; keep effects Observable-based for streams.
- Implement exponential backoff, jitter, and “reset on success” reconnect for stable telemetry.
- Gate WebSockets to browser-only and batch UI updates to avoid dashboard jitter.
Implementation checklist
- Define a typed TelemetryEvent union and action group for connect/message/ack/error.
- Implement a WebSocket service with exponential backoff + reset-on-success.
- Create NgRx Effects for connect, message routing, ACK, and timeout rollback.
- Use EntityAdapter for real-time upserts; track pending commands by correlationId.
- Bridge state to components with store.selectSignal and PrimeNG virtualization.
- Guard SSR by gating WebSocket connects behind isPlatformBrowser.
- Add marble tests for effects and a CI job to run them on every PR.
- Instrument with Angular DevTools, Firebase Analytics/GA4, and OpenTelemetry for errors.
Questions we hear from teams
- How long does it take to stabilize a real-time Angular dashboard?
  Typical rescue engagements run 2–4 weeks: assess streams, add typed actions/effects, implement optimistic ACKs, and wire Signal selectors with virtualization. Larger refactors or multi-tenant isolation add 2–4 weeks.
- Do I need Signals if I’m using NgRx?
  Yes, for components. Keep streams in NgRx Effects and expose state with store.selectSignal. Use SignalStore for local UI flags. This hybrid plays nicely with Angular 20 change detection.
- How much does it cost to hire an Angular developer for this work?
  It depends on scope and team size. I offer fixed-scope audits and weekly retainers. Most teams see ROI quickly by eliminating jitter, duplicate writes, and on-call incidents. Contact me for a tailored estimate.
- Will this work with SSR or Firebase Hosting?
  Yes. Gate WebSockets to isPlatformBrowser and feature-flag connection URLs (Firebase Remote Config). SSR renders deterministically; sockets attach only in the browser.
- How do you test WebSockets and effects?
  Marble tests cover ACK timeouts and reconnection backoff. Cypress e2e stubs the socket server for deterministic flows. CI blocks merges until tests and telemetry smoke checks pass.