C# - In Rx, how do I ensure no notifications are lost due to exceptions?

I want to make sure that Rx notifications are not lost when they're processed by a consumer's Do delegate. I have a producer that generates messages that I need to process, and I need to retry the processing if it fails.

Desired marble diagram:

    Producer   1 2 3 4 5
    Consumer   1 x 2 3 x x 4 5
    Downstream 1   2 3     4 5

The standard Retry won't help here, since it will resubscribe to the producer after an error. That would lose the notification that failed processing and continue with the next notification.

Retry marble diagram:

    Producer  1 2 3 4 5
    Consumer  1 x 3 x 5

So far, I have this code, but it doesn't seem right to me:

    static void RetryWithBacklog()
    {
        var data = Enumerable.Range(1, 100).ToObservable();
        // Failed notifications are pushed back into this subject for another attempt.
        var backlog = new Subject<int>();

        backlog
            .Merge(data)
            .Retry()
            .Do(l =>
            {
                try
                {
                    ProcessNotification(l);
                }
                catch (Exception)
                {
                    // Re-queue the notification that failed processing.
                    backlog.OnNext(l);
                }
            })
            .Subscribe();

        Console.ReadKey();
    }

(full code sample)


The Do operation will perform a network request based on the producer's notifications. These requests might fail, so I need to retry them without losing the information to be transmitted. Of course, I could blindly loop with a try/catch around the network request, but requests should not happen as long as the network connection is known to be down (see this question).


    static void RetryBeforePassingDownstream()
    {
        var data = Enumerable.Range(1, 100).ToObservable();

        data
            .Replay(l => l.SelectMany(ProcessNotification).Retry(), 1)
            .Subscribe(Downstream);

        Console.ReadKey();
    }

    static IObservable<int> ProcessNotification(int notification)
    {
        Console.WriteLine("process: {0}", notification);
        // either:
        // throw new Exception("error");

        // or:
        return Observable.Return(notification);
    }

    static void Downstream(int i)
    {
        Console.WriteLine("downstream: {0}", i);
    }

The point of the Retry operator is that it works on cold observables by resubscribing, causing subscription side effects each time it "retries". For instance, the observable might send a web request each time an observer subscribes. In your example, the Retry operator doesn't make sense, considering that Interval never calls OnError.
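To illustrate the resubscription behavior described above, here is a minimal sketch, assuming the System.Reactive package is referenced. The class name RetryResubscribeDemo and the simulated failure are made up for illustration; the cold source fails on its first two subscriptions and completes on the third.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class RetryResubscribeDemo
{
    // Returns how many times the source was subscribed to and which values were observed.
    public static (int Subscriptions, List<int> Values) Run()
    {
        var subscriptions = 0;

        // Cold source: this delegate (the "side effect") runs again for every subscription.
        var source = Observable.Create<int>(observer =>
        {
            subscriptions++;
            observer.OnNext(subscriptions);

            if (subscriptions < 3)
                observer.OnError(new InvalidOperationException("simulated failure"));
            else
                observer.OnCompleted();

            return Disposable.Empty;
        });

        var values = new List<int>();

        // Retry swallows each OnError and subscribes to the source again.
        source.Retry().Subscribe(values.Add);

        return (subscriptions, values);
    }
}
```

With this simulated source, Run() should report three subscriptions, one per attempt. Retry never replays the notification that was in flight when the error occurred; it only starts the source over.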

Retry semantics can technically exist separately in the observable and in the observer. In your situation, couldn't you just call ProcessNotification until it succeeds in the Do operator?

Or is the problem that downstream observers won't observe the notification when Do throws? I guess that's not what you meant, because in that case you swallow the exception.

Or is the problem that there's a chance the notification will change in some way or be filtered out by upstream operators if the notification is replayed? In that case you could use Do followed by Catch and recursion to achieve the same thing without requiring a Subject, though I can't figure out why you'd ever want it.
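The first suggestion above, calling the processing step until it succeeds inside Do, could be sketched roughly as follows, assuming System.Reactive is referenced. ProcessUntilSuccess and FlakyProcess are hypothetical helpers, not part of Rx. Note that the loop blocks the pipeline while it retries and makes no attempt to wait for the network to come back.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RetryInsideDo
{
    // Calls process(item) until it stops throwing; returns the number of attempts.
    public static int ProcessUntilSuccess<T>(T item, Action<T> process)
    {
        var attempts = 0;
        while (true)
        {
            attempts++;
            try
            {
                process(item);
                return attempts;
            }
            catch (Exception)
            {
                // Swallow the error and try the same notification again.
            }
        }
    }

    public static List<int> Run()
    {
        // Hypothetical processing step: the first two calls fail, later calls succeed.
        var failuresLeft = 2;
        void FlakyProcess(int n)
        {
            if (failuresLeft > 0)
            {
                failuresLeft--;
                throw new InvalidOperationException("simulated failure");
            }
        }

        var downstream = new List<int>();

        // Each notification reaches downstream only after it was processed successfully.
        Observable.Range(1, 5)
            .Do(n => ProcessUntilSuccess(n, FlakyProcess))
            .Subscribe(downstream.Add);

        return downstream;
    }
}
```

Because the retry happens inside the observer-side delegate, the source is never resubscribed and no notification is skipped.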


The Do operator is not asynchronous. Use SelectMany instead so that the request is part of the query. That way, cancelling the subscription will cancel the request. If your request method returns Task<T> instead of IObservable<T>, consider using the overload that accepts a CancellationToken as well.
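A minimal sketch of the SelectMany overload that passes a CancellationToken to a Task-returning selector, assuming System.Reactive is referenced. SendRequestAsync here is a hypothetical stand-in for the real network call; it only delays briefly and echoes its input.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SelectManyRequestSketch
{
    // Hypothetical request method; a real one would pass the token to the network API.
    public static async Task<int> SendRequestAsync(int notification, CancellationToken token)
    {
        await Task.Delay(10, token);
        return notification;
    }

    // The token handed to the selector is cancelled when the subscription is disposed.
    public static IObservable<int> Build(IObservable<int> data) =>
        data.SelectMany((int n, CancellationToken token) => SendRequestAsync(n, token));

    // Blocks until all requests have completed and returns their results.
    public static IList<int> Run() =>
        Build(Observable.Range(1, 3)).ToList().Wait();
}
```

SelectMany merges the results as the tasks complete, so the output order is not guaranteed to match the input order.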

There's an easier way to make a cold observable that replays the last notification and can be retried using the Retry operator, as follows.


    data.Replay(r => r.YourStateMachine.SelectMany(SendRequestAsync).Retry(), 1);

James' answer here can fill in YourStateMachine.


