U
    e=                     @  s  d dl mZ d dlZd dlZd dlmZmZmZ d dlm	Z	m
Z
mZmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZmZmZ d	d
lmZ d	dlm Z m!Z!m"Z" d	dl#m$Z$ d	dl%m&Z&m'Z' d	dl(m)Z) d	dl*m+Z+ ej,dkr
d dlm-Z-m.Z. nd dl/m-Z-m.Z. edZ0edddZ1e-dZ2ddddddZ3ddddddZ4G dd  d ee1 eZ5G d!d" d"e+Z6G d#d$ d$Z7ed0d&d'd(d)d*d+Z8d,d-d.d/Z9dS )1    )annotationsN)	AwaitableCallable	Generator)FIRST_COMPLETEDFutureThreadPoolExecutorwait)AbstractContextManagercontextmanager)isawaitable)TracebackType)AnyAsyncContextManagerContextManagerGenericIterableTypeVarcastoverload   )
_eventloop)get_async_backendget_cancelled_exc_classthreadlocals)Event)CancelScopecreate_task_group)AsyncBackend)
TaskStatus)      )TypeVarTupleUnpackT_RetvalT_coT)	covariantPosArgsT1Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]]Unpack[PosArgsT]funcargsreturnc                 G  s@   zt j}t j}W n tk
r.   tddY nX |j| ||dS )z
    Call a coroutine function from a worker thread.

    :param func: a coroutine function
    :param args: positional arguments for the callable
    :return: the return value of the coroutine function

    9This function can only be run from an AnyIO worker threadNtoken)r   current_async_backendcurrent_tokenAttributeErrorRuntimeErrorZrun_async_from_threadr+   r,   async_backendr0    r7   5/tmp/pip-unpacked-wheel-7mhetrzx/anyio/from_thread.pyrun&   s    
r9   &Callable[[Unpack[PosArgsT]], T_Retval]c                 G  s@   zt j}t j}W n tk
r.   tddY nX |j| ||dS )z
    Call a function in the event loop thread from a worker thread.

    :param func: a callable
    :param args: positional arguments for the callable
    :return: the return value of the callable

    r.   Nr/   )r   r1   r2   r3   r4   Zrun_sync_from_threadr5   r7   r7   r8   run_sync<   s    
r;   c                   @  sr   e Zd ZU ded< ded< ded< dZded	< d
ddddZddddZddddZdddddddZdS )_BlockingAsyncContextManagerzFuture[T_co]_enter_futurezFuture[bool | None]_exit_futurer   _exit_event)NNNzMtuple[type[BaseException] | None, BaseException | None, TracebackType | None]_exit_exc_infoAsyncContextManager[T_co]BlockingPortal)async_cmportalc                 C  s   || _ || _d S N)	_async_cm_portal)selfrC   rD   r7   r7   r8   __init__Z   s    z%_BlockingAsyncContextManager.__init__bool | Noner-   c              
     s   zt  | _| j I d H }W n0 tk
rL } z| j|  W 5 d }~X Y nX | j| d z| j
 I d H  W 5 | jj| j	 I d H }|  S X  rE   )r   r?   rF   
__aenter__BaseExceptionr=   set_exception
set_result	__aexit__r@   r	   )rH   valueexcresultr7   r7   r8   run_async_cm^   s    z)_BlockingAsyncContextManager.run_async_cmr%   c                 C  s"   t  | _| j| j| _| j S rE   )r   r=   rG   start_task_soonrT   r>   rS   rH   r7   r7   r8   	__enter__u   s    z&_BlockingAsyncContextManager.__enter__type[BaseException] | NoneBaseException | NoneTracebackType | None)&_BlockingAsyncContextManager__exc_type'_BlockingAsyncContextManager__exc_value'_BlockingAsyncContextManager__tracebackr-   c                 C  s&   |||f| _ | j| jj | j S rE   )r@   rG   callr?   setr>   rS   )rH   r[   r\   r]   r7   r7   r8   __exit__z   s    z%_BlockingAsyncContextManager.__exit__N)	__name__
__module____qualname____annotations__r@   rI   rT   rW   r`   r7   r7   r7   r8   r<   R   s   

r<   c                   @  s,   e Zd ZddddZddddd	d
ZdS )_BlockingPortalTaskStatusr   futurec                 C  s
   || _ d S rE   )_future)rH   rg   r7   r7   r8   rI      s    z"_BlockingPortalTaskStatus.__init__NobjectNone)rQ   r-   c                 C  s   | j | d S rE   )rh   rO   )rH   rQ   r7   r7   r8   started   s    z!_BlockingPortalTaskStatus.started)N)ra   rb   rc   rI   rk   r7   r7   r7   r8   re      s   re   c                   @  sh  e Zd ZdZd dddZddddZd ddd	Zd
ddddddZddddZddddZ	d>dddddZ
dddddddd Zdddd!ddd"d#d$Zed%d&d'd(d)d*Zed+d&d'd(d,d*Zdd&d'd(d-d*Zed.d/d%d&d!dd0d1d2Zed.d/d+d&d!dd0d3d2Zd.d/dd&d!dd0d4d2Zd.d/d5d!d!d6d0d7d8Zd9d:d;d<d=Zd.S )?rB   zLAn object that lets external threads run code in an asynchronous event loop.rK   c                 C  s
   t   S rE   )r   Zcreate_blocking_portal)clsr7   r7   r8   __new__   s    zBlockingPortal.__new__rj   c                 C  s&   t  | _t | _t | _t | _d S rE   )		threading	get_ident_event_loop_thread_idr   _stop_eventr   _task_groupr   _cancelled_exc_classrV   r7   r7   r8   rI      s    
zBlockingPortal.__init__c                   s   | j  I d H  | S rE   )rr   rL   rV   r7   r7   r8   rL      s    zBlockingPortal.__aenter__rX   rY   rZ   rJ   )exc_typeexc_valexc_tbr-   c                   s$   |   I d H  | j|||I d H S rE   )stoprr   rP   )rH   rt   ru   rv   r7   r7   r8   rP      s    zBlockingPortal.__aexit__c                 C  s,   | j d krtd| j t kr(tdd S )NzThis portal is not runningz7This method cannot be called from the event loop thread)rp   r4   rn   ro   rV   r7   r7   r8   _check_running   s    
zBlockingPortal._check_runningc                   s   | j  I dH  dS )z#Sleep until :meth:`stop` is called.N)rq   r	   rV   r7   r7   r8   sleep_until_stopped   s    z"BlockingPortal.sleep_until_stoppedFbool)cancel_remainingr-   c                   s$   d| _ | j  |r | jj  dS )a.  
        Signal the portal to shut down.

        This marks the portal as no longer accepting new calls and exits from
        :meth:`sleep_until_stopped`.

        :param cancel_remaining: ``True`` to cancel all the remaining tasks, ``False``
            to let them finish before returning

        N)rp   rq   r_   rr   Zcancel_scopecancel)rH   r{   r7   r7   r8   rw      s    
zBlockingPortal.stopz<Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval]ztuple[Unpack[PosArgsT]]zdict[str, Any]Future[T_Retval])r+   r,   kwargsrg   r-   c           	   
     s   ddd fdd}zzT|||}t |rft , | rF   n
|| |I d H }W 5 Q R X n|}W nd jk
r   |  |  Y nR tk
r } z"| s|| t	|t
s W 5 d }~X Y nX | s|| W 5 d  X d S )Nr}   rj   )fr-   c                   s*   |   r&jd t fkr& j d S rE   )	cancelledrp   rn   ro   r^   r|   )r   ZscoperH   r7   r8   callback   s
    z+BlockingPortal._call_func.<locals>.callback)r   r   r   r|   add_done_callbackrs   set_running_or_notify_cancelrM   rN   
isinstance	ExceptionrO   )	rH   r+   r,   r~   rg   r   Zretval_or_awaitableretvalrR   r7   r   r8   
_call_func   s*    




zBlockingPortal._call_funcri   )r+   r,   r~   namerg   r-   c                 C  s   t dS )a%  
        Spawn a new task using the given callable.

        Implementors must ensure that the future is resolved when the task finishes.

        :param func: a callable
        :param args: positional arguments to be passed to the callable
        :param kwargs: keyword arguments to be passed to the callable
        :param name: name of the task (will be coerced to a string if not ``None``)
        :param future: a future that will resolve to the return value of the callable,
            or the exception raised during its execution

        N)NotImplementedError)rH   r+   r,   r~   r   rg   r7   r7   r8   _spawn_task_from_thread   s    z&BlockingPortal._spawn_task_from_threadr(   r)   r$   r*   c                 G  s   d S rE   r7   rH   r+   r,   r7   r7   r8   r^     s    zBlockingPortal.callr:   c                 G  s   d S rE   r7   r   r7   r7   r8   r^     s    c                 G  s   t t| j|f|  S )a3  
        Call the given function in the event loop thread.

        If the callable returns a coroutine object, it is awaited on.

        :param func: any callable
        :raises RuntimeError: if the portal is not running or if this method is called
            from within the event loop thread

        )r   r$   rU   rS   r   r7   r7   r8   r^     s    N)r   )r+   r,   r   r-   c                G  s   d S rE   r7   rH   r+   r   r,   r7   r7   r8   rU   "  s    zBlockingPortal.start_task_soonc                G  s   d S rE   r7   r   r7   r7   r8   rU   +  s    c                G  s$   |    t }| ||i || |S )a  
        Start a task in the portal's task group.

        The task will be run inside a cancel scope which can be cancelled by cancelling
        the returned future.

        :param func: the target function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a future that resolves with the return value of the callable if the
            task completes successfully, or with the exception raised in the task
        :raises RuntimeError: if the portal is not running or if this method is called
            from within the event loop thread
        :rtype: concurrent.futures.Future[T_Retval]

        .. versionadded:: 3.0

        )rx   r   r   )rH   r+   r   r,   r   r7   r7   r8   rU   4  s    z"Callable[..., Awaitable[T_Retval]]ztuple[Future[T_Retval], Any]c                  s\   ddd fdd}|    t  t }t }|| | ||d|i|| |  fS )a  
        Start a task in the portal's task group and wait until it signals for readiness.

        This method works the same way as :meth:`.abc.TaskGroup.start`.

        :param func: the target function
        :param args: positional arguments passed to ``func``
        :param name: name of the task (will be coerced to a string if not ``None``)
        :return: a tuple of (future, task_status_value) where the ``task_status_value``
            is the value passed to ``task_status.started()`` from within the target
            function
        :rtype: tuple[concurrent.futures.Future[T_Retval], Any]

        .. versionadded:: 3.0

        r}   rj   )rg   r-   c                   sH      sD|  r   n*|  r2 |   ntd} | d S )Nz1Task exited without calling task_status.started())doner   r|   	exceptionrN   r4   )rg   rR   Ztask_status_futurer7   r8   	task_doneh  s    
z,BlockingPortal.start_task.<locals>.task_donetask_status)rx   r   re   r   r   rS   )rH   r+   r   r,   r   r   r   r7   r   r8   
start_taskQ  s    
zBlockingPortal.start_taskrA   zContextManager[T_co])cmr-   c                 C  s
   t || S )a  
        Wrap an async context manager as a synchronous context manager via this portal.

        Spawns a task that will call both ``__aenter__()`` and ``__aexit__()``, stopping
        in the middle until the synchronous context manager exits.

        :param cm: an asynchronous context manager
        :return: a synchronous context manager

        .. versionadded:: 2.1

        )r<   )rH   r   r7   r7   r8   wrap_async_context_manager|  s    z)BlockingPortal.wrap_async_context_manager)F)ra   rb   rc   __doc__rm   rI   rL   rP   rx   ry   rw   r   r   r   r^   rU   r   r   r7   r7   r7   r8   rB      s4   	*!+rB   asynciostrzdict[str, Any] | Nonez$Generator[BlockingPortal, Any, None])backendbackend_optionsr-   c                 #  s   dd fdd}t   td}|jtj|| |d}ztttt  | gtd W n& t	k
rz    
  |
   Y nX   r  }d}z*z
|V  W n t	k
r   d	} Y nX W 5 z||j| W n tk
r   Y nX X |  W 5 Q R X d
S )a|  
    Start a new event loop in a new thread and run a blocking portal in its main task.

    The parameters are the same as for :func:`~anyio.run`.

    :param backend: name of the backend
    :param backend_options: backend options
    :return: a context manager that yields a blocking portal

    .. versionchanged:: 3.0
        Usage as a context manager is now required.

    rj   rK   c               
     sD   t  4 I d H &}   r0 |  |  I d H  W 5 Q I d H R X d S rE   )rB   r   rO   ry   )Zportal_rf   r7   r8   
run_portal  s    
z)start_blocking_portal.<locals>.run_portalr   )r   r   )Zreturn_whenFTN)r   r   Zsubmitr   r9   r	   r   r   r   rM   r|   r   rS   r^   rw   r4   )r   r   r   executorZ
run_futurerD   Zcancel_remaining_tasksr7   rf   r8   start_blocking_portal  s>    


r   rj   rK   c                  C  s6   z
t j} W n tk
r(   tddY nX |   dS )aa  
    Check if the cancel scope of the host task's running the current worker thread has
    been cancelled.

    If the host task's current cancel scope has indeed been cancelled, the
    backend-specific cancellation exception will be raised.

    :raises RuntimeError: if the current thread was not spawned by
        :func:`.to_thread.run_sync`

    r.   N)r   r1   r3   r4   check_cancelled)r6   r7   r7   r8   r     s    
r   )r   N):
__future__r   sysrn   collections.abcr   r   r   concurrent.futuresr   r   r   r	   
contextlibr
   r   inspectr   typesr   typingr   r   r   r   r   r   r   r   Z_corer   Z_core._eventloopr   r   r   Z_core._synchronizationr   Z_core._tasksr   r   abcr   Z
abc._tasksr   version_infor"   r#   Ztyping_extensionsr$   r%   r'   r9   r;   r<   re   rB   r   r   r7   r7   r7   r8   <module>   s@   (3     :