Question

This is an academic exercise: I'm new to Reactive Extensions and trying to get my head around the technology. I set myself the goal of making an IObservable that returns successive digits of Pi (I happen to be really interested in Pi at the moment for unrelated reasons). Reactive Extensions contains operators for making observables, and the guidance is that you should "almost never need to create your own IObservable". But I can't see how to do this with the ready-made operators and methods. Let me elaborate a bit.

I was planning to use an algorithm that would involve the expansion of a Taylor series for Arctan. To get the next digit of Pi, I'd expand a few more terms in the series.

So I need the series expansion to run asynchronously, occasionally pushing the next computed digit to the IObserver. I obviously don't want to restart the computation from scratch for each new digit.

Is there a way to implement this behaviour using Rx's built-in operators, or am I going to have to code an IObservable from scratch? What strategy suggests itself?


Solution

The simplest way is to write the algorithm as an IEnumerable and then convert it:

IEnumerable<int> Pi()
{
    // algorithm here
    for (int i = 0; i < 1000; i++)
    {
        yield return i;
    }
}
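
The loop above is only a placeholder. As a sketch of what a real digit generator could look like, here is one possibility that keeps its state between digits, so nothing is recomputed. Note that it is not the arctan Taylor expansion from the question: it swaps in a streaming "spigot" algorithm published by Jeremy Gibbons, which produces one decimal digit per iteration. The names PiDigits, Generate and FloorDiv are made up for this example.

```csharp
using System.Collections.Generic;
using System.Numerics;

static class PiDigits
{
    // Floor division. BigInteger's own division truncates toward zero,
    // so adjust when the remainder and the divisor have different signs.
    static BigInteger FloorDiv(BigInteger a, BigInteger b)
    {
        BigInteger rem;
        BigInteger quot = BigInteger.DivRem(a, b, out rem);
        return (rem != 0 && rem.Sign != b.Sign) ? quot - 1 : quot;
    }

    // Lazily yields the decimal digits of Pi: 3, 1, 4, 1, 5, ...
    // The sequence never ends, so callers must limit it (e.g. with Take).
    public static IEnumerable<int> Generate()
    {
        // State carried from one digit to the next.
        BigInteger q = 1, r = 180, t = 60, i = 2;
        while (true)
        {
            BigInteger u = 3 * (3 * i + 1) * (3 * i + 2);
            BigInteger y = FloorDiv(q * (27 * i - 12) + 5 * r, 5 * t);
            yield return (int)y;

            // Compute the next state from the old values before overwriting them.
            BigInteger nextQ = 10 * q * i * (2 * i - 1);
            BigInteger nextR = 10 * u * (q * (5 * i - 2) + r - y * t);
            q = nextQ;
            r = nextR;
            t *= u;
            i++;
        }
    }
}
```

Because the iterator is lazy, each digit is only computed when the consumer asks for it. The BigInteger values grow with every digit, so later digits take progressively longer to produce.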

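Usage (this gives a cold observable, meaning that every new subscription starts computing Pi from scratch):

// Scheduler.ThreadPool is the older property name; later Rx releases mark it
// obsolete in favour of ThreadPoolScheduler.Instance.
var cold = Pi().ToObservable(Scheduler.ThreadPool);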
cold.Take(5).Subscribe(Console.WriteLine);
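
The "from scratch" behaviour comes from the iterator itself: every enumeration runs the method body again from the top, and ToObservable enumerates the sequence once per subscription. A small Rx-free sketch of that restart, where ColdDemo, Numbers and Starts are hypothetical names used only for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class ColdDemo
{
    // Counts how many times the iterator body has been started.
    public static int Starts;

    public static IEnumerable<int> Numbers()
    {
        Starts++; // runs on the first MoveNext of each enumeration
        for (int i = 0; i < 1000; i++)
        {
            yield return i;
        }
    }

    public static void Run()
    {
        var seq = Numbers();
        var a = seq.Take(3).ToList(); // first enumeration: 0, 1, 2
        var b = seq.Take(3).ToList(); // second enumeration restarts: 0, 1, 2
        Console.WriteLine(string.Join(",", a) + " | " + string.Join(",", b));
        Console.WriteLine("iterator started " + Starts + " times");
    }
}
```

Two consumers of the same sequence each trigger their own run of the loop, which is the behaviour that Publish is meant to avoid.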

If you want to make it hot (everyone shares the same underlying calculation), you can just do this:

var hot = cold.Publish().RefCount();

This starts the calculation when the first subscriber arrives and stops it once all subscribers have disconnected. Here's a simple test:

hot.Subscribe(p => Console.WriteLine("hot1: " + p));
Thread.Sleep(5); // give hot1 a short head start before hot2 subscribes
hot.Subscribe(p => Console.WriteLine("hot2: " + p));

This should show only hot1 printing for a little while, then hot2 joining in after the short wait and printing the same numbers as hot1 from that point on. Had this been done with cold, the two subscriptions would each have started from 0.

Other tips

For something like this, the simplest method would be to use a Subject. A Subject is both an IObservable and an IObserver, which sounds a bit strange, but it lets you use it like this:

class PiCalculator
{
    private readonly Subject<int> resultStream = new Subject<int>();

    public IObservable<int> ResultStream
    {
        get { return resultStream; }
    }

    public void Start()
    {
        // Whatever the algorithm actually is
        for (int i = 0; i < 1000; i++)
        {
            resultStream.OnNext(i);
        }
    }
}

So inside your algorithm, you just call OnNext on the subject whenever you want to produce the next value.

Then to use it, you just need something like:

var piCalculator = new PiCalculator();
piCalculator.ResultStream.Subscribe(n => Console.WriteLine(n));
piCalculator.Start();
Licensed under: CC-BY-SA with attribution