Real‑Time NgRx for Angular 20+: WebSocket Streams, Optimistic Updates, and Typed Actions for Telemetry Dashboards

Real‑Time NgRx for Angular 20+: WebSocket Streams, Optimistic Updates, and Typed Actions for Telemetry Dashboards

From Charter ad analytics to United kiosks, here’s the NgRx pattern I ship for real‑time streams without jitter, dropped frames, or double‑writes.

If your dashboard jitters, your state graph is lying to you.
Back to all posts

I’ve shipped real-time dashboards where a jittery chart can cost money or trust. At a leading telecom provider, we pushed ad-delivery events at volume; at a major airline, kiosks needed offline-tolerant flows with hardware in the loop. The pattern below is how I stabilize NgRx for WebSocket telemetry in Angular 20+ without blowing up UX or SSR.

As companies plan 2025 Angular roadmaps (Angular 21 beta on deck), teams ask me to rescue WebSocket-driven dashboards that leak subscriptions, drop events, or double-write state. If you need an Angular expert to harden your stream pipeline, this is the playbook I run.

Why Real-Time Angular Needs Typed NgRx in 2025

Where real-time breaks first

I see three pain points when I’m brought in as an Angular consultant: streams without backpressure, optimistic updates without ACK correlation, and selectors that force full-list re-renders. Add multi-tenant filtering and you’ve got duplicate data and jittering charts.

  • Unbounded streams flood change detection

  • ACK handling isn’t idempotent

  • Selectors thrash lists/charts

NgRx + Signals is the sweet spot

Keep your stream logic in NgRx Effects (Observables are perfect here) and expose state to components with store.selectSignal. Use SignalStore locally for view-only flags (panel open, filters) while NgRx owns cross-app telemetry.

  • Effects handle streams

  • Signals power views

Architecture: WebSocket Effects, Entity Upserts, ACK Correlation, and Signals

// telemetry.models.ts
export type TelemetryEvent =
  | { type: 'metric'; id: string; ts: number; name: string; value: number; tenantId: string }
  | { type: 'status'; id: string; ts: number; state: 'online' | 'offline' | 'error'; tenantId: string }
  | { type: 'ack'; correlationId: string; ok: boolean; error?: string };

export interface Command {
  kind: 'restart' | 'calibrate' | 'throttle';
  targetId: string;
}

Data flow at a glance

  • Inbound telemetry is parsed and routed by Effects into typed NgRx actions. EntityAdapter performs upsertOne to keep lists stable.

  • Outbound commands are optimistic: we write intent to state, send to the socket with a correlationId, and confirm/rollback on ACK.

  • Components consume with selectSignal and PrimeNG virtualization to avoid reflow storms.

  • WebSocket → Effect → typed action → reducer (Entity upsert) → selectSignal

Typed event schema

Use discriminated unions for message types. Your effects become a simple switch on event.type and TypeScript enforces exhaustiveness.

WebSocket Service with Exponential Backoff (Reset on Success)

// telemetry.socket.ts
import { Injectable, inject, PLATFORM_ID } from '@angular/core';
import { isPlatformBrowser } from '@angular/common';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable, EMPTY, timer } from 'rxjs';
import { shareReplay, retry, tap } from 'rxjs/operators';
import { TelemetryEvent } from './telemetry.models';

@Injectable({ providedIn: 'root' })
export class TelemetrySocket {
  private socket$?: WebSocketSubject<TelemetryEvent>;
  private platformId = inject(PLATFORM_ID);

  connect(url: string): Observable<TelemetryEvent> {
    if (!isPlatformBrowser(this.platformId)) return EMPTY;

    this.socket$ = webSocket<TelemetryEvent>({
      url,
      deserializer: e => JSON.parse(e.data),
    });

    return this.socket$.pipe(
      // retry with exponential backoff; resets after any success
      retry({
        delay: (err, retryCount) => timer(Math.min(500 * 2 ** (retryCount - 1), 10_000)),
        resetOnSuccess: true,
      }),
      shareReplay({ bufferSize: 1, refCount: true })
    );
  }

  send(msg: unknown) { this.socket$?.next(msg as any); }
  close() { this.socket$?.complete(); }
}

Stable reconnects beat infinite retries

At a broadcast media network, a naive retry caused connection storms during a broker restart. Backoff with reset-on-success fixed it.

  • Backoff 0.5s → 10s

  • Reset on any successful message

Browser-only guards

Guard connects with isPlatformBrowser and feature flags (Firebase Remote Config) so SSR passes don’t attempt sockets.

  • Avoid SSR connects

  • Feature-flag URL

Typed Actions and Effects for Telemetry (NgRx 17+)

// telemetry.actions.ts
import { createActionGroup, props, emptyProps } from '@ngrx/store';
import { TelemetryEvent, Command } from './telemetry.models';

export const TelemetryActions = createActionGroup({
  source: 'Telemetry',
  events: {
    'Connect': props<{ url: string }>(),
    'Connected': emptyProps(),
    'Disconnect': emptyProps(),
    'Message Received': props<{ event: TelemetryEvent }>(),
    'Send Command': props<{ command: Command; correlationId: string }>(),
    'Ack Received': props<{ correlationId: string; ok: boolean; error?: string }>(),
    'Connection Error': props<{ error: unknown }>(),
  },
});

// telemetry.effects.ts
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { Injectable, inject } from '@angular/core';
import { map, switchMap, takeUntil, filter, tap, mergeMap, timer, take } from 'rxjs';
import { TelemetryActions } from './telemetry.actions';
import { TelemetrySocket } from './telemetry.socket';

@Injectable()
export class TelemetryEffects {
  private actions$ = inject(Actions);
  private socket = inject(TelemetrySocket);

  connect$ = createEffect(() =>
    this.actions$.pipe(
      ofType(TelemetryActions.connect),
      switchMap(({ url }) =>
        this.socket.connect(url).pipe(
          tap(() => {/* connection is alive */}),
          map(event => TelemetryActions.messageReceived({ event })),
          takeUntil(this.actions$.pipe(ofType(TelemetryActions.disconnect)))
        )
      )
    )
  );

  routeInbound$ = createEffect(() =>
    this.actions$.pipe(
      ofType(TelemetryActions.messageReceived),
      map(({ event }) => {
        if (event.type === 'ack') {
          return TelemetryActions.ackReceived({ correlationId: event.correlationId, ok: event.ok, error: event.error });
        }
        // fall-through: handle metric/status via reducer upserts
        return { type: '[Telemetry] Upsert From Event', event } as const;
      })
    )
  );

  sendCommand$ = createEffect(() =>
    this.actions$.pipe(
      ofType(TelemetryActions.sendCommand),
      tap(({ command, correlationId }) => this.socket.send({ ...command, correlationId })),
      mergeMap(({ correlationId }) =>
        // timeout in 5s if no ACK; rollback path
        timer(5000).pipe(
          takeUntil(
            this.actions$.pipe(
              ofType(TelemetryActions.ackReceived),
              filter(a => a.correlationId === correlationId),
              take(1)
            )
          ),
          map(() => TelemetryActions.ackReceived({ correlationId, ok: false, error: 'timeout' }))
        )
      )
    )
  );
}

Action groups keep intent clear

Create an action group for connection lifecycle, inbound messages, optimistic commands, and ACKs.

Effect routing by event.type

Use a single connection effect to map inbound JSON to strongly-typed actions; branch by discriminant.

Reducers: Entity Upserts and Optimistic Pending Map

// telemetry.reducer.ts
import { createReducer, on } from '@ngrx/store';
import { createEntityAdapter, EntityState } from '@ngrx/entity';
import { TelemetryActions } from './telemetry.actions';
import { TelemetryEvent, Command } from './telemetry.models';

export interface DeviceMetric { id: string; name: string; value: number; tenantId: string; ts: number; }
export interface PendingCommand { command: Command; ts: number }

export interface TelemetryState extends EntityState<DeviceMetric> {
  pending: Record<string, PendingCommand>;
}

export const adapter = createEntityAdapter<DeviceMetric>({ selectId: m => m.id });
const initialState: TelemetryState = adapter.getInitialState({ pending: {} });

function eventToMetric(e: TelemetryEvent): DeviceMetric | null {
  if (e.type !== 'metric') return null;
  return { id: e.id, name: e.name, value: e.value, tenantId: e.tenantId, ts: e.ts };
}

export const telemetryReducer = createReducer(
  initialState,
  on({ type: '[Telemetry] Upsert From Event' } as any, (state, { event }) => {
    const metric = eventToMetric(event as TelemetryEvent);
    return metric ? adapter.upsertOne(metric, state) : state;
  }),
  on(TelemetryActions.sendCommand, (state, { correlationId, command }) => ({
    ...state,
    pending: { ...state.pending, [correlationId]: { command, ts: Date.now() } },
  })),
  on(TelemetryActions.ackReceived, (state, { correlationId, ok }) => {
    const { [correlationId]: _drop, ...pending } = state.pending;
    // Optionally apply server-confirmed state changes when ok===true
    return { ...state, pending };
  })
);

export const { selectAll: selectAllMetrics } = adapter.getSelectors();

EntityAdapter keeps lists stable

Use EntityAdapter to upsert incoming metrics; reference stability removes table/chart jitter.

  • No array churn

  • O(1) upserts

Pending map by correlationId

Store pending commands keyed by correlationId; drop them on ACK. If a duplicate ACK arrives, it’s a no-op.

  • Rollback on NACK/timeout

  • Idempotent ACKs

Bridging NgRx to Signals and PrimeNG Virtual Tables

// telemetry.selectors.ts
import { createSelector } from '@ngrx/store';
import { selectAllMetrics } from './telemetry.reducer';

export const selectMetricsByTenant = (tenantId: string) =>
  createSelector(selectAllMetrics, list => list.filter(m => m.tenantId === tenantId));
// telemetry.component.ts
import { Component, computed, inject, signal } from '@angular/core';
import { Store } from '@ngrx/store';
import { selectMetricsByTenant } from './telemetry.selectors';

@Component({ selector: 'telemetry-table', templateUrl: './telemetry.component.html' })
export class TelemetryComponent {
  private store = inject(Store);
  tenantId = signal('acme');

  metrics = this.store.selectSignal(selectMetricsByTenant(this.tenantId()));
  count = computed(() => this.metrics().length);
}
<!-- telemetry.component.html -->
<p-table
  [value]="metrics()"
  [virtualScroll]="true"
  [rows]="50"
  [scrollHeight]="'60vh'"
  [trackBy]="(i, row) => row.id"
  styleClass="p-datatable-sm"
>
  <ng-template pTemplate="header">
    <tr><th>Device</th><th>Metric</th><th>Value</th><th>Time</th></tr>
  </ng-template>
  <ng-template pTemplate="body" let-row>
    <tr>
      <td>{{ row.id }}</td>
      <td>{{ row.name }}</td>
      <td>{{ row.value | number:'1.0-2' }}</td>
      <td>{{ row.ts | date:'shortTime' }}</td>
    </tr>
  </ng-template>
</p-table>

Signal selectors for components

Expose derived state with store.selectSignal; compute lightweight counts with computed to keep CD minimal.

PrimeNG data virtualization

Virtual scrolling + trackBy eliminates full-list re-renders during bursts.

  • Prevent reflow

  • TrackBy id

Multi‑Tenant Guards and Offline‑Tolerant UX

Tenant isolation

At a global entertainment company and Charter, we isolate selectors by tenantId and load slices on demand with feature keys.

  • Per-tenant selectors

  • No cross-tenant leaks

Kiosk-friendly flows

for a major airline’s airport kiosks, we queued commands locally when the broker flapped, replayed on reconnect, and simulated peripherals (card readers, printers, scanners) via Docker to test flows without hardware.

  • Outbox queue

  • Hardware simulation

Testing and CI Guardrails for Streams

# .github/workflows/ci.yml
name: ci
on: [pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: pnpm/action-setup@v3
        with: { version: 9 }
      - run: pnpm install --frozen-lockfile
      - run: pnpm nx run-many -t lint,test --configuration=ci
      - run: pnpm nx e2e dashboard-e2e --configuration=ci

Marble tests for effects

Use jasmine-marbles to assert timeout rollbacks and reconnection delays. Deterministic tests catch flapping behavior before prod.

  • ACK timeout path

  • Reconnect logic

CI that fails fast

In Nx workspaces, block merges on effect tests and a WebSocket smoke e2e. Capture metrics with Firebase Analytics/GA4 and Sentry/OpenTelemetry.

  • Unit + e2e on PR

  • Telemetry smoke

When to Hire an Angular Developer for Real‑Time Dashboards

Bring in help if you see

I typically stabilize stream-heavy apps in 2–4 weeks: typed actions/effects, optimistic ACK correlation, Signal-based selectors, and CI guardrails. If you need a remote Angular contractor with a global entertainment company/United/Charter experience, I’m available for targeted engagements.

  • Charts jitter >1/sec

  • Duplicate or out‑of‑order events

  • ACKs never clearing pending

  • Memory growth over time

Example Instrumentation and Performance Tuning

Batch UI updates

For high-frequency metrics (telematics), batch chart updates inside requestAnimationFrame; update Entity on every message, but render charts at 16ms. D3/Highcharts handle windowed series efficiently.

  • rAF throttling

  • Windowed charts

Measure what matters

Track frame stability and memory; instrument event throughput and ACK latency. In IntegrityLens (12k+ interviews processed), we keep 99.98% uptime by watching throughput + error budgets.

  • Angular DevTools flame charts

  • Core Web Vitals

Related Resources

Key takeaways

  • Model WebSocket messages with discriminated unions and typed actions/effects for compile-time safety.
  • Use NgRx Entity + correlationIds for optimistic updates and idempotent ACK handling.
  • Bridge NgRx to Angular Signals via selectSignal; keep effects Observable-based for streams.
  • Implement exponential backoff, jitter, and “reset on success” reconnect for stable telemetry.
  • Gate WebSockets to browser-only and batch UI updates to avoid dashboard jitter.

Implementation checklist

  • Define a typed TelemetryEvent union and action group for connect/message/ack/error.
  • Implement a WebSocket service with exponential backoff + reset-on-success.
  • Create NgRx Effects for connect, message routing, ACK, and timeout rollback.
  • Use EntityAdapter for real-time upserts; track pending commands by correlationId.
  • Bridge state to components with store.selectSignal and PrimeNG virtualization.
  • Guard SSR by gating WebSocket connects behind isPlatformBrowser.
  • Add marble tests for effects and a CI job to run them on every PR.
  • Instrument with Angular DevTools, Firebase Analytics/GA4, and OpenTelemetry for errors.

Questions we hear from teams

How long does it take to stabilize a real-time Angular dashboard?
Typical rescue engagements run 2–4 weeks: assess streams, add typed actions/effects, implement optimistic ACKs, and wire Signal selectors with virtualization. Larger refactors or multi-tenant isolation add 2–4 weeks.
Do I need Signals if I’m using NgRx?
Yes—for components. Keep streams in NgRx Effects and expose state with store.selectSignal. Use SignalStore for local UI flags. This hybrid plays nicely with Angular 20 change detection.
How much does it cost to hire an Angular developer for this work?
It depends on scope and team size. I offer fixed-scope audits and weekly retainers. Most teams see ROI quickly by eliminating jitter, duplicate writes, and on-call incidents. Contact me for a tailored estimate.
Will this work with SSR or Firebase Hosting?
Yes. Gate WebSockets to isPlatformBrowser and feature-flag connection URLs (Firebase Remote Config). SSR renders deterministically; sockets attach only in the browser.
How do you test WebSockets and effects?
Marble tests cover ACK timeouts and reconnection backoff. Cypress e2e stubs the socket server for deterministic flows. CI blocks merges until tests and telemetry smoke checks pass.

Ready to level up your Angular experience?

Let AngularUX review your Signals roadmap, design system, or SSR deployment plan.

Hire Matthew — Remote Angular Expert, Available Now Rescue a Chaotic Codebase with gitPlumbers

NG Wave

Angular Component Library

A comprehensive collection of 110+ animated, interactive, and customizable Angular components. Converted from React Bits with full feature parity, built with Angular Signals, GSAP animations, and Three.js for stunning visual effects.

Explore Components
NG Wave Component Library

Related resources