Posts

How do I get RxViz to display several streams at once?

Image
If you have been reading these blog entries, you have seen some of the things RxViz can do — but perhaps, you have tried to display an array of Observables and been frustrated by this error:   Last expression must be an Observable Well, darn. An  Observable.  One.  I want to put a bunch on the screen, lined up top to bottom. Consider, though, higher-order Observables; that is, Observables that emit other Observables.  Like this ship that carries other ships: RxViz does a beautiful  job of displaying higher-order Observables.  See here for good example. And you might hope that therefore you could display several Observables just by packing them up in a single containing Observable, like this: of(p0, p1, p2) It almost works.  All three Observables are displayed, just not in any predictable order.  Sometimes,   p2  is at the bottom; sometimes at the top.  Why RxViz  has this problem I ...

How do I space out emissions in time?

RxJS offers several throttling and delaying options but not (so far as I have found — hey, it’s a big library!) the simple one: I don’t emissions too close together but if one is early, don’t throw it away, just hold it up for a little while and then emit it in sequence. I am going to solve this one in pieces.  First, I make a function called pause(n) that will return an Observable that is n milliseconds long, but does not emit anything:   function pause(n) {     return timer(n).pipe(filter(() => false));   } Now let’s extend that with pauseAfter(n, v) that emits v and then pauses n milliseconds:     function pauseAfter(n, v) {        return pause(n).pipe(startWith(v));     } And finally, there is an operator-factory:     function throttleBuffer(n) {        return concatMap(v => pauseAfter(n, v));     } The code is here . The only downside to this tec...

How can I tell if an Observable is running or not?

You should think of Observables like pipes: you cannot tell what is happening inside of them by looking at the outside. The question should be rephrased as, given an Observable, how can I create another Observable that emits true when a the first Observable is still running and false when it is not. First, consider the following operator: function activity() {   return pipe(     mapTo(0),     startWith(1),     endWith(-1),     catchError(() => of(-1)),   ); } This will emit 1, then 0 every time the source Observable emits, and then -1 when it finalizes, successfully or not. In marble terms: source: ---a--b--c---. result  1--0--0--0--(-1). Why that weird protocol? Well, mostly because I have a theory that activity() will be useful later, but you can see that function runningSum() {   return scan((acc, v) => acc + v, 0); } source.pipe(runningSum()); will emit 1 whenever the source Observable...

How do I create a timeout?

This inaugural post is in the format I hope to follow going forward: a question, a discussion of the question, a solution or several solutions ( each  linked to a demo), and a teaser for the next question.  The point of the teaser is to stimulate some discussion of possible solutions from readers, but I will be sincerely shocked if the first few posts even get readers, let alone discussion from them.  The assumption is that every reader knows the basics of RxJs.  On the left is a link to the documentation for RxJs; that may clarify some things. If the question is, “How do I create an Operator that allows its source Observable to run for given time and then ends it?”, that’s pretty simple: function timeout(n) {   return takeUntil(timer(n)); } Code is here . But the question is, “How do I create an Operator that copies its source Observable, until the source fails to emit for given time?”, it’s somewhat more complicated. The key insight is that switch...