
    keJiH                        d dl Z d dlZd dlmZ d dlmZ d dlmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ d dlmZmZ d dlmZmZmZ d dlm Z m!Z! d dl"m#Z# d dl$m%Z%m&Z& d dl'm(Z(  e j)        e*          Z+e( G d dee                      Z,defdZ- G d dee          Z. G d d          Z/dS )    N)as_completed)ThreadPoolExecutor)AnyCallableListOptional)BackgroundScheduler)PubSubWorkerThread)CoreCommandsRedisModuleCommands)CircuitBreaker)State)DefaultCommandExecutor)DEFAULT_GRACE_PERIODMultiDbConfig)Database	DatabasesSyncDatabase)NoValidDatabaseExceptionUnhealthyDatabaseException)FailureDetector)HealthCheckHealthCheckPolicy)experimentalc                   
   e Zd ZdZdefdZd ZdefdZde	ddfd	Z
de	fd
Zde	de	fdZdefdZde	defdZdefdZdefdZd Zd Zdedgdf         fdZd Zde	defdZd#deegdf         fdZdeded efd!Z d" Z!dS )$MultiDBClientzx
    Client that operates on multiple logical Redis databases.
    Should be used in Active-Active database setups.
    configc           
      N   |                                 | _        |j        s|                                n|j        | _        |j        | _        |j                            |j	        |j
                  | _        |j        s|                                n|j        | _        |j        |                                n|j        | _        | j                            | j                   |j        | _        |j        | _        |j        | _        | j                            t4          f           t7          | j        | j        | j        | j        |j        |j        | j        | j                  | _        d| _        tA          j!                    | _"        tG                      | _$        || _%        d S )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)&r    
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvaluehealth_check_probeshealth_check_probes_delay_health_check_policyr   default_failure_detectors_failure_detectorsr"   default_failover_strategy_failover_strategyset_databasesr&   _auto_fallback_intervalr%   _event_dispatcherr!   _command_retryupdate_supported_errorsConnectionRefusedErrorr   r#   r$   command_executorinitialized	threadingRLock_hc_lockr	   _bg_scheduler_config)selfr   s     Z/var/www/html/movieo_spanner_bot/venv/lib/python3.11/site-packages/redis/multidb/client.py__init__zMultiDBClient.__init__   s    **,, '&F((***% 	
 '-&B#7=7Q7W7W&(H8
 8
!
 +*F,,...) 	 '/ ,,...) 	
 	--do>>>'-'D$!'!8$2335K4MNNN 6"5o-"5$6!0!3#'#?	!
 	!
 	!
 !!))022    c                 b   d }|                      |           | j                            | j        | j                    d}| j        D ]N\  }}|j                            | j                   |j        j        t          j
        k    r|s|| j        _        d}O|st          d          d| _        dS )zT
        Perform initialization of databases to define their initial state.
        c                     | N )errors    rD   raise_exception_on_failed_hcz>MultiDBClient.initialize.<locals>.raise_exception_on_failed_hcM   s    KrF   )on_errorFTz4Initial connection failed - no active database foundN)_check_databases_healthrA   run_recurringr,   r'   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr<   active_databaser   r=   )rC   rL   is_active_db_founddatabaseweights        rD   
initializezMultiDBClient.initializeH   s    
	 	 	 	$$.J$KKK 	(('(	
 	
 	

 # $ 	* 	*Hf--d.TUUU %77@R78@%5%)"! 	*F    rF   returnc                     | j         S )zE
        Returns a sorted (by weight) list of all databases.
        )r'   rC   s    rD   get_databaseszMultiDBClient.get_databasesk   s     rF   rX   Nc                 4   d}| j         D ]\  }}||k    rd} n|st          d          |                     |           |j        j        t
          j        k    r1| j                             d          d         \  }}|| j        _	        dS t          d          )zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r'   
ValueError_check_db_healthrP   rS   rT   rU   	get_top_nr<   rV   r   )rC   rX   existsexisting_db_highest_weighted_dbs         rD   set_active_databasez!MultiDBClient.set_active_databaseq   s     "o 	 	NKh&& '  	PNOOOh'''!W^33%)_%>%>q%A%A!%D"4<D!1F&?
 
 	
rF   c                 &   | j         D ]\  }}||k    rt          d          |                     |           | j                             d          d         \  }}| j                             ||j                   |                     ||           dS )z;
        Adds a new database to the database list.
        zGiven database already existsra   r   N)r'   rb   rc   rd   addrY   _change_active_database)rC   rX   rf   rg   rh   highest_weights         rD   add_databasezMultiDBClient.add_database   s     #o 	B 	BNKh&& !@AAA ' 	h'''.2o.G.G.J.J1.M+^Hho666$$X/BCCCCCrF   new_databasehighest_weight_databasec                 z    |j         |j         k    r(|j        j        t          j        k    r|| j        _        d S d S d S rI   )rY   rP   rS   rT   rU   r<   rV   )rC   ro   rp   s      rD   rl   z%MultiDBClient._change_active_database   sI     "9"@@@$*gn<<4@D!111 A@<<rF   c                     | j                             |          }| j                             d          d         \  }}||k    r(|j        j        t
          j        k    r|| j        _        dS dS dS )z<
        Removes a database from the database list.
        ra   r   N)	r'   removerd   rP   rS   rT   rU   r<   rV   )rC   rX   rY   rh   rm   s        rD   remove_databasezMultiDBClient.remove_database   sx     ''11.2o.G.G.J.J1.M+^ f$$#+1W^CC4GD!111 %$CCrF   rY   c                    d}| j         D ]\  }}||k    rd} n|st          d          | j                             d          d         \  }}| j                             ||           ||_        |                     ||           dS )z<
        Updates a database from the database list.
        NTr`   ra   r   )r'   rb   rd   update_weightrY   rl   )rC   rX   rY   re   rf   rg   rh   rm   s           rD   update_database_weightz$MultiDBClient.update_database_weight   s     "o 	 	NKh&& '  	PNOOO.2o.G.G.J.J1.M+^%%h777 $$X/BCCCCCrF   failure_detectorc                 :    | j                             |           dS )z>
        Adds a new failure detector to the database.
        N)r3   append)rC   rx   s     rD   add_failure_detectorz"MultiDBClient.add_failure_detector   s"     	&&'788888rF   healthcheckc                 z    | j         5  | j                            |           ddd           dS # 1 swxY w Y   dS )z:
        Adds a new health check to the database.
        N)r@   r*   rz   )rC   r|   s     rD   add_health_checkzMultiDBClient.add_health_check   s     ] 	4 	4&&{333	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4s   044c                 \    | j         s|                                   | j        j        |i |S )zB
        Executes a single command and return its result.
        )r=   rZ   r<   execute_commandrC   argsoptionss      rD   r   zMultiDBClient.execute_command   s:      	OO4t$4dFgFFFrF   c                      t          |           S )z:
        Enters into pipeline mode of the client.
        )Pipeliner]   s    rD   pipelinezMultiDBClient.pipeline   s     ~~rF   funcr   c                 b    | j         s|                                   | j        j        |g||R  S )z3
        Executes callable as transaction.
        )r=   rZ   r<   execute_transaction)rC   r   watchesr   s       rD   transactionzMultiDBClient.transaction   sB      	OO8t$8RR'RRRRrF   c                 R    | j         s|                                  t          | fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        )r=   rZ   PubSub)rC   kwargss     rD   pubsubzMultiDBClient.pubsub   s5      	OOd%%f%%%rF   c                    | j                             | j        |          }|s2|j        j        t
          j        k    rt
          j        |j        _        |S |r0|j        j        t
          j        k    rt
          j        |j        _        |S )zO
        Runs health checks on the given database until first failure.
        )r1   executer*   rP   rS   rT   OPENrU   )rC   rX   
is_healthys      rD   rc   zMultiDBClient._check_db_health   sz    
 .66t7JHUU
 	4%55)0 & 	4H,2gnDD%,^H"rF   rM   c                     t          t           j                            5  fd j        D             }	 t          | j                  D ]~}	 |                                 # t          $ rZ}|j        }t          j	        |j
        _        t                              d|j                   |r ||j                   Y d}~wd}~ww xY wn# t          $ r t          d          w xY w	 ddd           dS # 1 swxY w Y   dS )zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        )max_workersc                 L    h | ] \  }}                     j        |          !S rJ   )submitrc   ).0rX   rg   executorrC   s      rD   	<setcomp>z8MultiDBClient._check_databases_health.<locals>.<setcomp>  s>       Ha  5x@@  rF   )timeoutz%Health check failed, due to exception)exc_infoNz4Health check execution exceeds health_check_interval)r   lenr'   r   r,   resultr   rX   rT   r   rP   rS   logger	exceptionoriginal_exceptionTimeoutError)rC   rM   futuresfutureeunhealthy_dbr   s   `     @rD   rN   z%MultiDBClient._check_databases_health  s   
  C,@,@AAA 	X    #'?  G
*T%@   ; ;F;5 
; 
; 
;'(z5<\,2((C%&%9 )   
 $ ;$HQ%9:::
;;      "J  !;	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	sS   C<CA)(C)
C3ACCCCC<C++C<<D D rP   	old_state	new_statec                     |t           j        k    r|                     |j                   d S |t           j        k    r8|t           j        k    r*| j                            t          t          |           d S d S d S rI   )
rT   	HALF_OPENrc   rX   rU   r   rA   run_oncer   _half_open_circuit)rC   rP   r   r   s       rD   rR   z/MultiDBClient._on_circuit_state_change_callback#  s     )))!!'"2333F&&9+D+D''$&8'     '&+D+DrF   c                     | j         r| j                                          | j        j        r%| j        j        j                                         dS dS )z:
        Closes the client and all its resources.
        N)rA   stopr<   rV   clientcloser]   s    rD   r   zMultiDBClient.close/  s`      	&##%%% 0 	A!18>>@@@@@	A 	ArF   rI   )"__name__
__module____qualname____doc__r   rE   rZ   r   r^   r   ri   rn   rl   r   rt   floatrw   r   r{   r   r~   r   r   r   r   r   boolrc   	ExceptionrN   r   rT   rR   r   rJ   rF   rD   r   r      s:        
(} ( ( ( (T!  !  ! Fy    
L 
T 
 
 
 
2D\ D D D DA(ACOA A A AH H H H HD| DU D D D D&9_ 9 9 9 94K 4 4 4 4G G G  S*t); < S S S S	& 	& 	& $        )d9J0K        D
%
29
FM
 
 
 
A A A A ArF   r   rP   c                 (    t           j        | _        d S rI   )rT   r   rS   )rP   s    rD   r   r   9  s    %GMMMrF   c                       e Zd ZdZdefdZddZd Zd Zde	fdZ
defd	ZddZddZddZd Zdee         fdZd
S )r   zG
    Pipeline implementation for multiple logical Redis databases.
    r   c                 "    g | _         || _        d S rI   )_command_stack_client)rC   r   s     rD   rE   zPipeline.__init__B  s     rF   r[   c                     | S rI   rJ   r]   s    rD   	__enter__zPipeline.__enter__F      rF   c                 .    |                                   d S rI   reset)rC   exc_type	exc_value	tracebacks       rD   __exit__zPipeline.__exit__I      

rF   c                 R    	 |                                   d S # t          $ r Y d S w xY wrI   r   r   r]   s    rD   __del__zPipeline.__del__L  s:    	JJLLLLL 	 	 	DD	    
&&c                 *    t          | j                  S rI   )r   r   r]   s    rD   __len__zPipeline.__len__R  s    4&'''rF   c                     dS )z1Pipeline instances should always evaluate to TrueTrJ   r]   s    rD   __bool__zPipeline.__bool__U  s    trF   Nc                     g | _         d S rI   )r   r]   s    rD   r   zPipeline.resetY  s     rF   c                 .    |                                   dS )zClose the pipelineNr   r]   s    rD   r   zPipeline.close\  s    

rF   c                 >    | j                             ||f           | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   rz   r   s      rD   pipeline_execute_commandz!Pipeline.pipeline_execute_command`  s$     	""D'?333rF   c                      | j         |i |S )zAdds a command to the stack)r   rC   r   r   s      rD   r   zPipeline.execute_commando  s    ,t,d=f===rF   c                 
   | j         j        s| j                                          	 | j         j                            t          | j                            |                                  S # |                                  w xY w)z0Execute all the commands in the current pipeline)r   r=   rZ   r<   execute_pipelinetupler   r   r]   s    rD   r   zPipeline.executes  sq    |' 	&L##%%%	<0AAd)**  JJLLLLDJJLLLLs   0A, ,B)r[   r   r[   N)r   r   r   r   r   rE   r   r   r   intr   r   r   r   r   r   r   r   r   r   rJ   rF   rD   r   r   =  s        }           ( ( ( ( ($    ! ! ! !      > > >
c 
 
 
 
 
 
rF   r   c                       e Zd ZdZdefdZddZddZddZdd	Z	e
defd
            Zd Zd Zd Zd Zd Zd Zd Z	 d dedefdZ	 d dedefdZ	 	 	 	 d!dededee         deddf
dZdS )"r   z2
    PubSub object for multi database client.
    r   c                 B    || _          | j         j        j        di | dS )zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        NrJ   )r   r<   r   )rC   r   r   s      rD   rE   zPubSub.__init__  s/     ,%,66v66666rF   r[   c                     | S rI   rJ   r]   s    rD   r   zPubSub.__enter__  r   rF   Nc                 R    	 |                                   d S # t          $ r Y d S w xY wrI   r   r]   s    rD   r   zPubSub.__del__  s<    	 JJLLLLL 	 	 	DD	r   c                 @    | j         j                            d          S )Nr   r   r<   execute_pubsub_methodr]   s    rD   r   zPubSub.reset  s    |,BB7KKKrF   c                 .    |                                   d S rI   r   r]   s    rD   r   zPubSub.close  r   rF   c                 .    | j         j        j        j        S rI   )r   r<   active_pubsub
subscribedr]   s    rD   r   zPubSub.subscribed  s    |,:EErF   c                 2     | j         j        j        dg|R  S )Nr   r   rC   r   s     rD   r   zPubSub.execute_command  s.    Bt|,B
 $
 
 
 	
rF   c                 8     | j         j        j        dg|R i |S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscriber   r   s      rD   r   zPubSub.psubscribe  >     Ct|,B

 
 
#)
 
 	
rF   c                 2     | j         j        j        dg|R  S )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscriber   r   s     rD   r   zPubSub.punsubscribe  1    
 Ct|,B
!
 
 
 	
rF   c                 8     | j         j        j        dg|R i |S )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscriber   r   s      rD   r   zPubSub.subscribe  s>     Ct|,B

 
 
"(
 
 	
rF   c                 2     | j         j        j        dg|R  S )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscriber   r   s     rD   r   zPubSub.unsubscribe  s&    
 Ct|,B=XSWXXXXrF   c                 8     | j         j        j        dg|R i |S )az  
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        
ssubscriber   r   s      rD   r   zPubSub.ssubscribe  r   rF   c                 2     | j         j        j        dg|R  S )zu
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
        all shard_channels
        sunsubscriber   r   s     rD   r   zPubSub.sunsubscribe  r   rF   F        ignore_subscribe_messagesr   c                 F    | j         j                            d||          S )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_messager   r   r   rC   r   r   s      rD   r   zPubSub.get_message  s0     |,BB&? C 
 
 	
rF   c                 F    | j         j                            d||          S )a&  
        Get the next message if one is available in a sharded channel, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_sharded_messager   r   r   s      rD   r   zPubSub.get_sharded_message  s0     |,BB!&? C 
 
 	
rF   
sleep_timedaemonexception_handlersharded_pubsubr
   c                 J    | j         j                            |||| |          S )N)r   r   r   r  )r   r<   execute_pubsub_run)rC   r   r   r   r  s        rD   run_in_threadzPubSub.run_in_thread  s6     |,??/) @ 
 
 	
rF   )r[   r   r   )Fr   )r   FNF)r   r   r   r   r   rE   r   r   r   r   propertyr   r   r   r   r   r   r   r   r   r   r   r   r   r   r  rJ   rF   rD   r   r     s        	7} 	7 	7 	7 	7      L L L L    FD F F F XF
 
 



 

 


 
 


 

 

Y Y Y

 

 


 
 
 IL
 
)-
@E
 
 
 
" IL
 
)-
@E
 
 
 
$  04$
 

 
 $H-	

 
 

 
 
 
 
 
rF   r   )0loggingr>   concurrent.futuresr   concurrent.futures.threadr   typingr   r   r   r   redis.backgroundr	   redis.clientr
   redis.commandsr   r   redis.multidb.circuitr   r   rT   redis.multidb.command_executorr   redis.multidb.configr   r   redis.multidb.databaser   r   r   redis.multidb.exceptionr   r   redis.multidb.failure_detectorr   redis.multidb.healthcheckr   r   redis.utilsr   	getLoggerr   r   r   r   r   r   rJ   rF   rD   <module>r     sG        + + + + + + 8 8 8 8 8 8 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 + + + + + + < < < < < < < < 0 0 0 0 0 0 2 2 2 2 2 2 A A A A A A D D D D D D D D D D D D D D D D D D X X X X X X X X : : : : : : D D D D D D D D $ $ $ $ $ $		8	$	$ ^A ^A ^A ^A ^A' ^A ^A ^AB	& & & & &@ @ @ @ @"L @ @ @FU
 U
 U
 U
 U
 U
 U
 U
 U
 U
rF   