
    leJiF,                        d Z ddlZddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ dZdZ G d d          Zi Zd Z ed           G d d                      Z G d de          Z ed           G d de                      Z ed           G d de                      Z G d d          Z G d d          ZdS )z$Async I/O backend support utilities.    N)deque)Empty)sleep)WeakKeyDictionary)detect_environment)states)TimeoutError)THREAD_TIMEOUT_MAXz@Celery must be restarted because a shutdown signal was detected.)AsyncBackendMixinBaseResultConsumerDrainerregister_drainerc                   ,    e Zd ZdZd Zd Zd ZddZdS )EventletAdaptedEventzq
    An adapted eventlet event, designed to match the API of `threading.Event` and
    `gevent.event.Event`.
    c                 @    dd l }|                                | _        d S Nr   )eventletEventevt)selfr   s     b/var/www/html/movieo_spanner_bot/venv/lib/python3.11/site-packages/celery/backends/asynchronous.py__init__zEventletAdaptedEvent.__init__    s     >>##    c                 4    | j                                         S N)r   readyr   s    r   is_setzEventletAdaptedEvent.is_set$   s    x~~r   c                 4    | j                                         S r   )r   sendr   s    r   setzEventletAdaptedEvent.set'   s    x}}r   Nc                 6    | j                             |          S r   )r   waitr   timeouts     r   r#   zEventletAdaptedEvent.wait*   s    x}}W%%%r   r   )__name__
__module____qualname____doc__r   r   r!   r#    r   r   r   r      s_         
$ $ $       & & & & & &r   r   c                       fd}|S )z5Decorator used to register a new result drainer type.c                     | t           <   | S r   )drainers)clsnames    r   _innerz register_drainer.<locals>._inner3   s    
r   r*   )r/   r0   s   ` r   r   r   1   s#         Mr   defaultc                   :    e Zd ZdZd Zd Zd Zd
dZddZd	 Z	dS )r   zResult draining service.c                     || _         d S r   )result_consumer)r   r4   s     r   r   zDrainer.__init__=   s    .r   c                     d S r   r*   r   s    r   startzDrainer.start@       r   c                     d S r   r*   r   s    r   stopzDrainer.stopC   r7   r   N   c              #   6  K   |p| j         j        }t          j                    }	 |r-t          j                    |z
  |k    rt	          j                    	 |                     |||          V  n# t          j        $ r Y nw xY w|r
 |             |j        rd S v)Nr:   r%   )r4   drain_eventstime	monotonicsocketr%   wait_forr   )r   pr%   intervalon_intervalr#   
time_starts          r   drain_events_untilzDrainer.drain_events_untilF   s      8t+8^%%
	 '4>++j8GCCn&&&mmAtXm>>>>>>>    w 	s   A0 0BBc                      ||           d S Nr<   r*   r   rB   r#   r%   s       r   rA   zDrainer.wait_forW   s    Wr   c                 (    t          j                    S r   )	threadingr   r   s    r   _eventzDrainer._eventZ   s       r   )Nr:   NNr   )
r&   r'   r(   r)   r   r6   r9   rF   rA   rL   r*   r   r   r   r   9   s~        ""/ / /       "   ! ! ! ! !r   r   c                   T     e Zd ZdZdZdZdZd Z fdZd Z	d Z
d Zd	dZd Z xZS )
greenletDrainerNc                 j    | j                                          |                                 | _         d S r   )_drain_complete_eventr!   rL   r   s    r   _send_drain_complete_eventz*greenletDrainer._send_drain_complete_eventd   s-    "&&(((%)[[]]"""r   c                     t                      j        |i | |                                 | _        |                                 | _        |                                 | _        |                                 | _        d S r   )superr   rL   _started_stopped	_shutdownrP   )r   argskwargs	__class__s      r   r   zgreenletDrainer.__init__h   sb    $)&)))%)[[]]"""r   c                    | j                                          	 | j                                        s_	 | j                            d           |                                  n# t          j        $ r Y nw xY w| j                                        _n# t          $ r}|| _
         d }~ww xY w|                                  	 | j                                         d S # t          $ r"}t          j        d|            Y d }~d S d }~ww xY w# |                                  	 | j                                         w # t          $ r!}t          j        d|            Y d }~w d }~ww xY wxY w)Nr:   r<   zFailed to set shutdown event: )rT   r!   rU   r   r4   r=   rQ   r@   r%   	Exception_excrV   RuntimeErrorloggingerror)r   es     r   runzgreenletDrainer.runp   s   	Dm**,, (55a5@@@335555~   D	 m**,,   	 	 	DI	 ++---D""$$$$$ D D DBqBBCCCCCCCCCD ++---D""$$$$ D D DBqBBCCCCCCCCDs   B /A% $B %A74B 6A77B D 
B+B&&B++D C 
D
(DD
E+#D=<E+=
E(E#E+#E((E+c                     |                                   | j                                        s:|                     | j                  | _        | j                                         d S d S r   )_ensure_not_shut_downrT   r   spawnra   _gr#   r   s    r   r6   zgreenletDrainer.start   sc    ""$$$}##%% 	!jj**DGM     	! 	!r   c                 v    | j                                          | j                            t                     d S r   )rU   r!   rV   r#   r
   r   s    r   r9   zgreenletDrainer.stop   s3    ./////r   c                     |                                   |j        s1| j                            |           |                                  d S d S rH   )r6   r   rP   r#   rc   rI   s       r   rA   zgreenletDrainer.wait_for   sU    

w 	)&++G+<<<&&(((((	) 	)r   c                 |    | j                                         r"| j        | j        t          t                    dS )aJ  Currently used to ensure the drainer has not run to completion.

        Raises if the shutdown event has been signaled (either due to an exception
        or stop() being called).

        The _shutdown event acts as synchronization to ensure _exc is properly
        set before it is read from, avoiding need for locks.
        N)rV   r   r\   r[   E_CELERY_RESTART_REQUIREDr   s    r   rc   z%greenletDrainer._ensure_not_shut_down   sB     >  "" 	;y$i 9:::		; 	;r   r   )r&   r'   r(   rd   r\   re   rP   rQ   r   ra   r6   r9   rA   rc   __classcell__)rY   s   @r   rN   rN   ^   s        ED	B 3 3 33 3 3 3 3D D D(! ! !0 0 0) ) ) ); ; ; ; ; ; ;r   rN   r   c                       e Zd Zd Zd ZdS )eventletDrainerc                 B    ddl m}m}  ||          } |d           |S )Nr   )r   rd   )r   r   rd   )r   funcr   rd   gs        r   rd   zeventletDrainer.spawn   s9    ))))))))E$KKar   c                     t                      S r   )r   r   s    r   rL   zeventletDrainer._event   s    #%%%r   Nr&   r'   r(   rd   rL   r*   r   r   rl   rl      s2          & & & & &r   rl   geventc                       e Zd Zd Zd ZdS )geventDrainerc                 b    dd l }|                    |          }|                    d           |S r   )rr   rd   r   )r   rn   rr   ro   s       r   rd   zgeventDrainer.spawn   s0    LLQr   c                 "    ddl m}  |            S )Nr   )r   )gevent.eventr   )r   r   s     r   rL   zgeventDrainer._event   s    &&&&&&uwwr   Nrq   r*   r   r   rt   rt      s2              r   rt   c                   z    e Zd ZdZd ZddZddZd ZddZdd	Z	d
 Z
d Zd Z	 ddZ	 ddZed             ZdS )r   z.Mixin for backends that enables the async API.c                 $    || j         j        |<   d S r   )r4   buckets)r   resultbuckets      r   _collect_intozAsyncBackendMixin._collect_into   s    /5$V,,,r   Tc              +   D  K   |                                   |j        }|st                      t                      }|D ][}t	          |d          s|                    |           (|j        r|                    |           E|                     ||           \ | j        |fd|i|D ]K}|rG|	                                }t	          |d          s|j
        |j        fV  n|j
        |j        fV  |GL|r(|	                                }|j
        |j        fV  |&d S d S )N_cacheno_ack)_ensure_not_eagerresultsStopIterationr   hasattrappendr   r}   _wait_for_pendingpopleftidchildren)r   r{   r   rX   r   r|   node_s           r   iter_nativezAsyncBackendMixin.iter_native   s        . 	"//!  	1 	1D4** 1d#### 1d####""40000''HHvHHH 	/ 	/A /~~''tX.. /'4=00000'4;....  /  	'>>##D'4;&&&&  	' 	' 	' 	' 	'r   Fc                     |r| j         j                                         	 |                     |           n-# t          $ r  |                     |j        ||           Y nw xY w|S )N)weak)r4   drainerr6   _maybe_resolve_from_bufferr   _add_pending_resultr   )r   r{   r   start_drainers       r   add_pending_resultz$AsyncBackendMixin.add_pending_result   s     	1 (..000	C++F3333 	C 	C 	C$$VYT$BBBBB	Cs   8 'A"!A"c                 j    |                     | j                            |j                             d S r   )_maybe_set_cache_pending_messagestaker   r   r{   s     r   r   z,AsyncBackendMixin._maybe_resolve_from_buffer   s/     6 ; ;FI F FGGGGGr   c                     | j         \  }}||vr.|j        |vr'||r|n||<   | j                            |           d S d S d S r   )_pending_resultsr   r4   consume_from)r   task_idr{   r   concreteweak_s         r   r   z%AsyncBackendMixin._add_pending_result   sc    /%%FIX$=$=5;d(UU'2 --g66666  $=$=r   c                 `      j         j                                          fd|D             S )Nc                 @    g | ]}                     |d           S )F)r   r   )r   ).0r{   r   r   s     r   
<listcomp>z9AsyncBackendMixin.add_pending_results.<locals>.<listcomp>   s>     ' ' ' ''T'OO ' ' 'r   )r4   r   r6   )r   r   r   s   ` `r   add_pending_resultsz%AsyncBackendMixin.add_pending_results   sL    $**,,,' ' ' ' '%' ' ' 	'r   c                 d    |                      |j                   |                     |           |S r   )_remove_pending_resultr   on_result_fulfilledr   s     r   remove_pending_resultz'AsyncBackendMixin.remove_pending_result   s1    ##FI...  (((r   c                 F    | j         D ]}|                    |d            d S r   )r   popr   r   mappings      r   r   z(AsyncBackendMixin._remove_pending_result   s5    , 	' 	'GKK&&&&	' 	'r   c                 D    | j                             |j                   d S r   )r4   
cancel_forr   r   s     r   r   z%AsyncBackendMixin.on_result_fulfilled  s!    ''	22222r   Nc                 z    |                                    | j        |fi |D ]}|                    ||          S )N)callback	propagate)r   r   maybe_throw)r   r{   r   r   rX   r   s         r   wait_for_pendingz"AsyncBackendMixin.wait_for_pending  sT       ''99&99 	 	A!!8y!IIIr   c                 0     | j         j        |f|||d|S )N)r%   rD   
on_message)r4   r   )r   r{   r%   rD   r   rX   s         r   r   z#AsyncBackendMixin._wait_for_pending  s=     6t#5
##

 
 
 
 	
r   c                     dS NTr*   r   s    r   is_asynczAsyncBackendMixin.is_async  s    tr   )T)FT)Fr   NNN)r&   r'   r(   r)   r}   r   r   r   r   r   r   r   r   r   r   propertyr   r*   r   r   r   r      s       886 6 6' ' ' ':   H H H7 7 7 7' ' ' '
  
' ' '3 3 3 37J J J J FJ
 
 
 
   X  r   r   c                   p    e Zd ZdZd Zd Zd ZddZd Zd Z	d	 Z
d
 ZddZ	 ddZddZd Zd Zd ZdS )r   z2Manager responsible for consuming result messages.c                     || _         || _        || _        || _        || _        d | _        t                      | _        t          t                               |           | _
        d S r   )backendappacceptr   r   r   r   rz   r-   r   r   )r   r   r   r   pending_resultspending_messagess         r   r   zBaseResultConsumer.__init__  sZ     /!1(** 2 4 45d;;r   c                     t                      r   NotImplementedError)r   initial_task_idrX   s      r   r6   zBaseResultConsumer.start*      !###r   c                     d S r   r*   r   s    r   r9   zBaseResultConsumer.stop-  r7   r   Nc                     t                      r   r   r$   s     r   r=   zBaseResultConsumer.drain_events0  r   r   c                     t                      r   r   r   r   s     r   r   zBaseResultConsumer.consume_from3  r   r   c                     t                      r   r   r   s     r   r   zBaseResultConsumer.cancel_for6  r   r   c                     | j                                          t                      | _         d | _        |                                  d S r   )rz   clearr   r   on_after_forkr   s    r   _after_forkzBaseResultConsumer._after_fork9  sB    (**r   c                     d S r   r*   r   s    r   r   z BaseResultConsumer.on_after_fork?  r7   r   c                 <    | j                             |||          S )Nr%   rD   )r   rF   )r   rB   r%   rD   s       r   rF   z%BaseResultConsumer.drain_events_untilB  s(    |..wK / 9 9 	9r   c              +     K    | j         |fd|i| | j        |c}| _        	 |                     |j        ||          D ]}d V  t	          d           n"# t
          j        $ r t          d          w xY w	 || _        d S # || _        w xY w)Nr%   r   r   zThe operation timed out.)on_wait_for_pendingr   rF   on_readyr   r@   r%   r	   )r   r{   r%   rD   r   rX   	prev_on_mr   s           r   r   z$BaseResultConsumer._wait_for_pendingF  s       	! CCCFCCC%)_j"	4?		(,,OW + - - -   a	
 ~ 	; 	; 	;9:::	; (DOOOiDO''''s   3A B A66B 	Bc                     d S r   r*   )r   r{   r%   rX   s       r   r   z&BaseResultConsumer.on_wait_for_pendingV  r7   r   c                 <    |                      |j        |           d S r   )on_state_changepayload)r   messages     r   on_out_of_band_resultz(BaseResultConsumer.on_out_of_band_resultY  s     W_g66666r   c                 h    | j         D ]}	 ||         c S # t          $ r Y w xY wt          |          r   )r   KeyErrorr   s      r   _get_pending_resultz&BaseResultConsumer._get_pending_result\  sX    , 	 	Gw''''   ws   
""c                    | j         r|                      |           |d         t          j        v r|d         }	 |                     |          }|                    |           | j        }	 |                    |          }|                    |           n;# t          $ r Y n/w xY w# t          $ r | j	        
                    ||           Y nw xY wt          d           d S )Nstatusr   r   )r   r   READY_STATESr   r   rz   r   r   r   r   putr   )r   metar   r   r{   rz   r|   s          r   r   z"BaseResultConsumer.on_state_changed  s   ? 	"OOD!!!>V0009oG*11':: ''---,*$[[00F
 MM&))))	     D  : : : &**7D99999: 	as#   B' ,B 
B$#B$'%CCr   )NNr   )r&   r'   r(   r)   r   r6   r9   r=   r   r   r   r   rF   r   r   r   r   r   r*   r   r   r   r     s       <<	< 	< 	<$ $ $  $ $ $ $$ $ $$ $ $    9 9 9 9
 FJ( ( ( (    7 7 7         r   r   )r)   r^   r@   rK   r>   collectionsr   queuer   r   weakrefr   kombu.utils.compatr   celeryr   celery.exceptionsr	   celery.utils.threadsr
   ri   __all__r   r-   r   r   rN   rl   rt   r   r   r*   r   r   <module>r      sr   * *                          % % % % % % 1 1 1 1 1 1       * * * * * * 3 3 3 3 3 3^ & & & & & & & &(    )!! !! !! !! !! !! !! !!HE; E; E; E; E;g E; E; E;P *	& 	& 	& 	& 	&o 	& 	& 	& (
 
 
 
 
O 
 
 
X X X X X X X Xv^ ^ ^ ^ ^ ^ ^ ^ ^ ^r   