
    keJiTH                        d dl Z d dlZd dlmZmZmZmZmZmZm	Z	 d dl
mZ d dlmZ d dlmZmZ d dlmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZmZ d dlmZ d dlm Z! d dl"m#Z#m$Z$ d dl%m&Z&m'Z'm(Z( d dl)m*Z*  ej+        e,          Z-e* G d dee                      Z.defdZ/ G d dee          Z0 G d d          Z1dS )    N)Any	AwaitableCallable	CoroutineListOptionalUnion)PubSubHandler)DefaultCommandExecutor)DEFAULT_GRACE_PERIODMultiDbConfig)AsyncDatabase	Databases)AsyncFailureDetector)HealthCheckHealthCheckPolicy)BackgroundScheduler)AsyncCoreCommandsAsyncRedisModuleCommands)CircuitBreaker)State)NoValidDatabaseExceptionUnhealthyDatabaseException)ChannelT
EncodableTKeyT)experimentalc                      e Zd ZdZdefdZd,dZd Zd Zde	fd	Z
d
eddfdZd
efdZdedefdZd
efdZd
edefdZdefdZdefdZd Zd Zdddddedgeeee         f         f         dedee         d ed!ee         f
d"Z d# Z!	 d-d$eee"ge#eedf         f                  fd%Z$d
edefd&Z%d'e&d(e'd)e'fd*Z(d+ Z)dS ).MultiDBClientzx
    Client that operates on multiple logical Redis databases.
    Should be used in Active-Active database setups.
    configc           
      x   |                                 | _        |j        s|                                n|j        | _        |j        | _        |j                            |j	        |j
                  | _        |j        s|                                n|j        | _        |j        |                                n|j        | _        | j                            | j                   |j        | _        |j        | _        |j        | _        | j                            t4          g           t7          | j        | j        | j        | j        |j        |j        | j        | j                  | _        d| _        tA          j!                    | _"        tG                      | _$        || _%        d | _&        g | _'        d | _(        d S )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF))r#   
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvaluehealth_check_probeshealth_check_delay_health_check_policyr"   default_failure_detectors_failure_detectorsr%   default_failover_strategy_failover_strategyset_databasesr)   _auto_fallback_intervalr(   _event_dispatcherr$   _command_retryupdate_supported_errorsConnectionRefusedErrorr   r&   r'   command_executorinitializedasyncioLock_hc_lockr   _bg_scheduler_config_recurring_hc_task	_hc_tasks_half_open_state_task)selfr    s     b/var/www/html/movieo_spanner_bot/venv/lib/python3.11/site-packages/redis/asyncio/multidb/client.py__init__zMultiDBClient.__init__   s    **,, '&F((***% 	 '-&B#7=7Q7W7W&(A8
 8
!
 +*F,,...) 	 '/ ,,...) 	
 	--do>>>'-'D$!'!8$2335K4LMMM 6"5o-"5$6!0!3#'#?	!
 	!
 	!
 !022"&%)"""    rI   returnc                 L   K   | j         s|                                  d {V  | S N)r@   
initializerI   s    rJ   
__aenter__zMultiDBClient.__aenter__K   s8       	$//#########rL   c                    K   | j         r| j                                          | j        r| j                                         | j        D ]}|                                 d S rO   )rF   cancelrH   rG   )rI   exc_type	exc_value	tracebackhc_tasks        rJ   	__aexit__zMultiDBClient.__aexit__P   st      " 	-#**,,,% 	0&--///~ 	 	GNN	 	rL   c                   K   d }|                      |           d{V  t          j        | j                            | j        | j                             | _        d}| j        D ]b\  }}|j        	                    | j
                   |j        j        t          j        k    r$|s"| j                            |           d{V  d}c|st!          d          d| _        dS )zT
        Perform initialization of databases to define their initial state.
        c                 
   K   | rO    )errors    rJ   raise_exception_on_failed_hcz>MultiDBClient.initialize.<locals>.raise_exception_on_failed_hc]   s      KrL   )on_errorNFTz4Initial connection failed - no active database found)_check_databases_healthrA   create_taskrD   run_recurring_asyncr/   rF   r*   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr?   set_active_databaser   r@   )rI   r^   is_active_db_founddatabaseweights        rJ   rP   zMultiDBClient.initializeX   s*     
	 	 	 **4P*QQQQQQQQQ #*"522+, #
 #
 # $ 	* 	*Hf--d.TUUU %77@R7+??IIIIIIIII%)"! 	*F    rL   c                     | j         S )zE
        Returns a sorted (by weight) list of all databases.
        )r*   rQ   s    rJ   get_databaseszMultiDBClient.get_databases}   s     rL   rk   Nc                 l  K   d}| j         D ]\  }}||k    rd} n|st          d          |                     |           d{V  |j        j        t
          j        k    rE| j                             d          d         \  }}| j        	                    |           d{V  dS t          d          )zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r*   
ValueError_check_db_healthrc   rf   rg   rh   	get_top_nr?   ri   r   )rI   rk   existsexisting_db_highest_weighted_dbs         rJ   ri   z!MultiDBClient.set_active_database   s       "o 	 	NKh&& '  	PNOOO##H---------!W^33%)_%>%>q%A%A!%D"';;HEEEEEEEEEF&?
 
 	
rL   c                 B  K   | j         D ]\  }}||k    rt          d          |                     |           d{V  | j                             d          d         \  }}| j                             ||j                   |                     ||           d{V  dS )z;
        Adds a new database to the database list.
        zGiven database already existsNrq   r   )r*   rr   rs   rt   addrl   _change_active_database)rI   rk   rv   rw   rx   highest_weights         rJ   add_databasezMultiDBClient.add_database   s       #o 	B 	BNKh&& !@AAA ' ##H---------.2o.G.G.J.J1.M+^Hho666**85HIIIIIIIIIIIrL   new_databasehighest_weight_databasec                    K   |j         |j         k    r<|j        j        t          j        k    r$| j                            |           d {V  d S d S d S rO   )rl   rc   rf   rg   rh   r?   ri   )rI   r~   r   s      rJ   r{   z%MultiDBClient._change_active_database   sk       "9"@@@$*gn<<';;LIIIIIIIIIII A@<<rL   c                   K   | j                             |          }| j                             d          d         \  }}||k    r<|j        j        t
          j        k    r$| j                            |           d{V  dS dS dS )z<
        Removes a database from the database list.
        rq   r   N)	r*   removert   rc   rf   rg   rh   r?   ri   )rI   rk   rl   rx   r|   s        rJ   remove_databasezMultiDBClient.remove_database   s       ''11.2o.G.G.J.J1.M+^ f$$#+1W^CC';;<OPPPPPPPPPPP %$CCrL   rl   c                    K   d}| j         D ]\  }}||k    rd} n|st          d          | j                             d          d         \  }}| j                             ||           ||_        |                     ||           d{V  dS )z<
        Updates a database from the database list.
        NTrp   rq   r   )r*   rr   rt   update_weightrl   r{   )rI   rk   rl   ru   rv   rw   rx   r|   s           rJ   update_database_weightz$MultiDBClient.update_database_weight   s       "o 	 	NKh&& '  	PNOOO.2o.G.G.J.J1.M+^%%h777 **85HIIIIIIIIIIIrL   failure_detectorc                 :    | j                             |           dS )z>
        Adds a new failure detector to the database.
        N)r6   append)rI   r   s     rJ   add_failure_detectorz"MultiDBClient.add_failure_detector   s"     	&&'788888rL   healthcheckc                    K   | j         4 d{V  | j                            |           ddd          d{V  dS # 1 d{V swxY w Y   dS )z:
        Adds a new health check to the database.
        N)rC   r-   r   )rI   r   s     rJ   add_health_checkzMultiDBClient.add_health_check   s       = 	4 	4 	4 	4 	4 	4 	4 	4&&{333	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4s   >
AAc                 x   K   | j         s|                                  d{V   | j        j        |i | d{V S )zB
        Executes a single command and return its result.
        N)r@   rP   r?   execute_commandrI   argsoptionss      rJ   r   zMultiDBClient.execute_command   sb        	$//#########:T*:DLGLLLLLLLLLrL   c                      t          |           S )z:
        Enters into pipeline mode of the client.
        )PipelinerQ   s    rJ   pipelinezMultiDBClient.pipeline   s     ~~rL   F
shard_hintvalue_from_callablewatch_delayfuncr   watchesr   r   r   c                   K   | j         s|                                  d{V   | j        j        |g|R |||d d{V S )z3
        Executes callable as transaction.
        Nr   )r@   rP   r?   execute_transaction)rI   r   r   r   r   r   s         rJ   transactionzMultiDBClient.transaction   s        	$//#########>T*>

 
 " 3#
 
 
 
 
 
 
 
 
 	
rL   c                 b   K   | j         s|                                  d{V  t          | fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        N)r@   rP   PubSub)rI   kwargss     rJ   pubsubzMultiDBClient.pubsub  sK        	$//#########d%%f%%%rL   r_   c                    K   	  fd j         D              _        t          j        t          j         j        ddi j                   d{V }n'# t          j        $ r t          j        d          w xY w|D ]g}t          |t                    rP|j	        }t          j        |j        _        t                              d|j                   |r ||j                   hdS )	zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        c                 d    g | ],\  }}t          j                            |                    -S r\   )rA   ra   rs   ).0rk   rw   rI   s      rJ   
<listcomp>z9MultiDBClient._check_databases_health.<locals>.<listcomp>  sF       Ha #D$9$9($C$CDD  rL   return_exceptionsT)timeoutNz4Health check execution exceeds health_check_intervalz%Health check failed, due to exception)exc_info)r*   rG   rA   wait_forgatherr/   TimeoutError
isinstancer   rk   rg   OPENrc   rf   logger	exceptionoriginal_exception)rI   r_   resultsresultunhealthy_dbs   `    rJ   r`   z%MultiDBClient._check_databases_health  sS     	   #'?  DN $,^&*  3        GG # 	 	 	&F  	
  	8 	8F&"<== 
8%-4\$*  ;#6 !   
  8HV6777	8 	8s   AA $A7c                 "  K   | j                             | j        |           d{V }|s2|j        j        t
          j        k    rt
          j        |j        _        |S |r0|j        j        t
          j        k    rt
          j        |j        _        |S )zO
        Runs health checks on the given database until first failure.
        N)r4   executer-   rc   rf   rg   r   rh   )rI   rk   
is_healthys      rJ   rs   zMultiDBClient._check_db_health6  s      
  4<<
 
 
 
 
 
 
 

  	4%55)0 & 	4H,2gnDD%,^H"rL   rc   	old_state	new_statec                 <   t          j                    }|t          j        k    r3t          j        |                     |j                            | _        d S |t          j        k    r3|t          j	        k    r%|
                    t          t          |           d S d S d S rO   )rA   get_running_looprg   	HALF_OPENra   rs   rk   rH   rh   r   
call_laterr   _half_open_circuit)rI   rc   r   r   loops        rJ   re   z/MultiDBClient._on_circuit_state_change_callbackH  s     '))))))0)<%%g&677* *D& F&&9+D+DOO02DgNNNNN '&+D+DrL   c                 x   K   | j         j        r+| j         j        j                                         d {V  d S d S rO   )r?   active_databaseclientacloserQ   s    rJ   r   zMultiDBClient.acloseV  sT       0 	H'7>EEGGGGGGGGGGG	H 	HrL   )rI   r   rM   r   rO   )*__name__
__module____qualname____doc__r   rK   rR   rY   rP   r   rn   r   ri   r}   r{   r   floatr   r   r   r   r   r   r   r   r	   r   r   r   r   strboolr   r   	Exceptionr   r`   rs   r   rg   re   r   r\   rL   rJ   r   r      s        
,*} ,* ,* ,* ,*\   
  #  #  # Jy    
- 
D 
 
 
 
2J= J J J JJ)JDQJ J J JQm Q Q Q QJ] JE J J J J&95I 9 9 9 94+ 4 4 4 4M M M   %)$)'+
 
 

|U3	#+>%??@
 
 SM	

 "
 e_
 
 
 
,	& 	& 	& PT$8 $88YK3T>1J$JKL$8 $8 $8 $8L}     $O%O29OFMO O O OH H H H HrL   r   rc   c                 (    t           j        | _        d S rO   )rg   r   rf   )rc   s    rJ   r   r   [  s    %GMMMrL   c                       e Zd ZdZdefdZddZd Zd Zd	 Z	de
fd
ZdefdZddZddZddZd Zdee         fdZdS )r   zG
    Pipeline implementation for multiple logical Redis databases.
    r   c                 "    g | _         || _        d S rO   )_command_stack_client)rI   r   s     rJ   rK   zPipeline.__init__d  s     rL   rI   rM   c                 
   K   | S rO   r\   rQ   s    rJ   rR   zPipeline.__aenter__h        rL   c                    K   |                                   d {V  | j                            |||           d {V  d S rO   )resetr   rY   rI   rU   rV   rW   s       rJ   rY   zPipeline.__aexit__k  sX      jjlll$$Xy)DDDDDDDDDDDrL   c                 N    |                                                                  S rO   )_async_self	__await__rQ   s    rJ   r   zPipeline.__await__o  s     !!++---rL   c                 
   K   | S rO   r\   rQ   s    rJ   r   zPipeline._async_selfr  r   rL   c                 *    t          | j                  S rO   )lenr   rQ   s    rJ   __len__zPipeline.__len__u  s    4&'''rL   c                     dS )z1Pipeline instances should always evaluate to TrueTr\   rQ   s    rJ   __bool__zPipeline.__bool__x  s    trL   Nc                    K   g | _         d S rO   )r   rQ   s    rJ   r   zPipeline.reset|  s       rL   c                 >   K   |                                   d{V  dS )zClose the pipelineN)r   rQ   s    rJ   r   zPipeline.aclose  s,      jjllrL   c                 >    | j                             ||f           | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r   r   s      rJ   pipeline_execute_commandz!Pipeline.pipeline_execute_command  s$     	""D'?333rL   c                      | j         |i |S )zAdds a command to the stack)r   rI   r   r   s      rJ   r   zPipeline.execute_command  s    ,t,d=f===rL   c                 @  K   | j         j        s| j                                          d{V  	 | j         j                            t          | j                             d{V 	 |                                  d{V  S # |                                  d{V  w xY w)z0Execute all the commands in the current pipelineN)r   r@   rP   r?   execute_pipelinetupler   r   rQ   s    rJ   r   zPipeline.execute  s      |' 	,,))+++++++++	6GGd)**         **,,$**,,s   6B B)rI   r   rM   r   rM   N)rM   r   )r   r   r   r   r   rK   rR   rY   r   r   intr   r   r   r   r   r   r   r   r   r   r\   rL   rJ   r   r   _  s        }       E E E. . .  ( ( ( ( ($    ! ! ! !      > > >
tCy 
 
 
 
 
 
rL   r   c                       e Zd ZdZdefdZddZddZd Ze	de
fd	            Zd
efdZd
edefdZd
efdZd
edefdZd Z	 dde
dee         fdZddddeddfdZdS )r   z2
    PubSub object for multi database client.
    r   c                 B    || _          | j         j        j        di | dS )zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        Nr\   )r   r?   r   )rI   r   r   s      rJ   rK   zPubSub.__init__  s/     ,%,66v66666rL   rM   c                 
   K   | S rO   r\   rQ   s    rJ   rR   zPubSub.__aenter__  r   rL   Nc                 >   K   |                                   d {V  d S rO   )r   r   s       rJ   rY   zPubSub.__aexit__  s,      kkmmrL   c                 P   K   | j         j                            d           d {V S )Nr   r   r?   execute_pubsub_methodrQ   s    rJ   r   zPubSub.aclose  s1      \2HHRRRRRRRRRrL   c                 .    | j         j        j        j        S rO   )r   r?   active_pubsub
subscribedrQ   s    rJ   r   zPubSub.subscribed  s    |,:EErL   r   c                 B   K    | j         j        j        dg|R   d {V S )Nr   r   rI   r   s     rJ   r   zPubSub.execute_command  sP      HT\2H
 $
 
 
 
 
 
 
 
 
 	
rL   r   c                 H   K    | j         j        j        dg|R i | d{V S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscribeNr   r   s      rJ   r   zPubSub.psubscribe  s`       IT\2H

 
 
#)
 
 
 
 
 
 
 
 	
rL   c                 B   K    | j         j        j        dg|R   d{V S )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscribeNr   r   s     rJ   r   zPubSub.punsubscribe  sS      
 IT\2H
!
 
 
 
 
 
 
 
 
 	
rL   c                 H   K    | j         j        j        dg|R i | d{V S )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscribeNr   r   s      rJ   r   zPubSub.subscribe  s`       IT\2H

 
 
"(
 
 
 
 
 
 
 
 	
rL   c                 B   K    | j         j        j        dg|R   d{V S )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscribeNr   r   s     rJ   r   zPubSub.unsubscribe  sS      
 IT\2H
 
 
 
 
 
 
 
 
 
 	
rL   F        ignore_subscribe_messagesr   c                 V   K   | j         j                            d||           d{V S )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number or None to wait indefinitely.
        get_message)r   r   Nr   )rI   r   r   s      rJ   r  zPubSub.get_message  sR       \2HH&? I 
 
 
 
 
 
 
 
 	
rL   g      ?)exception_handlerpoll_timeoutr  c                V   K   | j         j                            |||            d{V S )a  Process pub/sub messages using registered callbacks.

        This is the equivalent of :py:meth:`redis.PubSub.run_in_thread` in
        redis-py, but it is a coroutine. To launch it as a separate task, use
        ``asyncio.create_task``:

            >>> task = asyncio.create_task(pubsub.run())

        To shut it down, use asyncio cancellation:

            >>> task.cancel()
            >>> await task
        )
sleep_timer  r   N)r   r?   execute_pubsub_run)rI   r  r  s      rJ   runz
PubSub.run  sR      & \2EE#7HQU F 
 
 
 
 
 
 
 
 	
rL   )rM   r   r   )Fr   )r   r   r   r   r   rK   rR   rY   r   propertyr   r   r   r   r   r
   r   r   r   r   r   r   r   r  r  r\   rL   rJ   r   r     s        	7} 	7 	7 	7 	7      S S S FD F F F XF
: 
 
 
 



h 

- 

 

 

 


 
 
 
 


X 

 

 

 

 


 
 
 SV
 
)-
@H
 
 
 
& !	
 
 
 	

 

 
 
 
 
 
rL   r   )2rA   loggingtypingr   r   r   r   r   r   r	   redis.asyncio.clientr
   &redis.asyncio.multidb.command_executorr   redis.asyncio.multidb.configr   r   redis.asyncio.multidb.databaser   r   &redis.asyncio.multidb.failure_detectorr   !redis.asyncio.multidb.healthcheckr   r   redis.backgroundr   redis.commandsr   r   redis.multidb.circuitr   r   rg   redis.multidb.exceptionr   r   redis.typingr   r   r   redis.utilsr   	getLoggerr   r   r   r   r   r   r\   rL   rJ   <module>r     sL     M M M M M M M M M M M M M M M M M M . . . . . . I I I I I I L L L L L L L L C C C C C C C C G G G G G G L L L L L L L L 0 0 0 0 0 0 F F F F F F F F 0 0 0 0 0 0 2 2 2 2 2 2 X X X X X X X X 3 3 3 3 3 3 3 3 3 3 $ $ $ $ $ $		8	$	$ AH AH AH AH AH,.? AH AH AHH
& & & & &A A A A A'): A A AHq
 q
 q
 q
 q
 q
 q
 q
 q
 q
rL   