RX Observable.TakeWhile проверяет условие перед каждым элементом, но мне нужно выполнить проверку после

Observable.TakeWhile позволяет запускать последовательность, пока условие истинно (с использованием делегата, чтобы мы могли выполнять вычисления на реальных объектах последовательности), но он проверяет это условие перед каждым элементом. Как я могу выполнить одну и ту же проверку, но ПОСЛЕ каждого элемента?

Следующий код демонстрирует проблему

void RunIt() { List listOfCommands = new List(); listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 }); listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 }); listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 }); var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount); obs.Subscribe(x => { Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount); }); } class SomeCommand { public int CurrentIndex; public int TotalCount; } 

Эти результаты

 1 of 3 2 of 3 

Я не могу получить третий элемент

Глядя на этот пример, вы можете думать, что все, что мне нужно сделать, это изменить мое состояние так:

 var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex <= c.TotalCount); 

Но тогда наблюдаемый никогда не завершится (потому что в моем реальном коде мира stream не заканчивается после этих трех команд)

Там нет встроенных операторов, чтобы делать то, что вы просите, но вот тот, который использует Publish для запуска двух запросов, а только подписывается на базовый наблюдаемый один раз:

 // Emits matching values, but includes the value that failed the filter public static IObservable TakeWhileInclusive( this IObservable source, Func predicate) { return source.Publish(co => co.TakeWhile(predicate) .Merge(co.SkipWhile(predicate).Take(1))); } 

А потом:

 var obs = listOfCommands.ToObservable() .TakeWhileInclusive(c.CurrentIndex != c.TotalCount); 

Окончательное редактирование:

Я основал свое решение от реализации TakeWhileInclusive Сергея в этом streamе – Как заполнить Rx Observable в зависимости от состояния в событии

 public static IObservable TakeUntil( this IObservable source, Func predicate) { return Observable .Create(o => source.Subscribe(x => { o.OnNext(x); if (predicate(x)) o.OnCompleted(); }, o.OnError, o.OnCompleted )); } 

Оператор TakeUntil может использовать каждый элемент, пока вторичный источник не TakeUntil значение; в этом случае мы можем указать, что второй stream будет первым значением после прохождения предиката:

 public static IObservable TakeWhileInclusive( this IObservable source, Func predicate) { return source.TakeUntil(source.SkipWhile(x => predicate(x)).Skip(1)); } 

Я думаю, что вы после TakeWhile , а не TakeUntil :

 var list = (new List(){1,2,3,4,5,6,7,8,9,10}); var takeWhile = list .ToObservable() .Select((_, i) => Tuple.Create(i, _)) .TakeWhile(tup => tup.Item1 < list.Count) .Do(_ => Console.WriteLine("Outputting {0}", _.Item2)); 

Хорошо, вещь, которую вы хотите, не существует из коробки, по крайней мере, я не знаю ничего с этим конкретным синтаксисом. Тем не менее, вы можете довольно легко обманывать его (и это не слишком противно):

 var fakeCmds = Enumerable .Range(1, 100) .Select(i => new SomeCommand() {CurrentIndex = i, TotalCount = 10}) .ToObservable(); var beforeMatch = fakeCmds .TakeWhile(c => c.CurrentIndex != c.TotalCount); var theMatch = fakeCmds .SkipWhile(c => c.CurrentIndex != c.TotalCount) .TakeWhile(c => c.CurrentIndex == c.TotalCount); var upToAndIncluding = Observable.Concat(beforeMatch, theMatch); 

Combo, используя новый SkipUntil и TakeUntil :

SkipUntil return source.Publish(s => s.SkipUntil(s.Where(predicate)));

TakeUntil (включительно) return source.Publish(s => s.TakeUntil(s.SkipUntil(predicate)));

Полный источник : https://gist.github.com/GeorgeTsiokos/a4985b812c4048c428a981468a965a86