Asynchronous output

AntaoAlmada Antao Almada PT Member ✭✭

I'm looking into ways of handling IObservable in Workbooks. I want it to behave like the following code using Task:

Task.Run(() => {
    for(var i = 0; i < 10; i++)
    {
        Console.WriteLine(i);
        Task.Delay(TimeSpan.FromSeconds(1)).Wait();
    }
}).Wait();

It writes the numbers 0 to 9 at one-second intervals.

An equivalent with observables would be:

Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(10)
    .Subscribe(i => Console.WriteLine(i));

I understand that Subscribe() returns an IDisposable, so its representation is shown, whereas Wait() returns void. If the Wait() call is removed from the Task example, the Task representation is shown instead.

Would it be possible to have an integration that makes Subscribe behave the same way as Wait? I started developing one but I'm not sure how to proceed: https://github.com/NetFabric/ReactiveBooks/blob/develop/NetFabric.ReactiveBooks/ObservableRepresentationProvider.cs

Thanks!

Posts

  • abock Aaron Bockover US Xamarin Team, Insider Xamurai

    I think we're going to have to actually support this scenario in the client itself a little better. Under the hood, the agent can post messages/results to the client at any time, but we have no way of associating messages/results with a cell/submission apart from what we do with console output.

    I will think about this a little and we'll provide a solution for agent integrations to use. We may also be able to cleanly support observables out of the box, as we do for async.

  • abock Aaron Bockover US Xamarin Team, Insider Xamurai

    Also note that in your example where you are starting a thread and waiting on it, you can use await directly in workbook cells (they are implicitly async methods, returning Task<object> under the hood).

    for (int i = 0; i < 10; i++) {
        Console.WriteLine (i);
        await Task.Delay (TimeSpan.FromSeconds (1));
    }
    

    In fact, that snippet comes straight from a regression test workbook. It tests the message posting and cell-association behavior I was talking about. You'll see the cell that's executing update "live" as the Console.WriteLine is executed in the agent.

  • KentBoogaart Kent Boogaart AU Member ✭✭

    Not familiar with Workbooks and can't even get Rx referenced so I can test myself, but shouldn't this work:

    var o = Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Take(10)
        .Do(Console.WriteLine);
    
    await o;
    
  • abock Aaron Bockover US Xamarin Team, Insider Xamurai

    @KentBoogaart that particular example works just fine, but only because o is awaitable and the delegate passed to Do happens to write to Console.Out, which is a special case that I mentioned before.

    The issue here is what to do with IObservable in general, and perhaps what to do with the results of any delegate passed to Do in your example which does not happen to write to the console.
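
    For readers wondering what "awaitable" amounts to here: Rx's awaiter completes when the sequence completes and yields its last value. The sketch below imitates that using only the BCL interfaces. LastValueAsync and CountingObservable are hypothetical names made up for this illustration; they are not part of Rx or Workbooks.

    ```csharp
    using System;
    using System.Threading.Tasks;

    static class ObservableAwaitSketch
    {
        // Hypothetical helper: completes with the last value once OnCompleted arrives.
        // The subscription is never disposed, which is acceptable only for a sketch.
        public static Task<T> LastValueAsync<T> (this IObservable<T> source)
        {
            var completion = new TaskCompletionSource<T> ();
            source.Subscribe (new LastValueObserver<T> (completion));
            return completion.Task;
        }

        sealed class LastValueObserver<T> : IObserver<T>
        {
            readonly TaskCompletionSource<T> completion;
            T last;
            bool hasValue;

            public LastValueObserver (TaskCompletionSource<T> completion)
                => this.completion = completion;

            // Intermediate values are only remembered, never surfaced to the awaiter
            public void OnNext (T value) { last = value; hasValue = true; }

            public void OnError (Exception error) => completion.TrySetException (error);

            public void OnCompleted ()
            {
                if (hasValue)
                    completion.TrySetResult (last);
                else
                    completion.TrySetException (
                        new InvalidOperationException ("Sequence contains no elements."));
            }
        }
    }

    // Hypothetical synchronous source that emits 0..count-1 and then completes
    sealed class CountingObservable : IObservable<int>
    {
        readonly int count;
        public CountingObservable (int count) => this.count = count;

        public IDisposable Subscribe (IObserver<int> observer)
        {
            for (var i = 0; i < count; i++)
                observer.OnNext (i);
            observer.OnCompleted ();
            return new Noop ();
        }

        sealed class Noop : IDisposable
        {
            public void Dispose () { }
        }
    }
    ```

    Awaiting new CountingObservable (10).LastValueAsync () yields 9. The intermediate values never reach the awaiter, which is why anything that should appear while the sequence is still running has to travel through a side channel such as the console.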

  • AntaoAlmada Antao Almada PT Member ✭✭

    @KentBoogaart It only works with Rx 4.0 Preview.

    I've tried await on all previous Workbooks versions, and it would only return on OnCompleted. It's great to see that this behavior has changed.

  • abock Aaron Bockover US Xamarin Team, Insider Xamurai
    edited October 14

    Just wanted to give an update: after thinking about this off and on for the week, I've prototyped support for the bits needed to move forward with rich support for IObservable. We will release this support in the first preview of the 1.4 series in the next couple of weeks.

    The following integration API would be added (tentative of course, but this is roughly what's needed):
    https://gist.github.com/abock/7bbefbaf033516f49f6c3731acb445c7

    And the following is roughly what the integration itself would look like:

    public class SampleIntegration : IAgentIntegration, IObserver<IEvaluation>
    {
        // not readonly: assigned in IntegrateWith rather than in a constructor
        IAgent agent;
    
        public void IntegrateWith (IAgent agent)
        {
            this.agent = agent;
            agent.RegisterEvaluationContextHandler (HandleNewEvaluationContext);
        }
    
        void HandleNewEvaluationContext (IEvaluationContext evaluationContext)
            => evaluationContext.Evaluations.Subscribe (this);
    
        void IObserver<IEvaluation>.OnNext (IEvaluation evaluation)
        {
            // a new cell is about to execute or has finished executing (switch on evaluation.Phase)
            // the rest of this method is pseudo-code
    
            // Subscribe with a delegate assumes the Rx extension methods are available
            if (evaluation.Result is IObservable<object> observable)
                observable.Subscribe (value => {
                    // the ID from the evaluation would tie the out-of-band result to the cell
                    agent.PublishEvaluationResult (evaluation.Id, value);
                });
        }
    
        void IObserver<IEvaluation>.OnError (Exception error)
        {
            // the current (last call to OnNext) evaluation threw an exception
        }
    
        void IObserver<IEvaluation>.OnCompleted ()
        {
            // the last cell has finished evaluating
        }
    }
    

    Basically, the evaluation process in the agent no longer returns a single result - instead it posts its [by default] single result back to the client. And the evaluator itself is now an observable stream of evaluations which can be acted upon by integrations. In turn, integrations can post out-of-band results back to a specified cell (the observed evaluation ID).
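
    To make that flow concrete, here is a minimal, BCL-only sketch of the idea. It is not the Workbooks API: Evaluation, SimpleSubject, CallbackObserver, and OutOfBandDemo are all hypothetical stand-ins, and a plain list takes the place of posting results to the client.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Hypothetical stand-in for an evaluated cell: an ID plus whatever the cell returned
    sealed class Evaluation
    {
        public int Id { get; }
        public object Result { get; }
        public Evaluation (int id, object result) { Id = id; Result = result; }
    }

    // Hypothetical minimal push source built on the BCL IObservable<T> interface
    sealed class SimpleSubject<T> : IObservable<T>
    {
        readonly List<IObserver<T>> observers = new List<IObserver<T>> ();

        public IDisposable Subscribe (IObserver<T> observer)
        {
            observers.Add (observer);
            return new Unsubscriber (() => observers.Remove (observer));
        }

        public void Publish (T value)
        {
            // iterate over a copy so observers may subscribe or unsubscribe while notified
            foreach (var observer in observers.ToArray ())
                observer.OnNext (value);
        }

        sealed class Unsubscriber : IDisposable
        {
            readonly Action dispose;
            public Unsubscriber (Action dispose) => this.dispose = dispose;
            public void Dispose () => dispose ();
        }
    }

    // Hypothetical observer that forwards OnNext to a delegate and ignores the rest
    sealed class CallbackObserver<T> : IObserver<T>
    {
        readonly Action<T> onNext;
        public CallbackObserver (Action<T> onNext) => this.onNext = onNext;
        public void OnNext (T value) => onNext (value);
        public void OnError (Exception error) { }
        public void OnCompleted () { }
    }

    static class OutOfBandDemo
    {
        // Returns the (evaluation ID, value) pairs that would be posted to the client
        public static List<(int Id, object Value)> Run ()
        {
            var published = new List<(int Id, object Value)> ();
            var evaluations = new SimpleSubject<Evaluation> ();
            var ticks = new SimpleSubject<object> ();

            evaluations.Subscribe (new CallbackObserver<Evaluation> (evaluation => {
                if (evaluation.Result is IObservable<object> observable)
                    // out-of-band values stay tied to the originating evaluation ID
                    observable.Subscribe (new CallbackObserver<object> (
                        value => published.Add ((evaluation.Id, value))));
                else
                    published.Add ((evaluation.Id, evaluation.Result));
            }));

            evaluations.Publish (new Evaluation (1, "plain result"));
            evaluations.Publish (new Evaluation (2, ticks)); // cell 2 evaluates to an observable
            ticks.Publish ("tick 0");                        // arrives after cell 2 has finished
            evaluations.Publish (new Evaluation (3, 42));
            ticks.Publish ("tick 1");                        // still attributed to cell 2
            return published;
        }
    }
    ```

    In this sketch "tick 1" is recorded against ID 2 even though cell 3 has already been evaluated, which is the cell-association behavior the evaluation ID is meant to provide.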

    Please don't hesitate to provide feedback on the process or the tentative API I've outlined here.

    Cheers!
