C# - In Rx, how do I ensure no notifications are lost due to exceptions
I want to make sure that Rx notifications are not lost when they're processed by a consumer's Do delegate. I have a producer that generates messages that I need to process, and I need to retry processing if it fails.
Desired marble diagram:
Producer   1 2 3 4 5
Consumer   1 X 2 3 X X 4 5
Downstream 1 2 3 4 5
The standard Retry won't help here, since it'll resubscribe to the producer after an error. That loses the notification that failed processing and continues with the next notification.
Retry marble diagram:
Producer 1 2 3 4 5
Consumer 1 X 3 X 5
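To make the lossy behaviour concrete, here is a minimal sketch assuming a hot producer modelled as a Subject<int> and a consumer that fails on even numbers. The class name RetryLosesNotifications and the failure rule are made up for illustration, and the code assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class RetryLosesNotifications
{
    // Returns the notifications that made it past the failing consumer.
    public static List<int> Run()
    {
        var producer = new Subject<int>();   // hot source: values are not replayed on resubscribe
        var processed = new List<int>();

        using (producer
            .Do(n =>
            {
                // Hypothetical failure rule: processing even numbers throws.
                if (n % 2 == 0)
                    throw new InvalidOperationException("processing failed");
            })
            .Retry()                          // resubscribes to the producer after each error
            .Subscribe(processed.Add))
        {
            for (var n = 1; n <= 5; n++)
                producer.OnNext(n);
        }

        return processed;
    }
}
```

With this setup Run() should return 1, 3, 5: each failed notification is gone by the time Retry has resubscribed, which is the second marble diagram.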
So far, I have this code, but it doesn't seem right to me:
    static void RetryWithBacklog()
    {
        var data = Enumerable.Range(1, 100).ToObservable();
        // Failed notifications are pushed back into this subject to be processed again.
        var backlog = new Subject<int>();

        backlog
            .Merge(data)
            .Retry()
            .Do(l =>
            {
                try
                {
                    ProcessNotification(l);
                }
                catch (Exception)
                {
                    // Re-queue the notification instead of losing it.
                    backlog.OnNext(l);
                }
            })
            .Subscribe();

        Console.ReadKey();
    }
Background
The Do operation will perform a network request based on the producer's notifications. These requests might fail, so I need to retry without losing the information to be transmitted. Of course I could just blindly wrap the network request in a while loop with try/catch, but the requests should not happen as long as the network connection is known to be down (see this question).
solution
    static void RetryBeforePassingDownstream()
    {
        var data = Enumerable.Range(1, 100).ToObservable();

        data
            // Buffer size 1: when Retry resubscribes, it first receives the most recent notification again.
            .Replay(l => l.SelectMany(ProcessNotification).Retry(), 1)
            .Subscribe(Downstream);

        Console.ReadKey();
    }

    static IObservable<int> ProcessNotification(int notification)
    {
        Console.WriteLine("process: {0}", notification);
        // either:
        //   throw new Exception("error");
        // or:
        return Observable.Return(notification);
    }

    static void Downstream(int i)
    {
        Console.WriteLine("downstream: {0}", i);
    }
The point of the Retry operator is that it works on cold observables by resubscribing, causing subscription side effects each time that it "retries". For instance, the observable might send a web request each time an observer subscribes. In your example, the Retry operator doesn't make sense, considering that Interval never calls OnError.
Retry semantics can technically exist separately in the observable and the observer. In your situation, couldn't you just call ProcessNotification until it succeeds in the Do operator?
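A rough sketch of what retrying inside Do could look like, assuming a synchronous, hypothetical ProcessNotification that fails twice for one notification before succeeding. The class name RetryInsideDo and the failure counter are illustrative only, and the code assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class RetryInsideDo
{
    static int failuresLeft = 2;

    // Hypothetical processing step: notification 2 fails twice, then succeeds.
    static void ProcessNotification(int n)
    {
        if (n == 2 && failuresLeft-- > 0)
            throw new InvalidOperationException("processing failed");
    }

    public static List<int> Run()
    {
        var downstream = new List<int>();

        Observable.Range(1, 5)
            .Do(n =>
            {
                // Keep calling ProcessNotification for the same notification until it succeeds.
                while (true)
                {
                    try
                    {
                        ProcessNotification(n);
                        break;
                    }
                    catch (InvalidOperationException)
                    {
                        // Swallow the failure and try again.
                    }
                }
            })
            .Subscribe(downstream.Add);

        return downstream;
    }
}
```

Note that this loop blocks the producer while it retries and has no notion of whether the network is up, so it is essentially the blind while try/catch approach mentioned in the question.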
Or is the problem that downstream observers won't observe the notification when Do throws? I guess that's not what you meant, because in that case you could just swallow the exception.
Or is the problem that there's a chance the notification will change in some way or be filtered out by upstream operators if the notification is replayed? In that case you could use Do followed by Catch and recursion to achieve the same thing without requiring a Subject, though I can't figure out why you'd ever want it.
Update
The Do operator is not asynchronous. Use SelectMany instead so that it's part of the query. That way, cancelling the subscription will cancel the request. If your request method returns Task<T> instead of IObservable<T>, then consider using the overload that accepts a CancellationToken as well.
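One way to read the SelectMany advice is to retry each request on its own, so that a failure never reaches the producer subscription. The sketch below assumes a hypothetical SendRequest that returns IObservable<int> and fails on the first attempt for even notifications. The class name and the attempt counter are illustrative, and the code assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class RetryPerNotification
{
    static readonly Dictionary<int, int> Attempts = new Dictionary<int, int>();

    // Hypothetical request: even notifications fail on their first attempt only.
    static IObservable<int> SendRequest(int n)
    {
        Attempts[n] = Attempts.TryGetValue(n, out var count) ? count + 1 : 1;
        if (n % 2 == 0 && Attempts[n] == 1)
            return Observable.Throw<int>(new InvalidOperationException("request failed"));
        return Observable.Return(n);
    }

    public static List<int> Run()
    {
        var downstream = new List<int>();

        Observable.Range(1, 5)
            // Defer makes every resubscription by Retry call SendRequest again;
            // without it, Retry would resubscribe to the same failed observable.
            .SelectMany(n => Observable.Defer(() => SendRequest(n)).Retry())
            .Subscribe(downstream.Add);

        return downstream;
    }
}
```

Because Retry sits inside the SelectMany selector, only the failed request is repeated and every notification eventually reaches downstream. This is a different placement of Retry than the Replay-based query, and it still retries without checking whether the connection is up.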
There's an easier way to make a cold observable that replays the last notification and can be retried using the Retry operator, as follows.
(Untested)
    data.Replay(r => r.YourStateMachine.SelectMany(SendRequestAsync).Retry(), 1);
James' answer here can fill in YourStateMachine.