using Cysharp.Threading.Tasks.Internal; 
 | 
using System; 
 | 
using System.Collections.Concurrent; 
 | 
using System.Collections.Generic; 
 | 
using System.Runtime.ExceptionServices; 
 | 
using System.Threading; 
 | 
using System.Threading.Tasks; 
 | 
  
 | 
namespace Cysharp.Threading.Tasks.Linq 
 | 
{ 
 | 
    public static partial class UniTaskAsyncEnumerable 
 | 
    { 
 | 
        public static IUniTaskAsyncEnumerable<TSource> ToUniTaskAsyncEnumerable<TSource>(this IEnumerable<TSource> source) 
 | 
        { 
 | 
            Error.ThrowArgumentNullException(source, nameof(source)); 
 | 
  
 | 
            return new ToUniTaskAsyncEnumerable<TSource>(source); 
 | 
        } 
 | 
  
 | 
        public static IUniTaskAsyncEnumerable<TSource> ToUniTaskAsyncEnumerable<TSource>(this Task<TSource> source) 
 | 
        { 
 | 
            Error.ThrowArgumentNullException(source, nameof(source)); 
 | 
  
 | 
            return new ToUniTaskAsyncEnumerableTask<TSource>(source); 
 | 
        } 
 | 
  
 | 
        public static IUniTaskAsyncEnumerable<TSource> ToUniTaskAsyncEnumerable<TSource>(this UniTask<TSource> source) 
 | 
        { 
 | 
            return new ToUniTaskAsyncEnumerableUniTask<TSource>(source); 
 | 
        } 
 | 
  
 | 
        public static IUniTaskAsyncEnumerable<TSource> ToUniTaskAsyncEnumerable<TSource>(this IObservable<TSource> source) 
 | 
        { 
 | 
            Error.ThrowArgumentNullException(source, nameof(source)); 
 | 
  
 | 
            return new ToUniTaskAsyncEnumerableObservable<TSource>(source); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    internal class ToUniTaskAsyncEnumerable<T> : IUniTaskAsyncEnumerable<T> 
 | 
    { 
 | 
        readonly IEnumerable<T> source; 
 | 
  
 | 
        public ToUniTaskAsyncEnumerable(IEnumerable<T> source) 
 | 
        { 
 | 
            this.source = source; 
 | 
        } 
 | 
  
 | 
        public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) 
 | 
        { 
 | 
            return new _ToUniTaskAsyncEnumerable(source, cancellationToken); 
 | 
        } 
 | 
  
 | 
        class _ToUniTaskAsyncEnumerable : IUniTaskAsyncEnumerator<T> 
 | 
        { 
 | 
            readonly IEnumerable<T> source; 
 | 
            CancellationToken cancellationToken; 
 | 
  
 | 
            IEnumerator<T> enumerator; 
 | 
  
 | 
            public _ToUniTaskAsyncEnumerable(IEnumerable<T> source, CancellationToken cancellationToken) 
 | 
            { 
 | 
                this.source = source; 
 | 
                this.cancellationToken = cancellationToken; 
 | 
            } 
 | 
  
 | 
            public T Current => enumerator.Current; 
 | 
  
 | 
            public UniTask<bool> MoveNextAsync() 
 | 
            { 
 | 
                cancellationToken.ThrowIfCancellationRequested(); 
 | 
  
 | 
                if (enumerator == null) 
 | 
                { 
 | 
                    enumerator = source.GetEnumerator(); 
 | 
                } 
 | 
  
 | 
                if (enumerator.MoveNext()) 
 | 
                { 
 | 
                    return CompletedTasks.True; 
 | 
                } 
 | 
  
 | 
                return CompletedTasks.False; 
 | 
            } 
 | 
  
 | 
            public UniTask DisposeAsync() 
 | 
            { 
 | 
                enumerator.Dispose(); 
 | 
                return default; 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    internal class ToUniTaskAsyncEnumerableTask<T> : IUniTaskAsyncEnumerable<T> 
 | 
    { 
 | 
        readonly Task<T> source; 
 | 
  
 | 
        public ToUniTaskAsyncEnumerableTask(Task<T> source) 
 | 
        { 
 | 
            this.source = source; 
 | 
        } 
 | 
  
 | 
        public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) 
 | 
        { 
 | 
            return new _ToUniTaskAsyncEnumerableTask(source, cancellationToken); 
 | 
        } 
 | 
  
 | 
        class _ToUniTaskAsyncEnumerableTask : IUniTaskAsyncEnumerator<T> 
 | 
        { 
 | 
            readonly Task<T> source; 
 | 
            CancellationToken cancellationToken; 
 | 
  
 | 
            T current; 
 | 
            bool called; 
 | 
  
 | 
            public _ToUniTaskAsyncEnumerableTask(Task<T> source, CancellationToken cancellationToken) 
 | 
            { 
 | 
                this.source = source; 
 | 
                this.cancellationToken = cancellationToken; 
 | 
  
 | 
                this.called = false; 
 | 
            } 
 | 
  
 | 
            public T Current => current; 
 | 
  
 | 
            public async UniTask<bool> MoveNextAsync() 
 | 
            { 
 | 
                cancellationToken.ThrowIfCancellationRequested(); 
 | 
  
 | 
                if (called) 
 | 
                { 
 | 
                    return false; 
 | 
                } 
 | 
                called = true; 
 | 
  
 | 
                current = await source; 
 | 
                return true; 
 | 
            } 
 | 
  
 | 
            public UniTask DisposeAsync() 
 | 
            { 
 | 
                return default; 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    internal class ToUniTaskAsyncEnumerableUniTask<T> : IUniTaskAsyncEnumerable<T> 
 | 
    { 
 | 
        readonly UniTask<T> source; 
 | 
  
 | 
        public ToUniTaskAsyncEnumerableUniTask(UniTask<T> source) 
 | 
        { 
 | 
            this.source = source; 
 | 
        } 
 | 
  
 | 
        public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) 
 | 
        { 
 | 
            return new _ToUniTaskAsyncEnumerableUniTask(source, cancellationToken); 
 | 
        } 
 | 
  
 | 
        class _ToUniTaskAsyncEnumerableUniTask : IUniTaskAsyncEnumerator<T> 
 | 
        { 
 | 
            readonly UniTask<T> source; 
 | 
            CancellationToken cancellationToken; 
 | 
  
 | 
            T current; 
 | 
            bool called; 
 | 
  
 | 
            public _ToUniTaskAsyncEnumerableUniTask(UniTask<T> source, CancellationToken cancellationToken) 
 | 
            { 
 | 
                this.source = source; 
 | 
                this.cancellationToken = cancellationToken; 
 | 
  
 | 
                this.called = false; 
 | 
            } 
 | 
  
 | 
            public T Current => current; 
 | 
  
 | 
            public async UniTask<bool> MoveNextAsync() 
 | 
            { 
 | 
                cancellationToken.ThrowIfCancellationRequested(); 
 | 
  
 | 
                if (called) 
 | 
                { 
 | 
                    return false; 
 | 
                } 
 | 
                called = true; 
 | 
  
 | 
                current = await source; 
 | 
                return true; 
 | 
            } 
 | 
  
 | 
            public UniTask DisposeAsync() 
 | 
            { 
 | 
                return default; 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    internal class ToUniTaskAsyncEnumerableObservable<T> : IUniTaskAsyncEnumerable<T> 
 | 
    { 
 | 
        readonly IObservable<T> source; 
 | 
  
 | 
        public ToUniTaskAsyncEnumerableObservable(IObservable<T> source) 
 | 
        { 
 | 
            this.source = source; 
 | 
        } 
 | 
  
 | 
        public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) 
 | 
        { 
 | 
            return new _ToUniTaskAsyncEnumerableObservable(source, cancellationToken); 
 | 
        } 
 | 
  
 | 
        class _ToUniTaskAsyncEnumerableObservable : MoveNextSource, IUniTaskAsyncEnumerator<T>, IObserver<T> 
 | 
        { 
 | 
            static readonly Action<object> OnCanceledDelegate = OnCanceled; 
 | 
  
 | 
            readonly IObservable<T> source; 
 | 
            CancellationToken cancellationToken; 
 | 
  
 | 
  
 | 
            bool useCachedCurrent; 
 | 
            T current; 
 | 
            bool subscribeCompleted; 
 | 
            readonly Queue<T> queuedResult; 
 | 
            Exception error; 
 | 
            IDisposable subscription; 
 | 
            CancellationTokenRegistration cancellationTokenRegistration; 
 | 
  
 | 
            public _ToUniTaskAsyncEnumerableObservable(IObservable<T> source, CancellationToken cancellationToken) 
 | 
            { 
 | 
                this.source = source; 
 | 
                this.cancellationToken = cancellationToken; 
 | 
                this.queuedResult = new Queue<T>(); 
 | 
  
 | 
                if (cancellationToken.CanBeCanceled) 
 | 
                { 
 | 
                    cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(OnCanceledDelegate, this); 
 | 
                } 
 | 
            } 
 | 
  
 | 
            public T Current 
 | 
            { 
 | 
                get 
 | 
                { 
 | 
                    if (useCachedCurrent) 
 | 
                    { 
 | 
                        return current; 
 | 
                    } 
 | 
  
 | 
                    lock (queuedResult) 
 | 
                    { 
 | 
                        if (queuedResult.Count != 0) 
 | 
                        { 
 | 
                            current = queuedResult.Dequeue(); 
 | 
                            useCachedCurrent = true; 
 | 
                            return current; 
 | 
                        } 
 | 
                        else 
 | 
                        { 
 | 
                            return default; // undefined. 
 | 
                        } 
 | 
                    } 
 | 
                } 
 | 
            } 
 | 
  
 | 
            public UniTask<bool> MoveNextAsync() 
 | 
            { 
 | 
                lock (queuedResult) 
 | 
                { 
 | 
                    useCachedCurrent = false; 
 | 
  
 | 
                    if (cancellationToken.IsCancellationRequested) 
 | 
                    { 
 | 
                        return UniTask.FromCanceled<bool>(cancellationToken); 
 | 
                    } 
 | 
  
 | 
                    if (subscription == null) 
 | 
                    { 
 | 
                        subscription = source.Subscribe(this); 
 | 
                    } 
 | 
  
 | 
                    if (error != null) 
 | 
                    { 
 | 
                        return UniTask.FromException<bool>(error); 
 | 
                    } 
 | 
  
 | 
                    if (queuedResult.Count != 0) 
 | 
                    { 
 | 
                        return CompletedTasks.True; 
 | 
                    } 
 | 
  
 | 
                    if (subscribeCompleted) 
 | 
                    { 
 | 
                        return CompletedTasks.False; 
 | 
                    } 
 | 
  
 | 
                    completionSource.Reset(); 
 | 
                    return new UniTask<bool>(this, completionSource.Version); 
 | 
                } 
 | 
            } 
 | 
  
 | 
            public UniTask DisposeAsync() 
 | 
            { 
 | 
                subscription.Dispose(); 
 | 
                cancellationTokenRegistration.Dispose(); 
 | 
                completionSource.Reset(); 
 | 
                return default; 
 | 
            } 
 | 
  
 | 
            public void OnCompleted() 
 | 
            { 
 | 
                lock (queuedResult) 
 | 
                { 
 | 
                    subscribeCompleted = true; 
 | 
                    completionSource.TrySetResult(false); 
 | 
                } 
 | 
            } 
 | 
  
 | 
            public void OnError(Exception error) 
 | 
            { 
 | 
                lock (queuedResult) 
 | 
                { 
 | 
                    this.error = error; 
 | 
                    completionSource.TrySetException(error); 
 | 
                } 
 | 
            } 
 | 
  
 | 
            public void OnNext(T value) 
 | 
            { 
 | 
                lock (queuedResult) 
 | 
                { 
 | 
                    queuedResult.Enqueue(value); 
 | 
                    completionSource.TrySetResult(true); // include callback execution, too long lock? 
 | 
                } 
 | 
            } 
 | 
  
 | 
            static void OnCanceled(object state) 
 | 
            { 
 | 
                var self = (_ToUniTaskAsyncEnumerableObservable)state; 
 | 
                lock (self.queuedResult) 
 | 
                { 
 | 
                    self.completionSource.TrySetCanceled(self.cancellationToken); 
 | 
                } 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
} 
 |