
    ph@.                     2   S SK JrJr  S SKJrJr  S SKJrJrJrJ	r	  S SK
JrJrJr  S SKJrJr  S SKJr  S SKJr  S SKJrJrJr  S S	KJrJrJrJr  S S
KJ r J!r!J"r"J#r#J$r$  S SK%J&r&  S SK'J(r(   " S S\5      r) " S S\)5      r* " S S\)5      r+ " S S\+\*5      r,g)    )ABCabstractmethod)datetime	timedelta)AnyCallableListOptional)PipelinePubSubPubSubWorkerThread)EventDispatcherInterfaceOnCommandsFailEvent)State)DEFAULT_AUTO_FALLBACK_INTERVAL)Database	DatabasesSyncDatabase)ActiveDatabaseChanged&CloseConnectionOnActiveDatabaseChangedRegisterCommandFailure"ResubscribeOnActiveDatabaseChanged)DEFAULT_FAILOVER_ATTEMPTSDEFAULT_FAILOVER_DELAYDefaultFailoverStrategyExecutorFailoverStrategyFailoverStrategyExecutor)FailureDetector)Retryc                   p    \ rS rSr\\S\4S j5       5       r\R                  \S\SS4S j5       5       rSr	g)CommandExecutor   returnc                     g)zReturns auto-fallback interval.N selfs    X/home/james-whalen/.local/lib/python3.13/site-packages/redis/multidb/command_executor.pyauto_fallback_interval&CommandExecutor.auto_fallback_interval        	    r)   Nc                     g)zSets auto-fallback interval.Nr%   r'   r)   s     r(   r)   r*   "   r+   r,   r%   )
__name__
__module____qualname____firstlineno__propertyr   floatr)   setter__static_attributes__r%   r,   r(   r!   r!      sS        ""U t   #r,   r!   c                   z    \ rS rSr\4S\4S jjr\S\4S j5       r\R                  S\
SS4S j5       rS
S jrS	rg)BaseCommandExecutor)   r)   c                     Xl         U   g N_auto_fallback_intervalr.   s     r(   __init__BaseCommandExecutor.__init__*   s     (>$r,   r#   c                     U R                   $ r;   r<   r&   s    r(   r)   *BaseCommandExecutor.auto_fallback_interval1   s    +++r,   Nc                     Xl         g r;   r<   r.   s     r(   r)   rA   5   s    '=$r,   c                     U R                   [        :X  a  g [        R                  " 5       [	        U R                   S9-   U l        g )N)seconds)r=   r   r   nowr   _next_fallback_attemptr&   s    r(   _schedule_next_fallback+BaseCommandExecutor._schedule_next_fallback9   s7    ''+II&.llny008
 '
#r,   )r=   rF   )r#   N)r/   r0   r1   r2   r   r4   r>   r3   r)   r5   intrG   r6   r%   r,   r(   r8   r8   )   s]     )G. %. , , , "">S >T > #>
r,   r8   c                      \ rS rSr\\S\4S j5       5       r\\S\\	   4S j5       5       r
\S\	SS4S j5       r\\S\\   4S j5       5       r\R                  \S	\SS4S
 j5       5       r\\S\\   4S j5       5       r\R                  \S\SS4S j5       5       r\\S\4S j5       5       r\\S\4S j5       5       r\S 5       r\S 5       r\S\4S j5       r\S\\/S4   4S j5       r\S\4S j5       r\S\ S\!4S j5       r"Sr#g)SyncCommandExecutorB   r#   c                     g)zReturns a list of databases.Nr%   r&   s    r(   	databasesSyncCommandExecutor.databasesC   r+   r,   c                     g)z$Returns a list of failure detectors.Nr%   r&   s    r(   failure_detectors%SyncCommandExecutor.failure_detectorsI   r+   r,   failure_detectorNc                     g)z=Adds a new failure detector to the list of failure detectors.Nr%   r'   rS   s     r(   add_failure_detector(SyncCommandExecutor.add_failure_detectorO        	r,   c                     g)z"Returns currently active database.Nr%   r&   s    r(   active_database#SyncCommandExecutor.active_databaseT   r+   r,   databasec                     g)z#Sets the currently active database.Nr%   )r'   r\   s     r(   rZ   r[   Z   r+   r,   c                     g)z Returns currently active pubsub.Nr%   r&   s    r(   active_pubsub!SyncCommandExecutor.active_pubsub`   r+   r,   pubsubc                     g)zSets currently active pubsub.Nr%   r'   ra   s     r(   r_   r`   f   r+   r,   c                     g)z#Returns failover strategy executor.Nr%   r&   s    r(   failover_strategy_executor.SyncCommandExecutor.failover_strategy_executorl   r+   r,   c                     g)zReturns command retry object.Nr%   r&   s    r(   command_retry!SyncCommandExecutor.command_retryr   r+   r,   c                     g)z:Initializes a PubSub object on a currently active databaseNr%   )r'   kwargss     r(   ra   SyncCommandExecutor.pubsubx   rX   r,   c                     g)z*Executes a command and returns the result.Nr%   )r'   argsoptionss      r(   execute_command#SyncCommandExecutor.execute_command}   rX   r,   command_stackc                     g)z)Executes a stack of commands in pipeline.Nr%   )r'   rr   s     r(   execute_pipeline$SyncCommandExecutor.execute_pipeline   rX   r,   transactionc                     g)z1Executes a transaction block wrapped in callback.Nr%   )r'   rv   watchesro   s       r(   execute_transaction'SyncCommandExecutor.execute_transaction   s    
 	r,   method_namec                     g)z*Executes a given method on active pub/sub.Nr%   )r'   r{   rn   rk   s       r(   execute_pubsub_method)SyncCommandExecutor.execute_pubsub_method   rX   r,   
sleep_timec                     g)z!Executes pub/sub run in a thread.Nr%   )r'   r   rk   s      r(   execute_pubsub_run&SyncCommandExecutor.execute_pubsub_run   rX   r,   r%   )$r/   r0   r1   r2   r3   r   r   rN   r	   r   rQ   rV   r
   r   rZ   r5   r   r   r_   r   re   r   rh   ra   rp   tuplert   r   r   ry   strr}   r4   r   r   r6   r%   r,   r(   rK   rK   B   s   9    4#8    _    (!3         x/    F t    ,D    u        e   #XJ$45     U   r,   rK   c                     ^  \ rS rSr\\\4S\\   S\	S\
S\S\S\S\S	\4U 4S
 jjjr\S\	4S j5       r\S\\   4S j5       rS\SS4S jr\S\
4S j5       r\S\\   4S j5       r\R0                  S\SS4S j5       r\S\\   4S j5       r\R0                  S\SS4S j5       r\S\4S j5       rS rS\4S jrS\ \!/S4   4S jr"S r#S\$4S  jr%S+S! jr&S,S"\ S#\4S$ jjr'S% r(S& r)S'\4S( jr*S) r+S*r,U =r-$ )-DefaultCommandExecutor   rQ   rN   rh   failover_strategyevent_dispatcherfailover_attemptsfailover_delayr)   c	                   > [         T
U ]  U5        U H  n	U	R                  U S9  M     X l        Xl        X0l        [        XFU5      U l        XPl        SU l	        SU l
        0 U l        U R                  5         U R                  5         g)aX  
Initialize the DefaultCommandExecutor instance.

Args:
    failure_detectors: List of failure detector instances to monitor database health
    databases: Collection of available databases to execute commands on
    command_retry: Retry policy for failed command execution
    failover_strategy: Strategy for handling database failover
    event_dispatcher: Interface for dispatching events
    failover_attempts: Number of failover attempts
    failover_delay: Delay between failover attempts
    auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
)command_executorN)superr>   set_command_executor
_databases_failure_detectors_command_retryr   _failover_strategy_executor_event_dispatcher_active_database_active_pubsub_active_pubsub_kwargs_setup_event_dispatcherrG   )r'   rQ   rN   rh   r   r   r   r   r)   fd	__class__s             r(   r>   DefaultCommandExecutor.__init__   s    0 	/0#B##T#: $ $"3++J.,
( "24804%'"$$&$$&r,   r#   c                     U R                   $ r;   )r   r&   s    r(   rN    DefaultCommandExecutor.databases   s    r,   c                     U R                   $ r;   )r   r&   s    r(   rQ   (DefaultCommandExecutor.failure_detectors   s    &&&r,   rS   Nc                 :    U R                   R                  U5        g r;   )r   appendrU   s     r(   rV   +DefaultCommandExecutor.add_failure_detector   s    &&'78r,   c                     U R                   $ r;   )r   r&   s    r(   rh   $DefaultCommandExecutor.command_retry       """r,   c                     U R                   $ r;   )r   r&   s    r(   rZ   &DefaultCommandExecutor.active_database   s    $$$r,   r\   c                     U R                   nXl         UbA  X!La<  U R                  R                  [        UU R                   U 40 U R                  D65        g g g r;   )r   r   dispatchr   r   )r'   r\   
old_actives      r(   rZ   r      sb    **
 (!j&@""++%)) 00	 'A!r,   c                     U R                   $ r;   r   r&   s    r(   r_   $DefaultCommandExecutor.active_pubsub   r   r,   ra   c                     Xl         g r;   r   rc   s     r(   r_   r      s    $r,   c                     U R                   $ r;   )r   r&   s    r(   re   1DefaultCommandExecutor.failover_strategy_executor   s    ///r,   c                 <   ^ ^^ UUU 4S jnT R                  UT5      $ )Nc                  v   > TR                   R                  R                  " T0 TD6n TR                  T5        U $ r;   )r   clientrp   _register_command_execution)responsern   ro   r'   s    r(   callback8DefaultCommandExecutor.execute_command.<locals>.callback   s7    ,,33CCTUWUH,,T2Or,   _execute_with_failure_detection)r'   rn   ro   r   s   ``` r(   rp   &DefaultCommandExecutor.execute_command   s    	
 33HdCCr,   rr   c                 8   ^ ^ UU 4S jnT R                  UT5      $ )Nc                    > TR                   R                  R                  5        n T H  u  pU R                  " U0 UD6  M     U R	                  5       nTR                  T5        UsS S S 5        $ ! , (       d  f       g = fr;   )r   r   pipelinerp   executer   )pipecommandro   r   rr   r'   s       r(   r   9DefaultCommandExecutor.execute_pipeline.<locals>.callback   sg    &&--668D(5$G(('=W= )6  <<>00? 988s   A A00
A>r   )r'   rr   r   s   `` r(   rt   'DefaultCommandExecutor.execute_pipeline   s    	  33HmLLr,   rv   c                 >   ^ ^^^ UU UU4S jnT R                  U5      $ )Nc                  ~   > TR                   R                  R                  " T/TQ70 TD6n TR                  S5        U $ Nr%   )r   r   rv   r   )r   ro   r'   rv   rx   s    r(   r   <DefaultCommandExecutor.execute_transaction.<locals>.callback	  sG    ,,33??%)0H ,,R0Or,   r   )r'   rv   rx   ro   r   s   ```` r(   ry   *DefaultCommandExecutor.execute_transaction  s     	 	 33H==r,   c                 6   ^ ^ UU 4S jnT R                  U5      $ )Nc                     > TR                   c2  TR                  R                  R                  " S0 T D6Tl         T Tl        g r   )r   r   r   ra   r   )rk   r'   s   r(   r   /DefaultCommandExecutor.pubsub.<locals>.callback  s<    ""*&*&;&;&B&B&I&I&SF&S#-3*r,   r   )r'   rk   r   s   `` r(   ra   DefaultCommandExecutor.pubsub  s    	 33H==r,   r{   c                 B   ^ ^^^ UUUU 4S jnT R                   " U/TQ76 $ )Nc                  f   > [        TR                  T5      n U " T0 TD6nTR                  T5        U$ r;   )getattrr_   r   )methodr   rn   rk   r{   r'   s     r(   r   >DefaultCommandExecutor.execute_pubsub_method.<locals>.callback  s7    T//=Ft.v.H,,T2Or,   r   )r'   r{   rn   rk   r   s   ```` r(   r}   ,DefaultCommandExecutor.execute_pubsub_method  s%    	 	 33HDtDDr,   c                 :   ^ ^^ UU U4S jnT R                  U5      $ )Nc                  >   > TR                   R                  " T40 T D6$ r;   )r   run_in_thread)rk   r'   r   s   r(   r   ;DefaultCommandExecutor.execute_pubsub_run.<locals>.callback%  s    &&44ZJ6JJr,   r   )r'   r   rk   r   s   ``` r(   r   )DefaultCommandExecutor.execute_pubsub_run$  s    	K 33H==r,   r   cmdsc                 b   ^ ^^^ UU 4S jmT R                   R                  U4S jUU 4S j5      $ )z?
Execute a commands execution callback with failure detection.
c                  2   > TR                  5         T " 5       $ r;   )_check_active_database)r   r'   s   r(   wrapperGDefaultCommandExecutor._execute_with_failure_detection.<locals>.wrapper/  s    ''):r,   c                     > T " 5       $ r;   r%   )r   s   r(   <lambda>HDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>5  s    GIr,   c                 *   > TR                   " U /TQ76 $ r;   )_on_command_fail)errorr   r'   s    r(   r   r   6  s    $//==r,   )r   call_with_retry)r'   r   r   r   s   ```@r(   r   6DefaultCommandExecutor._execute_with_failure_detection*  s)    
	
 ""22=
 	
r,   c                 L    U R                   R                  [        X!5      5        g r;   )r   r   r   )r'   r   rn   s      r(   r   'DefaultCommandExecutor._on_command_fail9  s    ''(;D(HIr,   c                 R   U R                   bi  U R                   R                  R                  [        R                  :w  d7  U R
                  [        :w  aT  U R                  [        R                  " 5       ::  a0  U R                  R                  5       U l        U R                  5         ggg)z2
Checks if active a database needs to be updated.
N)r   circuitstateCBStateCLOSEDr=   r   rF   r   rE   r   r   rZ   rG   r&   s    r(   r   -DefaultCommandExecutor._check_active_database<  s    
 !!)$$,,22gnnD,,0NN//8<<>A $(#C#C#K#K#MD ((*	 B Or,   cmdc                 L    U R                    H  nUR                  U5        M     g r;   )r   register_command_execution)r'   r   detectors      r(   r   2DefaultCommandExecutor._register_command_executionK  s     //H//4 0r,   c                     [        U R                  5      n[        5       n[        5       nU R                  R                  [        U/[        UU/05        g)z 
Registers necessary listeners.
N)r   r   r   r   r   register_listenersr   r   )r'   failure_listenerresubscribe_listenerclose_connection_listeners       r(   r   .DefaultCommandExecutor._setup_event_dispatcherO  sW     2$2I2IJAC$J$L!11#&6%7%-((	
r,   )	r   r   r   r   r   r   r   r   rZ   )r#   r   )r%   ).r/   r0   r1   r2   r   r   r   r	   r   r   r   r   r   rI   r4   r>   r3   rN   rQ   rV   rh   r
   r   rZ   r5   r   r_   r   re   rp   r   rt   r   r   ry   ra   r   r}   r   r   r   r   r   r   r6   __classcell__)r   s   @r(   r   r      s    "; 6(F('0(' (' 	('
 ,(' 3(' (' (' !&(' ('T 9   '4#8 ' '9_ 9 9 #u # # %,!7 % %     #x/ # # %F %t % % 0,D 0 0D
Me 
M
>#XJ$45
>>E E>
 
 
J+5u 5
 
r,   r   N)-abcr   r   r   r   typingr   r   r	   r
   redis.clientr   r   r   redis.eventr   r   redis.multidb.circuitr   r   redis.multidb.configr   redis.multidb.databaser   r   r   redis.multidb.eventr   r   r   r   redis.multidb.failoverr   r   r   r   r   redis.multidb.failure_detectorr   redis.retryr   r!   r8   rK   r   r%   r,   r(   <module>r     s{    # ( 0 0 = = E 2 ? D D   ; c 
/ 
2T/ TnE
02E E
r,   