async / await - replace SynchronizationContext as the receiver of the Post call

When a SynchronizationContext is set on the current thread, as is the case on the main thread, the code that follows an await is posted (via Post) to the looper used by that SynchronizationContext implementation.
This is one of those magical things that happen with the compiler's help. We are trying to write our own TaskAwaiter that posts to a specific queue, so that we control when things are executed.
We do have something working, though not as expected. :)

We are taking this as reference

http://jake.ginnivan.net/blog/2014/01/10/on-async-and-sync-contexts/

and

http://weblogs.asp.net/dixin/understanding-c-sharp-async-await-2-awaitable-awaiter-pattern

Any suggestions on how to do this without changing the SynchronizationContext associated with the current thread?

tkx

Posts

  • ManuelCostaManuelCosta PTMember ✭✭

    Although we still have to test it more, it seems we have something working.
    We are using ISynchronizeInvoke as the execution context, and the "Post" is carried out by BeginInvoke.
    // Any ISynchronizeInvoke implementation can act as the execution context;
    // the awaiter hands continuations to it through BeginInvoke.
    ISynchronizeInvoke executionContext = ...;

    // The awaitable has to be awaited (inside an async method): without "await" the compiler
    // never calls GetAwaiter/OnCompleted, so nothing would be posted to executionContext.
    await Task.Run (new Action (() => {
        Debug.WriteLine ("Task 1");
        })).ConfigureAwait (executionContext);
    

    Note: we also used the following code as reference

    https://github.com/OmerMor/AsyncBridge/tree/master/src/AsyncBridge/Runtime.CompilerServices

    using System;
    using System.Runtime.CompilerServices;
    using System.ComponentModel;
    using System.Threading.Tasks;
    using System.Threading;
    using System.Reflection;
    using System.Security;
    
    namespace Nativo.Threading
    {
        /// <summary>
        ///   Something that can be used as the operand of <c>await</c> and yields no value.
        /// </summary>
        public interface IAwaitable
        {
            /// <summary>
            ///   Creates the awaiter that the compiler-generated code uses for this operation.
            /// </summary>
            /// <returns> The awaiter for this awaitable. </returns>
            IAwaiter GetAwaiter ();
        }
    
    /// <summary>
    ///   Awaiter contract for awaited operations that yield no value.
    /// </summary>
    /// <remarks>
    ///   The base interface INotifyCompletion contributes one method:
    ///   <c>void OnCompleted(Action continuation)</c>.
    ///   ICriticalNotifyCompletion could be used as the base instead; it implements
    ///   INotifyCompletion and adds <c>void UnsafeOnCompleted(Action continuation)</c>.
    /// </remarks>
    public interface IAwaiter : INotifyCompletion
    {
        /// <summary>
        ///   Ends the await; returns nothing.
        /// </summary>
        void GetResult ();

        /// <summary>
        ///   Tells the awaiting code whether the operation is already complete.
        /// </summary>
        bool IsCompleted { get; }
    }
    
    /// <summary>
    ///   Something that can be used as the operand of <c>await</c> and yields a value.
    /// </summary>
    /// <typeparam name="TResult"> Type of the value produced by the await. </typeparam>
    public interface IAwaitable<out TResult>
    {
        /// <summary>
        ///   Creates the awaiter that the compiler-generated code uses for this operation.
        /// </summary>
        /// <returns> The awaiter for this awaitable. </returns>
        IAwaiter<TResult> GetAwaiter ();
    }
    
    /// <summary>
    ///   Awaiter contract for awaited operations that yield a value.
    /// </summary>
    /// <remarks>
    ///   Derives from INotifyCompletion; ICriticalNotifyCompletion could be used as the base instead.
    /// </remarks>
    /// <typeparam name="TResult"> Type of the value produced by the await. </typeparam>
    public interface IAwaiter<out TResult> : INotifyCompletion
    {
        /// <summary>
        ///   Ends the await and hands back its value.
        /// </summary>
        TResult GetResult ();

        /// <summary>
        ///   Tells the awaiting code whether the operation is already complete.
        /// </summary>
        bool IsCompleted { get; }
    }
    
    /// <summary>
    ///   Awaiter over a <see cref="Task{TResult}"/> that hands the continuation to an
    ///   <see cref="ISynchronizeInvoke"/> (through <c>BeginInvoke</c>) once the task has finished.
    /// </summary>
    /// <typeparam name="TResult"> Result type of the awaited task. </typeparam>
    public struct Awaiter<TResult> : IAwaiter<TResult>
    {
        private readonly Task<TResult> task;
        private readonly ISynchronizeInvoke executionContext;

        /// <summary>
        ///   Creates an awaiter for <paramref name="task"/> that posts to <paramref name="executionContext"/>.
        /// </summary>
        /// <param name="task"> The awaited task. </param>
        /// <param name="executionContext"> Receiver of the continuation. </param>
        /// <exception cref="ArgumentNullException"> Either argument is null. </exception>
        internal Awaiter(Task<TResult> task, ISynchronizeInvoke executionContext){
            if (task == null)
                throw new ArgumentNullException ("task");

            if (executionContext == null)
                throw new ArgumentNullException ("executionContext");

            this.task = task;
            this.executionContext = executionContext;

        }

        /// <summary>
        ///   Posts <paramref name="continuation"/> to the execution context after the awaited task has completed.
        /// </summary>
        /// <param name="continuation"> The code that follows the await. </param>
        /// <exception cref="ArgumentNullException"> <paramref name="continuation"/> is null. </exception>
        public void OnCompleted(Action continuation)
        {
            if (continuation == null)
                throw new ArgumentNullException ("continuation");

            // A lambda inside a struct cannot capture 'this', so copy the field to a local first.
            ISynchronizeInvoke context = executionContext;

            // Posting right away (as the previous code did) ran the continuation before the task
            // had finished, which made GetResult block the execution context's thread in Wait().
            // ContinueWith defers the post until completion; if the task is already complete the
            // post happens immediately.
            task.ContinueWith (
                completed => { context.BeginInvoke (continuation, null); },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
        }

        /// <summary>
        ///   Always false, so the await never takes the synchronous path and the continuation
        ///   is always routed through <see cref="OnCompleted"/>.
        /// </summary>
        public bool IsCompleted
        {
            get
            {
                return false;
            }
        }

        /// <summary>
        ///   Fast checks for the end of an await operation to determine whether more needs to be done prior to completing the await.
        /// </summary>
        /// <param name="task"> The awaited task. </param>
        internal static void ValidateEnd(Task task)
        {
            if (task.Status == TaskStatus.RanToCompletion)
                return;

            HandleNonSuccess(task);
        }

        /// <summary>
        ///   Handles validations on tasks that aren't successfully completed.
        /// </summary>
        /// <param name="task"> The awaited task. </param>
        private static void HandleNonSuccess(Task task)
        {
            if (!task.IsCompleted)
            {
                try
                {
                    task.Wait();
                }
                catch
                {
                    // Ignored on purpose: the status is re-examined below and
                    // ThrowForNonSuccess raises the exception that matches it.
                }
            }
            if (task.Status == TaskStatus.RanToCompletion)
                return;
            ThrowForNonSuccess(task);
        }

        /// <summary>
        ///   Throws an exception to handle a task that completed in a state other than RanToCompletion.
        /// </summary>
        /// <param name="task"> The awaited task. </param>
        private static void ThrowForNonSuccess(Task task)
        {
            switch (task.Status)
            {
            case TaskStatus.Canceled:
                throw new TaskCanceledException(task);
            case TaskStatus.Faulted:
                // Surfaces the first inner exception rather than the AggregateException wrapper.
                throw task.Exception.InnerException;
            default:
                throw new InvalidOperationException("The task has not yet completed.");
            }
        }


        /// <summary>
        ///   Ends the await: throws if the task was canceled or faulted, otherwise returns its result.
        /// </summary>
        /// <returns> The result of the awaited task. </returns>
        public TResult GetResult()
        {
            ValidateEnd (task);

            return this.task.Result;
        }
    }
    
    /// <summary>
    ///   Awaiter over a <see cref="Task"/> that hands the continuation to an
    ///   <see cref="ISynchronizeInvoke"/> (through <c>BeginInvoke</c>) once the task has finished.
    /// </summary>
    public struct Awaiter : IAwaiter
    {
        private readonly Task task;
        private readonly ISynchronizeInvoke executionContext;

        /// <summary>
        ///   Creates an awaiter for <paramref name="task"/> that posts to <paramref name="executionContext"/>.
        /// </summary>
        /// <param name="task"> The awaited task. </param>
        /// <param name="executionContext"> Receiver of the continuation. </param>
        /// <exception cref="ArgumentNullException"> Either argument is null. </exception>
        internal Awaiter(Task task, ISynchronizeInvoke executionContext){
            if (task == null)
                throw new ArgumentNullException ("task");

            if (executionContext == null)
                throw new ArgumentNullException ("executionContext");

            this.task = task;
            this.executionContext = executionContext;

        }

        /// <summary>
        ///   Posts <paramref name="continuation"/> to the execution context after the awaited task has completed.
        /// </summary>
        /// <param name="continuation"> The code that follows the await. </param>
        /// <exception cref="ArgumentNullException"> <paramref name="continuation"/> is null. </exception>
        public void OnCompleted(Action continuation)
        {
            PostWhenCompleted (continuation);
        }

        /// <summary>
        ///   Same scheduling as <see cref="OnCompleted"/>.
        /// </summary>
        /// <param name="continuation"> The code that follows the await. </param>
        /// <exception cref="ArgumentNullException"> <paramref name="continuation"/> is null. </exception>
        [SecurityCritical]
        public void UnsafeOnCompleted(Action continuation)
        {
            PostWhenCompleted (continuation);
        }

        /// <summary>
        ///   Registers a task continuation that forwards <paramref name="continuation"/> to the
        ///   execution context through BeginInvoke.
        /// </summary>
        private void PostWhenCompleted(Action continuation)
        {
            if (continuation == null)
                throw new ArgumentNullException ("continuation");

            // A lambda inside a struct cannot capture 'this', so copy the field to a local first.
            ISynchronizeInvoke context = executionContext;

            // Posting right away (as the previous code did) ran the continuation before the task
            // had finished, which made GetResult block the execution context's thread in Wait().
            // ContinueWith defers the post until completion; if the task is already complete the
            // post happens immediately.
            task.ContinueWith (
                completed => { context.BeginInvoke (continuation, null); },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
        }

        /// <summary>
        ///   Always false, so the await never takes the synchronous path and the continuation
        ///   is always routed through <see cref="OnCompleted"/>.
        /// </summary>
        public bool IsCompleted
        {
            get
            {
                return false;
            }
        }


        /// <summary>
        ///   Fast checks for the end of an await operation to determine whether more needs to be done prior to completing the await.
        /// </summary>
        /// <param name="task"> The awaited task. </param>
        internal static void ValidateEnd(Task task)
        {
            if (task.Status == TaskStatus.RanToCompletion)
                return;

            HandleNonSuccess(task);
        }

        /// <summary>
        ///   Handles validations on tasks that aren't successfully completed.
        /// </summary>
        /// <param name="task"> The awaited task. </param>
        private static void HandleNonSuccess(Task task)
        {
            if (!task.IsCompleted)
            {
                try
                {
                    task.Wait();
                }
                catch
                {
                    // Ignored on purpose: the status is re-examined below and
                    // ThrowForNonSuccess raises the exception that matches it.
                }
            }
            if (task.Status == TaskStatus.RanToCompletion)
                return;
            ThrowForNonSuccess(task);
        }

        /// <summary>
        ///   Throws an exception to handle a task that completed in a state other than RanToCompletion.
        /// </summary>
        /// <param name="task"> The awaited task. </param>
        private static void ThrowForNonSuccess(Task task)
        {
            switch (task.Status)
            {
            case TaskStatus.Canceled:
                throw new TaskCanceledException(task);
            case TaskStatus.Faulted:
                // Surfaces the first inner exception rather than the AggregateException wrapper.
                throw task.Exception.InnerException;
            default:
                throw new InvalidOperationException("The task has not yet completed.");
            }
        }

        /// <summary>
        ///   Ends the await: throws if the task was canceled or faulted, otherwise returns normally.
        /// </summary>
        public void GetResult()
        {
            ValidateEnd (task);
        }


    }
    
    
    /// <summary>
    ///   Pairs a <see cref="Task{TResult}"/> with the <see cref="ISynchronizeInvoke"/> that its
    ///   awaiter is constructed with.
    /// </summary>
    /// <typeparam name="TResult"> Result type of the wrapped task. </typeparam>
    internal struct Awaitable<TResult> : IAwaitable<TResult>
    {
        private readonly Task<TResult> awaitedTask;
        private ISynchronizeInvoke invoker;

        /// <summary>
        ///   Stores both arguments after rejecting null values.
        /// </summary>
        /// <param name="task"> The task to await. </param>
        /// <param name="executionContext"> The context passed on to the awaiter. </param>
        /// <exception cref="ArgumentNullException"> Either argument is null. </exception>
        public Awaitable(Task<TResult> task, ISynchronizeInvoke executionContext)
        {
            if (task == null)
            {
                throw new ArgumentNullException ("task");
            }

            if (executionContext == null)
            {
                throw new ArgumentNullException ("executionContext");
            }

            awaitedTask = task;
            invoker = executionContext;
        }

        /// <summary>
        ///   Builds a new <see cref="Awaiter{TResult}"/> from the stored task and context.
        /// </summary>
        /// <returns> The awaiter, as <see cref="IAwaiter{TResult}"/>. </returns>
        public IAwaiter<TResult> GetAwaiter()
        {
            var awaiter = new Awaiter<TResult>(awaitedTask, invoker);
            return awaiter;
        }
    }
    
    /// <summary>
    ///   Pairs a <see cref="Task"/> with the <see cref="ISynchronizeInvoke"/> that its
    ///   awaiter is constructed with.
    /// </summary>
    internal struct Awaitable : IAwaitable
    {
        private readonly Task awaitedTask;
        private ISynchronizeInvoke invoker;

        /// <summary>
        ///   Stores both arguments after rejecting null values.
        /// </summary>
        /// <param name="task"> The task to await. </param>
        /// <param name="executionContext"> The context passed on to the awaiter. </param>
        /// <exception cref="ArgumentNullException"> Either argument is null. </exception>
        public Awaitable(Task task, ISynchronizeInvoke executionContext)
        {
            if (task == null)
            {
                throw new ArgumentNullException ("task");
            }

            if (executionContext == null)
            {
                throw new ArgumentNullException ("executionContext");
            }

            awaitedTask = task;
            invoker = executionContext;
        }

        /// <summary>
        ///   Builds a new <see cref="Awaiter"/> from the stored task and context.
        /// </summary>
        /// <returns> The awaiter, as <see cref="IAwaiter"/>. </returns>
        public IAwaiter GetAwaiter()
        {
            var awaiter = new Awaiter(awaitedTask, invoker);
            return awaiter;
        }
    }
    
    /// <summary>
    ///   Extension methods that wrap a task in an awaitable bound to an <see cref="ISynchronizeInvoke"/>.
    /// </summary>
    public static class AwaiterTaskExtensions
    {
        /// <summary>
        ///   Wraps <paramref name="task"/> in an <see cref="Awaitable{TResult}"/> bound to <paramref name="executionContext"/>.
        /// </summary>
        /// <param name="task"> The task to await. </param>
        /// <param name="executionContext"> The context handed to the awaitable. </param>
        /// <returns> The awaitable wrapper. </returns>
        /// <exception cref="ArgumentNullException"> Either argument is null (raised by the Awaitable constructor). </exception>
        public static IAwaitable<TResult> ConfigureAwait<TResult>(this Task<TResult> task, ISynchronizeInvoke executionContext)
        {
            IAwaitable<TResult> wrapper = new Awaitable<TResult>(task, executionContext);
            return wrapper;
        }

        /// <summary>
        ///   Wraps <paramref name="task"/> in an <see cref="Awaitable"/> bound to <paramref name="executionContext"/>.
        /// </summary>
        /// <param name="task"> The task to await. </param>
        /// <param name="executionContext"> The context handed to the awaitable. </param>
        /// <returns> The awaitable wrapper. </returns>
        /// <exception cref="ArgumentNullException"> Either argument is null (raised by the Awaitable constructor). </exception>
        public static IAwaitable ConfigureAwait(this Task task, ISynchronizeInvoke executionContext)
        {
            IAwaitable wrapper = new Awaitable(task, executionContext);
            return wrapper;
        }
    }
    }
    
  • ManuelCostaManuelCosta PTMember ✭✭

    And here is an example of an execution context that allows you to pause / resume the "Post"s sent by the Awaiter. This execution context runs on the main thread and is just a filter that queues methods before dispatching them to the main thread. For example, this is interesting if you are using async / await to pull images from the web and then want to create an OpenGL texture from the decoded image.
    Given that iOS does not allow you to perform GL operations in the background, with this design you can request permission to execute in the background, pull the image from the web, and queue the code that makes the OpenGL calls so that it is executed when the app comes back to the foreground.

    using System;
    using System.Threading;
    using System.Collections.Generic;
    using CoreFoundation;
    using Foundation;
    using System.ComponentModel;
    
    namespace Nativo.Threading
    {
    /// <summary>
    ///   Execution context that queues delegates and runs them through the main dispatch queue,
    ///   so that dispatching can be paused and resumed.
    /// </summary>
    /// <remarks>
    ///   The thread that constructs the instance is recorded as the owning thread and is what
    ///   <see cref="InvokeRequired"/> compares against. Requests made while the state is
    ///   Stop or Stopping are ignored.
    /// </remarks>
    public class Rendering_ExecutionContext_iOS : IExecutionContext
    {
        #region Private Fields
        private ExecutionState state = ExecutionState.Stop;
        private readonly Queue<MethodInfo> delegateQueue = new Queue<MethodInfo> ();
        private readonly Thread thread;
        private readonly object syncObject = new object();

        #endregion

        #region Public Events
        /// <summary> Raised after the state has changed to a different value. </summary>
        public event EventHandler<ExecutionStateChangedEventArgs> ExecutionStateChanged;

        /// <summary> Raised when a queued delegate throws while being executed. </summary>
        public event EventHandler<ThreadExceptionEventArgs> ExceptionThrown;
        #endregion

        #region Aux Class
        /// <summary>
        ///   One queued call: the delegate, its arguments, an optional event that is signalled
        ///   once the call has been attempted, and the value the call returned.
        /// </summary>
        internal class MethodInfo{

            internal MethodInfo(Delegate method, object[] args,  ManualResetEvent sem){
                this.Method = method;
                this.Args = args;
                this.Sem = sem;
            }

            internal Delegate Method{ get; private set;}
            internal object[] Args{ get; private set;}
            internal ManualResetEvent Sem{ get; private set;}

            // Return value of the delegate; stays null when the delegate threw.
            internal object Result{ get; set;}
        }
        #endregion

        #region Constructor
        /// <summary>
        ///   Records the constructing thread as the owning thread.
        /// </summary>
        public Rendering_ExecutionContext_iOS ()
        {
            thread = Thread.CurrentThread;
        }
        #endregion

        #region Private Methods
        /// <summary>
        ///   Dequeues and runs at most one queued call.
        /// </summary>
        /// <param name="isDraining">
        ///   When false, nothing is run unless the state is Executing; when true the state is ignored.
        /// </param>
        private void InternalExecute(bool isDraining){
            MethodInfo exeInfo;

            lock (syncObject) {

                if (delegateQueue.Count == 0 || (!isDraining && state != ExecutionState.Executing))
                    return;

                exeInfo = delegateQueue.Dequeue ();

            }

            try {
                exeInfo.Result = exeInfo.Method.DynamicInvoke (exeInfo.Args);
            } catch (Exception ex) {
                OnExceptionThrownEvent (ex);
            } finally {
                // Signal even when the delegate threw; otherwise the thread blocked in
                // Invoke would wait forever.
                if (exeInfo.Sem != null) {
                    exeInfo.Sem.Set ();
                }
            }
        }


        #endregion

        #region Event Triggers

        /// <summary>
        ///   Switches to <paramref name="newState"/> and raises <see cref="ExecutionStateChanged"/>;
        ///   does nothing when the state is already <paramref name="newState"/>.
        /// </summary>
        /// <param name="newState"> The state to switch to. </param>
        protected virtual void OnExecutionStateChanged(ExecutionState newState){

            ExecutionState oldState;

            lock (syncObject) {

                // Compare and swap under the same lock so two concurrent callers
                // cannot both observe the old value.
                if (newState == state) {
                    return;
                }

                oldState = state;
                state = newState;
            }

            var executionStateChanged = ExecutionStateChanged;

            if (executionStateChanged != null) {
                // Report the value that was actually set, not a later re-read of the field.
                executionStateChanged (this, new ExecutionStateChangedEventArgs (newState, oldState));
            }
        }

        /// <summary>
        ///   Raises <see cref="ExceptionThrown"/> with <paramref name="error"/>.
        /// </summary>
        /// <param name="error"> The exception caught while running a queued delegate. </param>
        protected virtual void OnExceptionThrownEvent(Exception error){
            var exceptionThrown = ExceptionThrown;

            if (exceptionThrown != null) {
                exceptionThrown (this, new ThreadExceptionEventArgs (error));
            }
        }

        #endregion

        #region Public Methods
        /// <summary> Switches the state to Executing. </summary>
        public void Start(){
            OnExecutionStateChanged (ExecutionState.Executing);
        }

        /// <summary> Switches the state to Paused, then runs everything that is already queued. </summary>
        public void Pause(){

            OnExecutionStateChanged (ExecutionState.Paused);
            DrainQueuedMethods ();
        }

        /// <summary> Switches the state to Executing, then runs everything that is queued. </summary>
        public void Resume(){

            OnExecutionStateChanged (ExecutionState.Executing);

            DrainQueuedMethods ();
        }

        /// <summary> Switches the state to Stopping, then runs everything that is queued. </summary>
        public void Stop(){
            // NOTE(review): the state is left at Stopping and never moved to Stop here — confirm this is intended.
            OnExecutionStateChanged (ExecutionState.Stopping);
            DrainQueuedMethods ();
        }

        /// <summary>
        ///   Runs queued calls on the calling thread until the queue is empty, regardless of the state.
        /// </summary>
        public void DrainQueuedMethods(){
            // The lock is held while the delegates run, so other threads calling
            // BeginInvoke/Invoke wait until the drain has finished.
            lock (syncObject) {
                while (delegateQueue.Count > 0) {
                    InternalExecute (true);
                }
            }
        }

        /// <summary>
        ///   Queues <paramref name="method"/>; when the state is Executing, also asks the main
        ///   dispatch queue to run one queued call.
        /// </summary>
        /// <param name="method"> The delegate to queue. </param>
        /// <param name="args"> Arguments passed to the delegate. </param>
        /// <returns> Always null; no IAsyncResult is produced by this implementation. </returns>
        /// <exception cref="ArgumentNullException"> <paramref name="method"/> is null and the state is neither Stop nor Stopping. </exception>
        public IAsyncResult BeginInvoke(Delegate method, params object[] args){
            lock (syncObject) {
                if (state == ExecutionState.Stop || state == ExecutionState.Stopping)
                    return null;

                if (method == null)
                    throw new ArgumentNullException ("method");

                var methodInfo = new MethodInfo (method, args, null);

                delegateQueue.Enqueue (methodInfo);

                if (state == ExecutionState.Executing) {
                    DispatchQueue.MainQueue.DispatchAsync (() => {
                        InternalExecute (false);
                    }
                    );
                }
            }

            return null;
        }

        /// <summary>
        ///   Runs <paramref name="method"/> and waits for it. On the owning thread the delegate is
        ///   invoked directly; on any other thread it is queued and the caller blocks until it has run.
        /// </summary>
        /// <param name="method"> The delegate to run. </param>
        /// <param name="args"> Arguments passed to the delegate. </param>
        /// <returns>
        ///   The delegate's return value; null when the state is Stop or Stopping, or when a
        ///   queued delegate threw (the exception is reported through <see cref="ExceptionThrown"/>).
        /// </returns>
        /// <exception cref="ArgumentNullException"> <paramref name="method"/> is null and the state is neither Stop nor Stopping. </exception>
        public object Invoke(Delegate method, params object[] args)
        {
            lock (syncObject) {
                if (state == ExecutionState.Stop || state == ExecutionState.Stopping)
                    return null;
            }

            if (method == null)
                throw new ArgumentNullException ("method");

            if (!InvokeRequired) {
                return method.DynamicInvoke (args);
            }

            // Dispose the wait handle once the call has completed instead of leaking it.
            using (var sem = new ManualResetEvent (false)) {

                var methodInfo = new MethodInfo (method, args, sem);

                lock (syncObject) {
                    // Re-check under the lock: if the context stopped since the first check,
                    // nothing would ever run the entry and the wait below would never end.
                    if (state == ExecutionState.Stop || state == ExecutionState.Stopping)
                        return null;

                    delegateQueue.Enqueue (methodInfo);

                    if (state == ExecutionState.Executing) {
                        DispatchQueue.MainQueue.DispatchAsync (() => {
                            InternalExecute (false);
                        });
                    }
                }

                // While Paused the entry stays queued, so this blocks until the queue is drained.
                sem.WaitOne ();

                return methodInfo.Result;
            }
        }

        /// <summary>
        ///   Does nothing; <see cref="BeginInvoke"/> does not produce an IAsyncResult to complete.
        /// </summary>
        /// <param name="result"> Ignored. </param>
        /// <returns> Always null. </returns>
        public object EndInvoke(IAsyncResult result){

            return null;
        }

        #endregion

        #region Properties
        /// <summary> True when the calling thread is not the thread that constructed this instance. </summary>
        public bool InvokeRequired{
            get{
                return Thread.CurrentThread.ManagedThreadId != thread.ManagedThreadId;
            }
        }

        /// <summary> The thread that constructed this instance. </summary>
        public Thread Thread{
            get{
                return thread;
            }
        }

        /// <summary> The current execution state. </summary>
        public ExecutionState State{
            get{
                lock (syncObject) {
                    return state;
                }
            }
        }

        #endregion
    }
    
    /// <summary>
    ///   Event data describing a transition between two <see cref="ExecutionState"/> values.
    /// </summary>
    public class ExecutionStateChangedEventArgs : EventArgs
    {
        /// <summary>
        ///   Captures both ends of the transition.
        /// </summary>
        /// <param name="state"> Value exposed through <see cref="State"/>. </param>
        /// <param name="oldState"> Value exposed through <see cref="OldState"/>. </param>
        public ExecutionStateChangedEventArgs (ExecutionState state, ExecutionState oldState)
        {
            OldState = oldState;
            State = state;
        }

        /// <summary> The state passed as <c>oldState</c> to the constructor. </summary>
        public ExecutionState OldState { get; private set; }

        /// <summary> The state passed as <c>state</c> to the constructor. </summary>
        public ExecutionState State { get; private set; }
    }
    
    /// <summary>
    ///   States an execution context can be in.
    /// </summary>
    public enum ExecutionState
    {
        /// <summary> Zero value of the enum. </summary>
        Stop = 0,

        /// <summary> Second value. </summary>
        Executing = 1,

        /// <summary> Third value. </summary>
        Paused = 2,

        /// <summary> Fourth value. </summary>
        Stopping = 3
    }
    
    /// <summary>
    ///   An <see cref="ISynchronizeInvoke"/> with an <see cref="ExecutionState"/>, lifecycle methods
    ///   to change it, and events reporting state changes and exceptions.
    /// </summary>
    public interface IExecutionContext : ISynchronizeInvoke
    {
        // ---- Events ----

        /// <summary> Reports a state transition (new and old state). </summary>
        event EventHandler<ExecutionStateChangedEventArgs> ExecutionStateChanged;

        /// <summary> Reports an exception through <see cref="ThreadExceptionEventArgs"/>. </summary>
        event EventHandler<ThreadExceptionEventArgs> ExceptionThrown;

        // ---- Properties ----

        /// <summary> The current state of the context. </summary>
        ExecutionState State { get; }

        /// <summary> The thread associated with the context. </summary>
        Thread Thread { get; }

        // ---- Methods ----

        /// <summary> Starts the context. </summary>
        void Start ();

        /// <summary> Pauses the context. </summary>
        void Pause ();

        /// <summary> Resumes the context. </summary>
        void Resume ();

        /// <summary> Stops the context. </summary>
        void Stop ();

        /// <summary> Drains the methods that are currently queued. </summary>
        void DrainQueuedMethods ();
    }
    
    }
    
Sign In or Register to comment.