Multicasting Async Iterables

Share values from a single async iterable source across multiple consumers. Async iterables are single-consumer by default — once a value is read, it's gone. Multicasting fans out each value to every active subscriber.
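
To see why sharing needs an explicit fan-out, the following minimal sketch runs two for await loops over the same async generator. The names numbers, collect, and demo are illustrative only and do not come from any library. Each value reaches only one of the two loops, so the consumers split the stream instead of each seeing all of it.

```javascript
// Illustrative source: an async generator that yields four values.
async function* numbers() {
  yield 1;
  yield 2;
  yield 3;
  yield 4;
}

// Collects everything one consumer manages to read from the iterable.
async function collect(iterable) {
  const seen = [];
  for await (const value of iterable) {
    seen.push(value);
  }
  return seen;
}

// Two consumers reading from the SAME generator object.
async function demo() {
  const source = numbers();
  const [a, b] = await Promise.all([collect(source), collect(source)]);
  return { a, b };
}

demo().then(({ a, b }) => {
  // Between them the consumers see every value once, but neither sees all four.
  console.log("consumer A saw", a);
  console.log("consumer B saw", b);
});
```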

When to Use

  • Broadcasting events to multiple independent handlers
  • Sharing a single WebSocket or SSE connection with multiple consumers
  • Implementing pub/sub patterns with async iterables
  • Avoiding duplicate work when multiple consumers need the same stream

The Pattern

Shared WebSocket connection

javascript
const messages = fromEvent(ws, "message");
const shared = multicast(messages);

// Chat display
for await (const msg of shared.subscribe()) {
  appendMessage(msg.data);
}
javascript
// Notification handler (separate consumer, same source)
for await (const msg of shared.subscribe()) {
  if (isNotification(msg.data)) {
    showNotification(msg.data);
  }
}

Shared server-sent events with replay

javascript
const events = fromEvent(eventSource, "message");
const shared = multicastReplay(events, 5);

// Late subscriber gets the last 5 events immediately,
// then receives new events as they arrive
for await (const event of shared.subscribe()) {
  processEvent(event);
}
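
The examples above rely on a fromEvent helper that this page does not define. One possible shape is sketched below, assuming the target implements the standard EventTarget interface (addEventListener and removeEventListener) and that queueing unread events without a limit is acceptable. It is a hypothetical helper written for illustration, not the API of a particular library.

```javascript
// Hypothetical helper: wraps events of one type from an EventTarget
// in an async iterable.
function fromEvent(target, type) {
  return {
    [Symbol.asyncIterator]() {
      const queue = [];   // events received while no next() call was pending
      const waiting = []; // resolve functions of pending next() calls

      const listener = (event) => {
        const resolve = waiting.shift();
        if (resolve) {
          resolve({ value: event, done: false });
        } else {
          queue.push(event);
        }
      };
      target.addEventListener(type, listener);

      return {
        next() {
          if (queue.length > 0) {
            return Promise.resolve({ value: queue.shift(), done: false });
          }
          return new Promise((resolve) => waiting.push(resolve));
        },
        // Called automatically when a for await loop exits early.
        return() {
          target.removeEventListener(type, listener);
          for (const resolve of waiting.splice(0)) {
            resolve({ value: undefined, done: true });
          }
          return Promise.resolve({ value: undefined, done: true });
        },
      };
    },
  };
}
```

Breaking out of a for await loop calls return(), which removes the listener, so an abandoned iterator does not keep receiving events.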

Multicast wrapper using Repeater

Create a multicast() function that maintains a set of subscriber push functions and fans out each source value:

javascript
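import { Repeater } from "@repeaterjs/repeater";

function multicast(source) {
  const subscribers = new Set(); // push functions of active subscribers
  const stops = new Set();       // matching stop functions, used when the source ends
  let started = false;

  function start() {
    if (started) return;
    started = true;

    (async () => {
      try {
        for await (const value of source) {
          for (const push of subscribers) {
            push(value);
          }
        }
        // Source finished: end every subscriber's loop.
        for (const stop of stops) stop();
      } catch (err) {
        // Source failed: rethrow the error in every subscriber.
        for (const stop of stops) stop(err);
      }
    })();
  }

  return {
    subscribe() {
      return new Repeater(async (push, stop) => {
        subscribers.add(push);
        stops.add(stop);
        start();
        // Resolves when this subscriber stops iterating or the source ends.
        await stop;
        subscribers.delete(push);
        stops.delete(stop);
      });
    },
  };
}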

How it works:

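  1. Single source consumption: the source async iterable is iterated once, regardless of how many subscribers exist.
  2. Lazy start: iteration of the source begins when the first subscriber starts reading. A Repeater runs its executor on the first next() call, so calling subscribe() alone does not start the source.
  3. Fan-out: each value from the source is pushed to every active subscriber's Repeater.
  4. Cleanup: when a subscriber breaks out of its for await loop, its stop promise resolves and its push function is removed from the set. The source keeps running, and values that arrive while there are no subscribers are dropped.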

Multiple consumers

javascript
const source = fromEvent(socket, "message");
const shared = multicast(source);

// Consumer 1
(async () => {
  for await (const msg of shared.subscribe()) {
    updateUI(msg);
  }
})();

// Consumer 2
(async () => {
  for await (const msg of shared.subscribe()) {
    logToAnalytics(msg);
  }
})();

With late subscribers and replay

To let late subscribers receive the most recent values immediately, add a replay buffer that keeps the last bufferSize values:

javascript
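function multicastReplay(source, bufferSize = 1) {
  const subscribers = new Set(); // push functions of active subscribers
  const stops = new Set();       // matching stop functions
  const buffer = [];             // most recent values, oldest first
  let started = false;

  function start() {
    if (started) return;
    started = true;

    (async () => {
      try {
        for await (const value of source) {
          buffer.push(value);
          if (buffer.length > bufferSize) {
            buffer.shift();
          }
          for (const push of subscribers) {
            push(value);
          }
        }
        // Source finished: end every subscriber's loop.
        for (const stop of stops) stop();
      } catch (err) {
        // Source failed: rethrow the error in every subscriber.
        for (const stop of stops) stop(err);
      }
    })();
  }

  return {
    subscribe() {
      return new Repeater(async (push, stop) => {
        // Replay buffered values first, then join the live stream.
        for (const value of buffer) {
          push(value);
        }
        subscribers.add(push);
        stops.add(stop);
        start();
        await stop;
        subscribers.delete(push);
        stops.delete(stop);
      });
    },
  };
}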

Key differences from basic multicast:

  • Replay buffer: a new subscriber immediately receives up to the last bufferSize values, then continues with live values.
  • buffer.shift(): caps the buffer at bufferSize entries so memory use stays bounded.

Trade-offs

Approach                 | Pros                            | Cons
-------------------------|---------------------------------|---------------------------
multicast + Repeater     | Simple, no deps beyond Repeater | No replay, no backpressure
multicastReplay          | Late subscriber support         | Unbounded if not capped
RxJS share / shareReplay | Battle-tested, rich operators   | Large bundle
Manual listener registry | No dependency                   | Verbose, leak-prone

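Backpressure note: push() is called without await, so the source is never slowed down to match a subscriber. If one consumer is slow, its values queue up inside its own Repeater, and Repeater throws an overflow error once too many pushes are pending. For most multicast use cases the lack of backpressure is acceptable, because one slow consumer should rarely hold back the others. If a consumer only needs recent values, pass new SlidingBuffer(1) as the second argument to its Repeater so that stale values are dropped instead of queued.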
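
To make "drop stale values" concrete, here is a dependency-free sketch of sliding-buffer behaviour. The class name LatestBuffer is hypothetical. It is assumed to mirror the empty / full / add / remove shape of Repeater's buffer classes and is meant as an illustration of the semantics, not a replacement for SlidingBuffer.

```javascript
// Hypothetical stand-in that illustrates sliding-buffer semantics:
// it never reports itself as full and evicts the oldest value on overflow.
class LatestBuffer {
  constructor(capacity) {
    this.capacity = capacity;
    this.values = [];
  }

  get empty() {
    return this.values.length === 0;
  }

  get full() {
    return false; // a sliding buffer always accepts new values
  }

  add(value) {
    this.values.push(value);
    while (this.values.length > this.capacity) {
      this.values.shift(); // drop the stalest value
    }
  }

  remove() {
    if (this.empty) throw new Error("buffer is empty");
    return this.values.shift();
  }
}

// A fast producer adds three values before the slow consumer reads once.
const buffer = new LatestBuffer(1);
buffer.add("tick 1");
buffer.add("tick 2");
buffer.add("tick 3");

const latest = buffer.remove();
console.log(latest); // "tick 3": the two older ticks were dropped
```

Inside subscribe(), the real SlidingBuffer imported from @repeaterjs/repeater would take the place of LatestBuffer.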