import { Subject } from '../Subject'; import { asyncScheduler } from '../scheduler/async'; import { Subscription } from '../Subscription'; import { operate } from '../util/lift'; import { createOperatorSubscriber } from './OperatorSubscriber'; import { arrRemove } from '../util/arrRemove'; import { popScheduler } from '../util/args'; import { executeSchedule } from '../util/executeSchedule'; export function windowTime(windowTimeSpan, ...otherArgs) { var _a, _b; const scheduler = (_a = popScheduler(otherArgs)) !== null && _a !== void 0 ? _a : asyncScheduler; const windowCreationInterval = (_b = otherArgs[0]) !== null && _b !== void 0 ? _b : null; const maxWindowSize = otherArgs[1] || Infinity; return operate((source, subscriber) => { let windowRecords = []; let restartOnClose = false; const closeWindow = (record) => { const { window, subs } = record; window.complete(); subs.unsubscribe(); arrRemove(windowRecords, record); restartOnClose && startWindow(); }; const startWindow = () => { if (windowRecords) { const subs = new Subscription(); subscriber.add(subs); const window = new Subject(); const record = { window, subs, seen: 0, }; windowRecords.push(record); subscriber.next(window.asObservable()); executeSchedule(subs, scheduler, () => closeWindow(record), windowTimeSpan); } }; if (windowCreationInterval !== null && windowCreationInterval >= 0) { executeSchedule(subscriber, scheduler, startWindow, windowCreationInterval, true); } else { restartOnClose = true; } startWindow(); const loop = (cb) => windowRecords.slice().forEach(cb); const terminate = (cb) => { loop(({ window }) => cb(window)); cb(subscriber); subscriber.unsubscribe(); }; source.subscribe(createOperatorSubscriber(subscriber, (value) => { loop((record) => { record.window.next(value); maxWindowSize <= ++record.seen && closeWindow(record); }); }, () => terminate((consumer) => consumer.complete()), (err) => terminate((consumer) => consumer.error(err)))); return () => { windowRecords = null; }; }); } //# sourceMappingURL=windowTime.js.map