import { Observable } from '../Observable';
import { innerFrom } from '../observable/innerFrom';
import { Subject } from '../Subject';
import { operate } from '../util/lift';
import { createOperatorSubscriber, OperatorSubscriber } from './OperatorSubscriber';

/**
 * Operator that partitions source values into per-key groups and emits one
 * keyed observable per distinct key.
 *
 * @param {Function} keySelector Called with each source value; its return
 *   value is the key used to look the group up in an internal `Map`.
 * @param {Function|Object} [elementOrOptions] Either a function applied to
 *   each source value before it is pushed into its group, or an options
 *   object whose `duration`, `element` and `connector` properties are read
 *   (and take the place of the positional arguments of the same name).
 * @param {Function} [duration] Called with each newly created grouped
 *   observable; its result is converted with `innerFrom` and subscribed to.
 *   When that notifier emits, the group is completed.
 * @param {Function} [connector] Factory for the subject backing each group.
 *   A plain `Subject` is created when it is not supplied.
 * @returns The value produced by `operate` for the subscription logic below.
 */
export function groupBy(keySelector, elementOrOptions, duration, connector) {
    return operate(function (source, subscriber) {
        var element;
        if (elementOrOptions && typeof elementOrOptions !== 'function') {
            // Options-object form: pull every setting from the object,
            // overwriting the positional arguments.
            duration = elementOrOptions.duration;
            element = elementOrOptions.element;
            connector = elementOrOptions.connector;
        }
        else {
            // Absent, or the element selector itself.
            element = elementOrOptions;
        }

        // key -> subject backing the group for that key.
        var groups = new Map();
        // Number of live subscriptions to grouped observables.
        var activeGroups = 0;
        // Set once the source subscriber has been asked whether it may tear down.
        var teardownAttempted = false;

        // Applies `cb` to every group subject and then to the downstream subscriber.
        function notify(cb) {
            groups.forEach(cb);
            cb(subscriber);
        }

        // Sends an error to every group as well as to the downstream subscriber.
        function handleError(err) {
            notify(function (consumer) {
                return consumer.error(err);
            });
        }

        // Sends completion to every group as well as to the downstream subscriber.
        function handleComplete() {
            notify(function (consumer) {
                return consumer.complete();
            });
        }

        // Creates and registers the subject for `key`, emits its grouped
        // observable downstream and, if requested, starts its duration notifier.
        function openGroup(key) {
            var group = connector ? connector() : new Subject();
            groups.set(key, group);

            var grouped = createGroupedObservable(key, group);
            subscriber.next(grouped);

            if (duration) {
                var durationSubscriber = createOperatorSubscriber(group, function () {
                    // The notifier emitted: finish the group and stop listening.
                    group.complete();
                    if (durationSubscriber !== null && durationSubscriber !== undefined) {
                        durationSubscriber.unsubscribe();
                    }
                }, undefined, undefined, function () {
                    // Finalization of the duration subscriber forgets the group.
                    return groups.delete(key);
                });
                groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber));
            }
            return group;
        }

        // Routes one source value to its group, creating the group on first use.
        // Anything thrown along the way is broadcast through `handleError` so
        // the groups see it too, not only the downstream subscriber.
        function handleNext(value) {
            try {
                var key = keySelector(value);
                var group = groups.get(key);
                if (!group) {
                    group = openGroup(key);
                }
                group.next(element ? element(value) : value);
            }
            catch (err) {
                handleError(err);
            }
        }

        // Drops every key and subject held in the map.
        function releaseGroups() {
            return groups.clear();
        }

        // Records that teardown was requested and answers whether no grouped
        // observable is currently subscribed.
        function canTearDown() {
            teardownAttempted = true;
            return activeGroups === 0;
        }

        var groupBySourceSubscriber = new OperatorSubscriber(subscriber, handleNext, handleComplete, handleError, releaseGroups, canTearDown);
        source.subscribe(groupBySourceSubscriber);

        // Wraps `groupSubject` in an observable that counts its subscribers.
        // When the last one leaves after teardown was already attempted, the
        // source subscriber is unsubscribed from here.
        function createGroupedObservable(key, groupSubject) {
            var result = new Observable(function (groupSubscriber) {
                activeGroups += 1;
                var innerSub = groupSubject.subscribe(groupSubscriber);
                return function () {
                    innerSub.unsubscribe();
                    activeGroups -= 1;
                    if (activeGroups === 0 && teardownAttempted) {
                        groupBySourceSubscriber.unsubscribe();
                    }
                };
            });
            result.key = key;
            return result;
        }
    });
}
//# sourceMappingURL=groupBy.js.map