Async Iterators with Repeater
Build async iterables using Repeater from @repeaterjs/repeater. Repeater gives you a structured constructor for async iterables — supply a push function to emit values and a stop promise that resolves when the consumer is done.
When to Use
- Bridging callback or event-based APIs to for await...of
- Streaming data with backpressure (database cursors, file watchers, WebSocket messages)
- Combining multiple async streams into one
- Replacing manual Symbol.asyncIterator implementations
The Pattern
One-shot timer
javascript
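function timer(ms) {
  return new Repeater(async (push, stop) => {
    const id = setTimeout(() => {
      push(Date.now()); // emit a single value...
      stop(); // ...then end the stream so consumers do not wait forever
    }, ms);
    await stop;
    clearTimeout(id); // cancel the timer if the consumer leaves early
  });
}
Observable property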
Convert a property with change events into an async iterable:
javascript
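// Assumes EventEmitter (e.g. from Node's "events" module) and an equals() helper are in scope.
class Property extends EventEmitter {
  #value;
  get value() {
    return this.#value;
  }
  set value(newValue) {
    // Emit only on real changes so consumers are not woken needlessly.
    if (!equals(this.#value, newValue)) {
      this.#value = newValue;
      this.emit("change", newValue);
    }
  }
  toRepeater(event = "change") {
    return new Repeater(async (push, stop) => {
      this.on(event, push);
      await stop; // resolves when the consumer stops iterating
      this.off(event, push); // detach so the listener does not leak
    });
  }
}
Connection monitor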
Merge multiple event sources into a single stream:
javascript
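// fromEvent is assumed to be a helper that wraps addEventListener in a Repeater;
// it, api, and connection are application code not defined in this document.
const connectionSignal = Repeater.merge([
  timer(10),
  fromEvent(window, "online"),
  fromEvent(window, "offline"),
]);
for await (const event of connectionSignal) {
  if (navigator.onLine) {
    await api.whoami().catch(() => null);
  } else {
    connection.value = "OFFLINE";
  }
}
Creating a Repeater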
A Repeater takes an executor function with push and stop:
javascript
import { Repeater } from "@repeaterjs/repeater";
const counter = new Repeater(async (push, stop) => {
  let i = 0;
  const interval = setInterval(() => push(++i), 1000);
  await stop;
  clearInterval(interval);
});
- push(value): enqueue a value for the consumer. await push(value) pauses until the consumer is ready (backpressure).
- await stop: resolves when the consumer calls iterator.return(), the loop exits, or the producer calls stop(). Put cleanup after it.
- stop(): call to signal completion from the producer side.
Consuming with for-await-of
javascript
for await (const count of counter) {
  console.log(count);
  if (count >= 5) break; // triggers stop, cleanup runs
}
Breaking out of the loop or calling iterator.return() automatically resolves the stop promise, running any cleanup placed after await stop.
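The cleanup guarantee rests on a language rule rather than anything specific to Repeater: when a for await...of loop exits early, the runtime calls the iterator's return() method. The following is a minimal, library-free sketch of that rule; makeCounter, consumeUntil, and the log array are hypothetical names used only for illustration.

```javascript
// Hypothetical hand-written async iterable that records when return() runs.
function makeCounter(log) {
  let i = 0;
  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    async next() {
      i += 1;
      return { value: i, done: false };
    },
    async return() {
      // for await...of calls this on break, return, or throw inside the loop.
      log.push("cleanup");
      return { value: undefined, done: true };
    },
  };
}

async function consumeUntil(limit, log) {
  for await (const count of makeCounter(log)) {
    log.push(count);
    if (count >= limit) break; // early exit, so return() is called
  }
  return log;
}
```

Repeater connects that return() call to the stop promise, which is why code placed after await stop runs when the consumer breaks out of the loop.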
Backpressure
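await push(value) suspends the producer until the consumer reads the value. This prevents fast producers from overwhelming slow consumers:
javascript
function streamRows(cursor) {
  return new Repeater(async (push, stop) => {
    let stopped = false;
    stop.then(() => (stopped = true)); // consumer left: stop reading the cursor
    while (!stopped) {
      const rows = await cursor.read(10);
      if (rows.length === 0) {
        stop(); // no more rows: finish the stream
        break;
      }
      for (const row of rows) {
        await push(row); // waits for consumer
      }
    }
  });
}
Without await on push, values pile up in Repeater's internal queue faster than the consumer drains them, and the library throws a RepeaterOverflowError once too many pushes are pending. Always await push() unless you explicitly want buffering.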
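To make "await push waits for the consumer" concrete, here is a library-free sketch of the handshake. makeChannel is a hypothetical toy, far simpler than Repeater and without buffering, stop, or error handling; it only illustrates how a push promise can stay pending until the value is read.

```javascript
// Toy channel (hypothetical): push() resolves only once next() has taken the value.
function makeChannel() {
  const pushes = []; // { value, taken } entries waiting for a reader
  const reads = []; // resolve callbacks of readers waiting for a value

  function push(value) {
    return new Promise((taken) => {
      const read = reads.shift();
      if (read) {
        read({ value, done: false }); // a reader was already waiting
        taken();
      } else {
        pushes.push({ value, taken }); // park until someone reads
      }
    });
  }

  function next() {
    const pending = pushes.shift();
    if (pending) {
      pending.taken(); // releases the producer awaiting this push
      return Promise.resolve({ value: pending.value, done: false });
    }
    return new Promise((resolve) => reads.push(resolve));
  }

  return { push, next };
}

async function demo() {
  const channel = makeChannel();
  let settled = false;
  const pushed = channel.push("row").then(() => {
    settled = true;
  });
  // Give the push ample time; with no reader it must stay pending.
  await new Promise((resolve) => setTimeout(resolve, 10));
  const before = settled;
  const { value } = await channel.next();
  await pushed;
  return { before, value, after: settled };
}
```

In this sketch the producer cannot get ahead of the reader by more than the values it has chosen not to await, which is the property the streamRows loop relies on.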
Buffering with SlidingBuffer
When you only care about the latest value (e.g., reactive UI updates), use SlidingBuffer to discard older unconsumed values:
javascript
import {
  Repeater,
  SlidingBuffer,
} from "@repeaterjs/repeater";
// watch is assumed to be a Vue-style watch(source, callback, options) that returns an unwatch function.
function watchLatest(getter) {
  return new Repeater(async (push, stop) => {
    const unwatch = watch(getter, push, {
      immediate: true,
      deep: true,
    });
    await stop;
    unwatch(); // stop watching once the consumer is done
  }, new SlidingBuffer(1));
}
SlidingBuffer(1) keeps only the most recent unconsumed value, dropping anything older. This prevents slow consumers from processing stale data.
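The dropping behaviour can be pictured with a small stand-in. SlidingSketch below is a hypothetical class that mimics sliding-window semantics for illustration; it is not the library's source and may differ from it in detail.

```javascript
// Hypothetical stand-in that mimics sliding-window semantics.
class SlidingSketch {
  #items = [];
  #capacity;

  constructor(capacity) {
    this.#capacity = capacity;
  }

  add(value) {
    this.#items.push(value);
    // Evict from the front so only the newest `capacity` values remain.
    while (this.#items.length > this.#capacity) {
      this.#items.shift();
    }
  }

  remove() {
    return this.#items.shift();
  }

  get empty() {
    return this.#items.length === 0;
  }
}

const buffer = new SlidingSketch(1);
buffer.add("a");
buffer.add("b");
buffer.add("c"); // "a" and "b" were never read and are now gone
const latest = buffer.remove();
```

With a capacity of 1, a consumer that falls behind sees only the newest value when it next reads, which is the behaviour watchLatest depends on.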
Composing streams
Repeater.merge(): interleave values from multiple sources as they arrive:
javascript
const merged = Repeater.merge([
  timer(10),
  fromEvent(window, "online"),
  fromEvent(window, "offline"),
]);
for await (const event of merged) {
  // receives values from any source, in arrival order
}
Repeater.latest(): combine the latest value from each source, emitting whenever any source updates (like RxJS combineLatest):
javascript
const combined = Repeater.latest([
  connectionStatus.toRepeater(),
  visibility.toRepeater(),
]);
for await (const [connection, visible] of combined) {
  // runs whenever either value changes
}
Trade-offs
| Approach | Pros | Cons |
|---|---|---|
| Repeater | Cleanup, backpressure | Requires dependency |
| Manual asyncIterator | No dependency | Verbose, leak-prone |
| RxJS | Rich operators | Large bundle |
| Node.js streams | Built-in, pipes | Complex API |
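To make the "verbose, leak-prone" row concrete, here is a rough sketch of what bridging an event source by hand can look like. eventsOf is a hypothetical helper written for this comparison; it assumes a standard EventTarget (available in browsers and recent Node.js versions) and leaves out error handling and backpressure.

```javascript
// Hypothetical helper: adapt an EventTarget event to an async iterable by hand.
function eventsOf(target, type) {
  const queue = []; // events that arrived before anyone asked for them
  const waiters = []; // next() calls waiting for an event
  let done = false;

  const listener = (event) => {
    const waiter = waiters.shift();
    if (waiter) waiter({ value: event, done: false });
    else queue.push(event); // unbounded: nothing slows the producer down
  };
  target.addEventListener(type, listener);

  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    next() {
      if (done) return Promise.resolve({ value: undefined, done: true });
      if (queue.length > 0) {
        return Promise.resolve({ value: queue.shift(), done: false });
      }
      return new Promise((resolve) => waiters.push(resolve));
    },
    return() {
      // Forgetting any of these steps leaks a listener or hangs a consumer.
      done = true;
      target.removeEventListener(type, listener);
      queue.length = 0;
      for (const waiter of waiters.splice(0)) {
        waiter({ value: undefined, done: true });
      }
      return Promise.resolve({ value: undefined, done: true });
    },
  };
}
```

Every piece of bookkeeping here (the queue, the waiters, the done flag, the listener removal) is something the push/stop executor lets you skip.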
When to use Repeater over alternatives:
- You need clean resource cleanup tied to consumer lifecycle
- You want simple push/stop instead of the next/return/throw protocol
- You are composing a small number of streams without needing a full reactive library