/**
 * Shallow-merge helper: uses the native `Object.assign` when available,
 * otherwise copies own enumerable string-keyed properties of every source
 * argument onto `target` (left to right) and returns `target`.
 */
var _extends = Object.assign || function (target) {
  for (var i = 1; i < arguments.length; i++) {
    var source = arguments[i];
    for (var key in source) {
      if (Object.prototype.hasOwnProperty.call(source, key)) {
        target[key] = source[key];
      }
    }
  }
  return target;
};

import { is, check, remove, MATCH, internalErr, SAGA_ACTION } from './utils';
import { buffers } from './buffers';
import { asap } from './scheduler';

var CHANNEL_END_TYPE = '@@redux-saga/CHANNEL_END';

/** Sentinel message delivered to takers when a channel is closed. */
export var END = { type: CHANNEL_END_TYPE };

/**
 * Tells whether `a` is an END message.
 * Returns `a` itself when it is falsy, otherwise a boolean.
 */
export var isEnd = function isEnd(a) {
  return a && a.type === CHANNEL_END_TYPE;
};

/**
 * Creates a minimal publish/subscribe hub.
 *
 * @returns {{subscribe: Function, emit: Function}}
 *   `subscribe(sub)` registers a listener and returns a function that removes
 *   it again; `emit(item)` calls every currently registered listener with
 *   `item`.
 */
export function emitter() {
  var subscribers = [];

  function subscribe(sub) {
    subscribers.push(sub);
    return function () {
      return remove(subscribers, sub);
    };
  }

  function emit(item) {
    // Iterate over a snapshot so listeners that subscribe/unsubscribe while
    // being notified do not disturb the current dispatch.
    var arr = subscribers.slice();
    for (var i = 0, len = arr.length; i < len; i++) {
      arr[i](item);
    }
  }

  return {
    subscribe: subscribe,
    emit: emit
  };
}

export var INVALID_BUFFER = 'invalid buffer passed to channel factory function';
export var UNDEFINED_INPUT_ERROR = 'Saga was provided with an undefined action';

// Development builds get extra troubleshooting hints appended to the message.
if (process.env.NODE_ENV !== 'production') {
  UNDEFINED_INPUT_ERROR += '\nHints:\n - check that your Action Creator returns a non-undefined value\n - if the Saga was started using runSaga, check that your subscribe source provides the action to its listeners\n ';
}

/**
 * Creates a channel: a queue connecting producers (`put`) with consumers
 * (`take`), backed by a buffer for messages that arrive while nobody waits.
 *
 * @param {Object} [buffer=buffers.fixed()] Buffer used to hold pending
 *   messages. It is validated with `check(buffer, is.buffer, INVALID_BUFFER)`
 *   (presumably throwing on failure - `check` is defined in ./utils).
 * @returns {{take: Function, put: Function, flush: Function, close: Function}}
 *   The channel API, plus the read-only `__takers__` and `__closed__`
 *   accessors exposing internal state.
 */
export function channel() {
  var buffer = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : buffers.fixed();

  var closed = false;
  var takers = [];

  check(buffer, is.buffer, INVALID_BUFFER);

  // Sanity check for internal invariants; throws when one is violated.
  function checkForbiddenStates() {
    if (closed && takers.length) {
      throw internalErr('Cannot have a closed channel with pending takers');
    }
    if (takers.length && !buffer.isEmpty()) {
      throw internalErr('Cannot have pending takers with non empty buffer');
    }
  }

  /**
   * Offers `input` to the channel.
   * - closed channel: the input is ignored;
   * - no pending taker: the input is handed to the buffer;
   * - otherwise: the first taker without a MATCH predicate, or whose MATCH
   *   predicate accepts `input`, is removed and invoked with it. When no
   *   taker accepts the input, nothing else happens.
   */
  function put(input) {
    checkForbiddenStates();
    check(input, is.notUndef, UNDEFINED_INPUT_ERROR);
    if (closed) {
      return;
    }
    if (!takers.length) {
      return buffer.put(input);
    }
    for (var i = 0; i < takers.length; i++) {
      var cb = takers[i];
      if (!cb[MATCH] || cb[MATCH](input)) {
        takers.splice(i, 1);
        return cb(input);
      }
    }
  }

  /**
   * Requests one message.
   * - closed channel with an empty buffer: `cb` receives END immediately;
   * - buffered message available: `cb` receives it immediately;
   * - otherwise `cb` is queued as a pending taker and gets a `cancel`
   *   property that removes it from the queue.
   */
  function take(cb) {
    checkForbiddenStates();
    check(cb, is.func, "channel.take's callback must be a function");

    if (closed && buffer.isEmpty()) {
      cb(END);
    } else if (!buffer.isEmpty()) {
      cb(buffer.take());
    } else {
      takers.push(cb);
      cb.cancel = function () {
        return remove(takers, cb);
      };
    }
  }

  /**
   * Hands the result of `buffer.flush()` to `cb`, or END when the channel is
   * closed and the buffer is empty.
   */
  function flush(cb) {
    checkForbiddenStates(); // TODO: check if some new state should be forbidden now
    check(cb, is.func, "channel.flush' callback must be a function");
    if (closed && buffer.isEmpty()) {
      cb(END);
      return;
    }
    cb(buffer.flush());
  }

  /**
   * Closes the channel (no-op when already closed) and notifies every
   * pending taker with END.
   */
  function close() {
    checkForbiddenStates();
    if (!closed) {
      closed = true;
      if (takers.length) {
        // Detach the list before notifying, so `takers` is already empty
        // while the callbacks run.
        var arr = takers;
        takers = [];
        for (var i = 0, len = arr.length; i < len; i++) {
          arr[i](END);
        }
      }
    }
  }

  return {
    take: take,
    put: put,
    flush: flush,
    close: close,
    get __takers__() {
      return takers;
    },
    get __closed__() {
      return closed;
    }
  };
}

/**
 * Creates a channel fed by an external event source.
 *
 * @param {Function} subscribe Called once with an emit callback; it must
 *   return an unsubscribe function. Emitting an END message closes the
 *   channel.
 * @param {Object} [buffer=buffers.none()] Buffer passed to `channel`.
 * @param {Function} [matcher] Optional predicate; inputs for which it returns
 *   a falsy value are dropped. Validated only when a third argument is
 *   actually passed.
 * @returns {{take: Function, flush: Function, close: Function}}
 * @throws {Error} When `subscribe` does not return a function.
 */
export function eventChannel(subscribe) {
  var buffer = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : buffers.none();
  var matcher = arguments[2];

  /**
    should be if(typeof matcher !== undefined) instead?
    see PR #273 for a background discussion
  **/
  if (arguments.length > 2) {
    check(matcher, is.func, 'Invalid match function passed to eventChannel');
  }

  var chan = channel(buffer);

  var close = function close() {
    if (!chan.__closed__) {
      // `unsubscribe` is still undefined when END is emitted synchronously
      // from inside `subscribe`; that case is handled after subscribe returns.
      if (unsubscribe) {
        unsubscribe();
      }
      chan.close();
    }
  };

  var unsubscribe = subscribe(function (input) {
    if (isEnd(input)) {
      close();
      return;
    }
    if (matcher && !matcher(input)) {
      return;
    }
    chan.put(input);
  });

  // Validate before the first possible invocation below, so a bad return
  // value always produces this descriptive error rather than a TypeError.
  if (!is.func(unsubscribe)) {
    throw new Error('in eventChannel: subscribe should return a function to unsubscribe');
  }

  // The source ended synchronously during `subscribe`: `close` could not
  // unsubscribe at that point, so do it now.
  if (chan.__closed__) {
    unsubscribe();
  }

  return {
    take: chan.take,
    flush: chan.flush,
    close: close
  };
}

/**
 * Creates an event channel on top of `subscribe` where inputs carrying a
 * truthy SAGA_ACTION property are forwarded synchronously and every other
 * input is forwarded through the `asap` scheduler.
 *
 * The returned object is a copy of the event channel whose `take(cb, matcher)`
 * accepts an optional matcher function, stored on the callback as `cb[MATCH]`.
 *
 * @param {Function} subscribe Same contract as for `eventChannel`.
 * @returns {{take: Function, flush: Function, close: Function}}
 */
export function stdChannel(subscribe) {
  var chan = eventChannel(function (cb) {
    return subscribe(function (input) {
      if (input[SAGA_ACTION]) {
        cb(input);
        return;
      }
      asap(function () {
        return cb(input);
      });
    });
  });

  return _extends({}, chan, {
    take: function take(cb, matcher) {
      if (arguments.length > 1) {
        check(matcher, is.func, "channel.take's matcher argument must be a function");
        cb[MATCH] = matcher;
      }
      chan.take(cb);
    }
  });
}