
    h^G                        d dl Z d dlZd dlmZ d dlmZ d dlmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ d dlmZmZ d dlmZmZmZ d dlm Z m!Z! d dl"m#Z# d dl$m%Z%m&Z& d dl'm(Z(  e j)        e*          Z+e( G d dee                      Z,defdZ- G d dee          Z. G d d          Z/dS )    N)as_completed)ThreadPoolExecutor)AnyCallableListOptional)BackgroundScheduler)PubSubWorkerThread)CoreCommandsRedisModuleCommands)CircuitBreaker)State)DefaultCommandExecutor)DEFAULT_GRACE_PERIODMultiDbConfig)Database	DatabasesSyncDatabase)NoValidDatabaseExceptionUnhealthyDatabaseException)FailureDetector)HealthCheckHealthCheckPolicy)experimentalc                   
   e Zd ZdZdefdZd ZdefdZde	ddfd	Z
de	fd
Zde	de	fdZdefdZde	defdZdefdZdefdZd Zd Zdedgdf         fdZd Zde	defdZd#deegdf         fdZdeded efd!Z d" Z!dS )$MultiDBClientzx
    Client that operates on multiple logical Redis databases.
    Should be used in Active-Active database setups.
    configc           
         |                                 | _        |                                | _        |j        | j                            |j                   |j        | _        |j        	                    |j
        |j                  | _        |                                | _        |j        | j                            |j                   |j        |                                n|j        | _        | j                            | j                   |j        | _        |j        | _        |j        | _        | j                            t6          f           t9          | j        | j        | j        | j        |j        |j        | j        | j                  | _        d| _         tC          j"                    | _#        tI                      | _%        || _&        d S )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)'r    
_databasesdefault_health_checks_health_checkshealth_checksextendhealth_check_interval_health_check_intervalhealth_check_policyvaluehealth_check_probeshealth_check_probes_delay_health_check_policydefault_failure_detectors_failure_detectorsr   r"   default_failover_strategy_failover_strategyset_databasesr&   _auto_fallback_intervalr%   _event_dispatcherr!   _command_retryupdate_supported_errorsConnectionRefusedErrorr   r#   r$   command_executorinitialized	threadingRLock_hc_lockr	   _bg_scheduler_config)selfr   s     T/var/www/html/auto_sub_bot/venv/lib/python3.11/site-packages/redis/multidb/client.py__init__zMultiDBClient.__init__   s    **,,$::<<+&&v';<<<&,&B#7=7Q7W7W&(H8
 8
! #)"B"B"D"D#/#**6+CDDD '/ ,,...) 	
 	--do>>>'-'D$!'!8$2335K4MNNN 6"5o-"5$6!0!3#'#?	!
 	!
 	!
 !!))022    c                 b   d }|                      |           | j                            | j        | j                    d}| j        D ]N\  }}|j                            | j                   |j        j        t          j
        k    r|s|| j        _        d}O|st          d          d| _        dS )zT
        Perform initialization of databases to define their initial state.
        c                     | N )errors    rE   raise_exception_on_failed_hcz>MultiDBClient.initialize.<locals>.raise_exception_on_failed_hcL   s    KrG   )on_errorFTz4Initial connection failed - no active database foundN)_check_databases_healthrB   run_recurringr-   r'   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr=   active_databaser   r>   )rD   rM   is_active_db_founddatabaseweights        rE   
initializezMultiDBClient.initializeG   s    
	 	 	 	$$.J$KKK 	(('(	
 	
 	

 # $ 	* 	*Hf--d.TUUU %77@R78@%5%)"! 	*F    rG   returnc                     | j         S )zE
        Returns a sorted (by weight) list of all databases.
        )r'   rD   s    rE   get_databaseszMultiDBClient.get_databasesj   s     rG   rY   Nc                 4   d}| j         D ]\  }}||k    rd} n|st          d          |                     |           |j        j        t
          j        k    r1| j                             d          d         \  }}|| j        _	        dS t          d          )zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r'   
ValueError_check_db_healthrQ   rT   rU   rV   	get_top_nr=   rW   r   )rD   rY   existsexisting_db_highest_weighted_dbs         rE   set_active_databasez!MultiDBClient.set_active_databasep   s     "o 	 	NKh&& '  	PNOOOh'''!W^33%)_%>%>q%A%A!%D"4<D!1F&?
 
 	
rG   c                 &   | j         D ]\  }}||k    rt          d          |                     |           | j                             d          d         \  }}| j                             ||j                   |                     ||           dS )z;
        Adds a new database to the database list.
        zGiven database already existsrb   r   N)r'   rc   rd   re   addrZ   _change_active_database)rD   rY   rg   rh   ri   highest_weights         rE   add_databasezMultiDBClient.add_database   s     #o 	B 	BNKh&& !@AAA ' 	h'''.2o.G.G.J.J1.M+^Hho666$$X/BCCCCCrG   new_databasehighest_weight_databasec                 z    |j         |j         k    r(|j        j        t          j        k    r|| j        _        d S d S d S rJ   )rZ   rQ   rT   rU   rV   r=   rW   )rD   rp   rq   s      rE   rm   z%MultiDBClient._change_active_database   sI     "9"@@@$*gn<<4@D!111 A@<<rG   c                     | j                             |          }| j                             d          d         \  }}||k    r(|j        j        t
          j        k    r|| j        _        dS dS dS )z<
        Removes a database from the database list.
        rb   r   N)	r'   removere   rQ   rT   rU   rV   r=   rW   )rD   rY   rZ   ri   rn   s        rE   remove_databasezMultiDBClient.remove_database   sx     ''11.2o.G.G.J.J1.M+^ f$$#+1W^CC4GD!111 %$CCrG   rZ   c                    d}| j         D ]\  }}||k    rd} n|st          d          | j                             d          d         \  }}| j                             ||           ||_        |                     ||           dS )z<
        Updates a database from the database list.
        NTra   rb   r   )r'   rc   re   update_weightrZ   rm   )rD   rY   rZ   rf   rg   rh   ri   rn   s           rE   update_database_weightz$MultiDBClient.update_database_weight   s     "o 	 	NKh&& '  	PNOOO.2o.G.G.J.J1.M+^%%h777 $$X/BCCCCCrG   failure_detectorc                 :    | j                             |           dS )z>
        Adds a new failure detector to the database.
        N)r4   append)rD   ry   s     rE   add_failure_detectorz"MultiDBClient.add_failure_detector   s"     	&&'788888rG   healthcheckc                 z    | j         5  | j                            |           ddd           dS # 1 swxY w Y   dS )z:
        Adds a new health check to the database.
        N)rA   r)   r{   )rD   r}   s     rE   add_health_checkzMultiDBClient.add_health_check   s     ] 	4 	4&&{333	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4s   044c                 \    | j         s|                                   | j        j        |i |S )zB
        Executes a single command and return its result.
        )r>   r[   r=   execute_commandrD   argsoptionss      rE   r   zMultiDBClient.execute_command   s:      	OO4t$4dFgFFFrG   c                      t          |           S )z:
        Enters into pipeline mode of the client.
        )Pipeliner^   s    rE   pipelinezMultiDBClient.pipeline   s     ~~rG   funcr   c                 b    | j         s|                                   | j        j        |g||R  S )z3
        Executes callable as transaction.
        )r>   r[   r=   execute_transaction)rD   r   watchesr   s       rE   transactionzMultiDBClient.transaction   sB      	OO8t$8RR'RRRRrG   c                 R    | j         s|                                  t          | fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        )r>   r[   PubSub)rD   kwargss     rE   pubsubzMultiDBClient.pubsub   s5      	OOd%%f%%%rG   c                    | j                             | j        |          }|s2|j        j        t
          j        k    rt
          j        |j        _        |S |r0|j        j        t
          j        k    rt
          j        |j        _        |S )zO
        Runs health checks on the given database until first failure.
        )r2   executer)   rQ   rT   rU   OPENrV   )rD   rY   
is_healthys      rE   rd   zMultiDBClient._check_db_health   sz    
 .66t7JHUU
 	4%55)0 & 	4H,2gnDD%,^H"rG   rN   c                     t          t           j                            5  fd j        D             }	 t          | j                  D ]~}	 |                                 # t          $ rZ}|j        }t          j	        |j
        _        t                              d|j                   |r ||j                   Y d}~wd}~ww xY wn# t          $ r t          d          w xY w	 ddd           dS # 1 swxY w Y   dS )zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        )max_workersc                 L    h | ] \  }}                     j        |          !S rK   )submitrd   ).0rY   rh   executorrD   s      rE   	<setcomp>z8MultiDBClient._check_databases_health.<locals>.<setcomp>  s>       Ha  5x@@  rG   )timeoutz%Health check failed, due to exception)exc_infoNz4Health check execution exceeds health_check_interval)r   lenr'   r   r-   resultr   rY   rU   r   rQ   rT   logger	exceptionoriginal_exceptionTimeoutError)rD   rN   futuresfutureeunhealthy_dbr   s   `     @rE   rO   z%MultiDBClient._check_databases_health   s   
  C,@,@AAA 	X    #'?  G
*T%@   ; ;F;5 
; 
; 
;'(z5<\,2((C%&%9 )   
 $ ;$HQ%9:::
;;      "J  !;	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	sS   C<CA)(C)
C3ACCCCC<C++C<<D D rQ   	old_state	new_statec                     |t           j        k    r|                     |j                   d S |t           j        k    r8|t           j        k    r*| j                            t          t          |           d S d S d S rJ   )
rU   	HALF_OPENrd   rY   rV   r   rB   run_oncer   _half_open_circuit)rD   rQ   r   r   s       rE   rS   z/MultiDBClient._on_circuit_state_change_callback"  s     )))!!'"2333F&&9+D+D''$&8'     '&+D+DrG   c                 L    | j         j        j                                         d S rJ   )r=   rW   clientcloser^   s    rE   r   zMultiDBClient.close.  s#    -4::<<<<<rG   rJ   )"__name__
__module____qualname____doc__r   rF   r[   r   r_   r   rj   ro   rm   r   ru   floatrx   r   r|   r   r   r   r   r   r   r   boolrd   	ExceptionrO   r   rU   rS   r   rK   rG   rE   r   r      s5        
'} ' ' ' 'R!  !  ! Fy    
L 
T 
 
 
 
2D\ D D D DA(ACOA A A AH H H H HD| DU D D D D&9_ 9 9 9 94K 4 4 4 4G G G  S*t); < S S S S	& 	& 	& $        )d9J0K        D
%
29
FM
 
 
 
= = = = =rG   r   rQ   c                 (    t           j        | _        d S rJ   )rU   r   rT   )rQ   s    rE   r   r   2  s    %GMMMrG   c                       e Zd ZdZdefdZddZd Zd Zde	fdZ
defd	ZddZddZddZd Zdee         fdZd
S )r   zG
    Pipeline implementation for multiple logical Redis databases.
    r   c                 "    g | _         || _        d S rJ   )_command_stack_client)rD   r   s     rE   rF   zPipeline.__init__;  s     rG   r\   c                     | S rJ   rK   r^   s    rE   	__enter__zPipeline.__enter__?      rG   c                 .    |                                   d S rJ   reset)rD   exc_type	exc_value	tracebacks       rE   __exit__zPipeline.__exit__B      

rG   c                 R    	 |                                   d S # t          $ r Y d S w xY wrJ   r   r   r^   s    rE   __del__zPipeline.__del__E  s:    	JJLLLLL 	 	 	DD	    
&&c                 *    t          | j                  S rJ   )r   r   r^   s    rE   __len__zPipeline.__len__K  s    4&'''rG   c                     dS )z1Pipeline instances should always evaluate to TrueTrK   r^   s    rE   __bool__zPipeline.__bool__N  s    trG   Nc                     g | _         d S rJ   )r   r^   s    rE   r   zPipeline.resetR  s     rG   c                 .    |                                   dS )zClose the pipelineNr   r^   s    rE   r   zPipeline.closeU  s    

rG   c                 >    | j                             ||f           | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r{   r   s      rE   pipeline_execute_commandz!Pipeline.pipeline_execute_commandY  s$     	""D'?333rG   c                      | j         |i |S )zAdds a command to the stack)r   rD   r   r   s      rE   r   zPipeline.execute_commandh  s    ,t,d=f===rG   c                 
   | j         j        s| j                                          	 | j         j                            t          | j                            |                                  S # |                                  w xY w)z0Execute all the commands in the current pipeline)r   r>   r[   r=   execute_pipelinetupler   r   r^   s    rE   r   zPipeline.executel  sq    |' 	&L##%%%	<0AAd)**  JJLLLLDJJLLLLs   0A, ,B)r\   r   r\   N)r   r   r   r   r   rF   r   r   r   intr   r   r   r   r   r   r   r   r   r   rK   rG   rE   r   r   6  s        }           ( ( ( ( ($    ! ! ! !      > > >
c 
 
 
 
 
 
rG   r   c                       e Zd ZdZdefdZddZddZddZdd	Z	e
defd
            Zd Zd Zd Zd Zd Zd Zd Z	 d dedefdZ	 d dedefdZ	 	 	 	 d!dededee         deddf
dZdS )"r   z2
    PubSub object for multi database client.
    r   c                 B    || _          | j         j        j        di | dS )zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        NrK   )r   r=   r   )rD   r   r   s      rE   rF   zPubSub.__init__~  s/     ,%,66v66666rG   r\   c                     | S rJ   rK   r^   s    rE   r   zPubSub.__enter__  r   rG   Nc                 R    	 |                                   d S # t          $ r Y d S w xY wrJ   r   r^   s    rE   r   zPubSub.__del__  s<    	 JJLLLLL 	 	 	DD	r   c                 @    | j         j                            d          S )Nr   r   r=   execute_pubsub_methodr^   s    rE   r   zPubSub.reset  s    |,BB7KKKrG   c                 .    |                                   d S rJ   r   r^   s    rE   r   zPubSub.close  r   rG   c                 .    | j         j        j        j        S rJ   )r   r=   active_pubsub
subscribedr^   s    rE   r   zPubSub.subscribed  s    |,:EErG   c                 2     | j         j        j        dg|R  S )Nr   r   rD   r   s     rE   r   zPubSub.execute_command  s.    Bt|,B
 $
 
 
 	
rG   c                 8     | j         j        j        dg|R i |S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscriber   r   s      rE   r   zPubSub.psubscribe  >     Ct|,B

 
 
#)
 
 	
rG   c                 2     | j         j        j        dg|R  S )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscriber   r   s     rE   r   zPubSub.punsubscribe  1    
 Ct|,B
!
 
 
 	
rG   c                 8     | j         j        j        dg|R i |S )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscriber   r   s      rE   r   zPubSub.subscribe  s>     Ct|,B

 
 
"(
 
 	
rG   c                 2     | j         j        j        dg|R  S )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscriber   r   s     rE   r   zPubSub.unsubscribe  s&    
 Ct|,B=XSWXXXXrG   c                 8     | j         j        j        dg|R i |S )az  
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        
ssubscriber   r   s      rE   r   zPubSub.ssubscribe  r   rG   c                 2     | j         j        j        dg|R  S )zu
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
        all shard_channels
        sunsubscriber   r   s     rE   r   zPubSub.sunsubscribe  r   rG   F        ignore_subscribe_messagesr   c                 F    | j         j                            d||          S )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_messager   r   r   rD   r   r   s      rE   r   zPubSub.get_message  s0     |,BB&? C 
 
 	
rG   c                 F    | j         j                            d||          S )a&  
        Get the next message if one is available in a sharded channel, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_sharded_messager   r   r   s      rE   r   zPubSub.get_sharded_message  s0     |,BB!&? C 
 
 	
rG   
sleep_timedaemonexception_handlersharded_pubsubr
   c                 J    | j         j                            |||| |          S )N)r   r   r   r  )r   r=   execute_pubsub_run)rD   r   r   r   r  s        rE   run_in_threadzPubSub.run_in_thread  s6     |,??/) @ 
 
 	
rG   )r\   r   r   )Fr   )r   FNF)r   r   r   r   r   rF   r   r   r   r   propertyr   r   r   r   r   r   r   r   r   r   r   r   r   r   r  rK   rG   rE   r   r   y  s        	7} 	7 	7 	7 	7      L L L L    FD F F F XF
 
 



 

 


 
 


 

 

Y Y Y

 

 


 
 
 IL
 
)-
@E
 
 
 
" IL
 
)-
@E
 
 
 
$  04$
 

 
 $H-	

 
 

 
 
 
 
 
rG   r   )0loggingr?   concurrent.futuresr   concurrent.futures.threadr   typingr   r   r   r   redis.backgroundr	   redis.clientr
   redis.commandsr   r   redis.multidb.circuitr   r   rU   redis.multidb.command_executorr   redis.multidb.configr   r   redis.multidb.databaser   r   r   redis.multidb.exceptionr   r   redis.multidb.failure_detectorr   redis.multidb.healthcheckr   r   redis.utilsr   	getLoggerr   r   r   r   r   r   rK   rG   rE   <module>r     s?        + + + + + + 8 8 8 8 8 8 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 + + + + + + < < < < < < < < 0 0 0 0 0 0 2 2 2 2 2 2 A A A A A A D D D D D D D D D D D D D D D D D D X X X X X X X X : : : : : : D D D D D D D D $ $ $ $ $ $		8	$	$ W= W= W= W= W=' W= W= W=t& & & & &@ @ @ @ @"L @ @ @FU
 U
 U
 U
 U
 U
 U
 U
 U
 U
rG   