Appearance
Multicasting Async Iterables
Share values from a single async iterable source across multiple consumers. Async iterables are single-consumer by default — once a value is read, it's gone. Multicasting fans out each value to every active subscriber.
When to Use
- Broadcasting events to multiple independent handlers
- Sharing a single WebSocket or SSE connection with multiple consumers
- Implementing pub/sub patterns with async iterables
- Avoiding duplicate work when multiple consumers need the same stream
The Pattern
Shared WebSocket connection
javascript
const messages = fromEvent(ws, "message");
const shared = multicast(messages);
// Chat display
for await (const msg of shared.subscribe()) {
appendMessage(msg.data);
}javascript
// Notification handler (separate consumer, same source)
for await (const msg of shared.subscribe()) {
if (isNotification(msg.data)) {
showNotification(msg.data);
}
}Shared server-sent events with replay
javascript
const events = fromEvent(eventSource, "message");
const shared = multicastReplay(events, 5);
// Late subscriber gets the last 5 events immediately,
// then receives new events as they arrive
for await (const event of shared.subscribe()) {
processEvent(event);
}Multicast wrapper using Repeater
Create a multicast() function that maintains a set of subscriber push functions and fans out each source value:
javascript
import { Repeater } from "@repeaterjs/repeater";
function multicast(source) {
const subscribers = new Set();
let started = false;
function start() {
if (started) return;
started = true;
(async () => {
for await (const value of source) {
for (const push of subscribers) {
push(value);
}
}
})();
}
return {
subscribe() {
return new Repeater(async (push, stop) => {
subscribers.add(push);
start();
await stop;
subscribers.delete(push);
});
},
};
}How it works:
- Single source consumption — the source async iterable is iterated once, regardless of how many subscribers exist.
- Lazy start — iteration begins on the first
subscribe()call. - Fan-out — each value from the source is pushed to every active subscriber's
Repeater. - Cleanup — when a subscriber breaks out of its
for awaitloop, itspushfunction is removed from the set.
Multiple consumers
javascript
const source = fromEvent(socket, "message");
const shared = multicast(source);
// Consumer 1
(async () => {
for await (const msg of shared.subscribe()) {
updateUI(msg);
}
})();
// Consumer 2
(async () => {
for await (const msg of shared.subscribe()) {
logToAnalytics(msg);
}
})();With late subscribers and replay
To let late subscribers receive the most recent value immediately, add a replay buffer:
javascript
function multicastReplay(source, bufferSize = 1) {
const subscribers = new Set();
const buffer = [];
let started = false;
function start() {
if (started) return;
started = true;
(async () => {
for await (const value of source) {
buffer.push(value);
if (buffer.length > bufferSize) {
buffer.shift();
}
for (const push of subscribers) {
push(value);
}
}
})();
}
return {
subscribe() {
return new Repeater(async (push, stop) => {
for (const value of buffer) {
push(value);
}
subscribers.add(push);
start();
await stop;
subscribers.delete(push);
});
},
};
}Key differences from basic multicast:
- Replay buffer — new subscribers immediately receive the last
bufferSizevalues. buffer.shift()— keeps the buffer bounded to prevent memory leaks.
Trade-offs
| Approach | Pros | Cons |
|---|---|---|
multicast + Repeater | Simple, no deps beyond Repeater | No replay, no backpressure |
multicastReplay | Late subscriber support | Unbounded if not capped |
RxJS share / shareReplay | Battle-tested, rich operators | Large bundle |
| Manual listener registry | No dependency | Verbose, leak-prone |
Backpressure note: push() without await means values are not backpressured per subscriber. If one consumer is slow, it accumulates a queue. For most multicast use cases this is acceptable — backpressure across independent consumers is rarely desirable. If needed, use SlidingBuffer(1) on each subscriber's Repeater to drop stale values for slow consumers.