
    ph^G                     l   S SK r S SKrS SKJr  S SKJr  S SKJrJrJ	r	J
r
  S SKJr  S SKJr  S SKJrJr  S SKJr  S S	KJr  S S
KJr  S SKJrJr  S SKJrJrJr  S SKJ r J!r!  S SK"J#r#  S SK$J%r%J&r&  S SK'J(r(  \ RR                  " \*5      r+\( " S S\\5      5       r,S\4S jr- " S S\\5      r. " S S5      r/g)    N)as_completed)ThreadPoolExecutor)AnyCallableListOptional)BackgroundScheduler)PubSubWorkerThread)CoreCommandsRedisModuleCommands)CircuitBreaker)State)DefaultCommandExecutor)DEFAULT_GRACE_PERIODMultiDbConfig)Database	DatabasesSyncDatabase)NoValidDatabaseExceptionUnhealthyDatabaseException)FailureDetector)HealthCheckHealthCheckPolicy)experimentalc                       \ rS rSrSrS\4S jrS rS\4S jr	S\
SS	4S
 jrS\
4S jrS\
S\
4S jrS\4S jrS\
S\4S jrS\4S jrS\4S jrS rS rS\S/S	4   4S jrS rS\
S\4S jrS%S\\/S	4   4S jjrS\S \ S!\ 4S" jr!S# r"S$r#g	)&MultiDBClient   zl
Client that operates on multiple logical Redis databases.
Should be used in Active-Active database setups.
configc                 X   UR                  5       U l        UR                  5       U l        UR                  b%  U R                  R                  UR                  5        UR                  U l        UR                  R                  UR                  UR                  5      U l        UR                  5       U l        UR                  b%  U R                  R                  UR                  5        UR                   c  UR#                  5       OUR                   U l        U R$                  R'                  U R                  5        UR(                  U l        UR,                  U l        UR0                  U l        U R2                  R5                  [6        45        [9        U R                  U R                  U R2                  U R$                  UR:                  UR<                  U R.                  U R*                  S9U l        SU l         [B        RD                  " 5       U l#        [I        5       U l%        Xl&        g )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)'r!   
_databasesdefault_health_checks_health_checkshealth_checksextendhealth_check_interval_health_check_intervalhealth_check_policyvaluehealth_check_probeshealth_check_probes_delay_health_check_policydefault_failure_detectors_failure_detectorsr    r#   default_failover_strategy_failover_strategyset_databasesr'   _auto_fallback_intervalr&   _event_dispatcherr"   _command_retryupdate_supported_errorsConnectionRefusedErrorr   r$   r%   command_executorinitialized	threadingRLock_hc_lockr	   _bg_scheduler_config)selfr   s     N/home/james-whalen/.local/lib/python3.13/site-packages/redis/multidb/client.py__init__MultiDBClient.__init__   s    **,$::<+&&v';';<&,&B&B#7=7Q7Q7W7W&&(H(H8
! #)"B"B"D##/##**6+C+CD ''/ ,,.)) 	
 	--doo>'-'D'D$!'!8!8$22335K4MN 6"55oo--"55$66!00!33#'#?#?	!
 !!)02    c                    S nU R                  US9  U R                  R                  U R                  U R                   5        SnU R                   Ho  u  p4UR
                  R                  U R                  5        UR
                  R                  [        R                  :X  d  MT  U(       a  M]  X0R                  l        SnMq     U(       d  [        S5      eSU l        g)zD
Perform initialization of databases to define their initial state.
c                     U eN )errors    rF   raise_exception_on_failed_hc>MultiDBClient.initialize.<locals>.raise_exception_on_failed_hcL   s    KrI   )on_errorFTz4Initial connection failed - no active database foundN)_check_databases_healthrC   run_recurringr.   r(   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr>   active_databaser   r?   )rE   rO   is_active_db_founddatabaseweights        rF   
initializeMultiDBClient.initializeG   s    
	 	$$.J$K 	((''((	

 # $H--d.T.TU %%7@R@R8@%%5%)" !0 "*F   rI   returnc                     U R                   $ )z5
Returns a sorted (by weight) list of all databases.
)r(   rE   s    rF   get_databasesMultiDBClient.get_databasesj   s     rI   r\   Nc                 P   SnU R                    H  u  p4X1:X  d  M  Sn  O   U(       d  [        S5      eU R                  U5        UR                  R                  [
        R                  :X  a1  U R                   R                  S5      S   u  pTXR                  l	        g[        S5      e)z<
Promote one of the existing databases to become an active.
NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r(   
ValueError_check_db_healthrT   rW   rX   rY   	get_top_nr>   rZ   r   )rE   r\   existsexisting_db_highest_weighted_dbs         rF   set_active_database!MultiDBClient.set_active_databasep   s     "ooNK& .
 NOOh'!!W^^3%)__%>%>q%A!%D"4<!!1&?
 	
rI   c                    U R                    H  u  p#X!:X  d  M  [        S5      e   U R                  U5        U R                   R                  S5      S   u  pEU R                   R	                  XR
                  5        U R                  X5        g)z+
Adds a new database to the database list.
zGiven database already existsrg   r   N)r(   rh   ri   rj   addr]   _change_active_database)rE   r\   rl   rm   rn   highest_weights         rF   add_databaseMultiDBClient.add_database   sw     #ooNK& !@AA . 	h'.2oo.G.G.J1.M+Hoo6$$XCrI   new_databasehighest_weight_databasec                     UR                   UR                   :  a:  UR                  R                  [        R                  :X  a  XR
                  l        g g g rL   )r]   rT   rW   rX   rY   r>   rZ   )rE   rw   rx   s      rF   rs   %MultiDBClient._change_active_database   sH     "9"@"@@$$**gnn<4@!!1 = ArI   c                     U R                   R                  U5      nU R                   R                  S5      S   u  p4XB::  a:  UR                  R                  [
        R                  :X  a  X0R                  l        ggg)z,
Removes a database from the database list.
rg   r   N)	r(   removerj   rT   rW   rX   rY   r>   rZ   )rE   r\   r]   rn   rt   s        rF   remove_databaseMultiDBClient.remove_database   sl     ''1.2oo.G.G.J1.M+ $#++11W^^C4G!!1 D %rI   r]   c                    SnU R                    H  u  pEXA:X  d  M  Sn  O   U(       d  [        S5      eU R                   R                  S5      S   u  pgU R                   R                  X5        X!l        U R                  X5        g)z,
Updates a database from the database list.
NTrf   rg   r   )r(   rh   rj   update_weightr]   rs   )rE   r\   r]   rk   rl   rm   rn   rt   s           rF   update_database_weight$MultiDBClient.update_database_weight   sz     "ooNK& .
 NOO.2oo.G.G.J1.M+%%h7 $$XCrI   failure_detectorc                 :    U R                   R                  U5        g)z.
Adds a new failure detector to the database.
N)r5   append)rE   r   s     rF   add_failure_detector"MultiDBClient.add_failure_detector   s     	&&'78rI   healthcheckc                     U R                      U R                  R                  U5        SSS5        g! , (       d  f       g= f)z*
Adds a new health check to the database.
N)rB   r*   r   )rE   r   s     rF   add_health_checkMultiDBClient.add_health_check   s)     ]]&&{3 ]]s	   2
A c                 |    U R                   (       d  U R                  5         U R                  R                  " U0 UD6$ )z2
Executes a single command and return its result.
)r?   r^   r>   execute_commandrE   argsoptionss      rF   r   MultiDBClient.execute_command   s3     OO$$44dFgFFrI   c                     [        U 5      $ )z*
Enters into pipeline mode of the client.
)Pipelinerb   s    rF   pipelineMultiDBClient.pipeline   s     ~rI   funcr   c                     U R                   (       d  U R                  5         U R                  R                  " U/UQUQ76 $ )z#
Executes callable as transaction.
)r?   r^   r>   execute_transaction)rE   r   watchesr   s       rF   transactionMultiDBClient.transaction   s8     OO$$88RR'RRrI   c                 \    U R                   (       d  U R                  5         [        U 40 UD6$ )z
Return a Publish/Subscribe object. With this object, you can
subscribe to channels and listen for messages that get published to
them.
)r?   r^   PubSub)rE   kwargss     rF   pubsubMultiDBClient.pubsub   s'     OOd%f%%rI   c                    U R                   R                  U R                  U5      nU(       dI  UR                  R                  [
        R                  :w  a  [
        R                  UR                  l        U$ U(       aG  UR                  R                  [
        R                  :w  a  [
        R                  UR                  l        U$ )z?
Runs health checks on the given database until first failure.
)r3   executer*   rT   rW   rX   OPENrY   )rE   r\   
is_healthys      rF   ri   MultiDBClient._check_db_health   s    
 ..66t7J7JHU
%%5)0  &H,,22gnnD%,^^H"rI   rQ   c           	      |   [        [        U R                  5      S9 nU R                   VVs1 s H!  u  p4UR                  U R                  U5      iM#     nnn [        XPR                  S9 H  n UR                  5         M      SSS5        gs  snnf ! [         as  nUR                  n[        R                  UR                  l        [        R                  SUR                   S9  U(       a  U" UR                   5         SnAM   SnAM  SnAff = f! ["         a    [#        S5      ef = f! , (       d  f       g= f)zS
Runs health checks as a recurring task.
Runs health checks against all databases.
)max_workers)timeoutz%Health check failed, due to exception)exc_infoNz4Health check execution exceeds health_check_interval)r   lenr(   submitri   r   r.   resultr   r\   rX   r   rT   rW   logger	exceptionoriginal_exceptionTimeoutError)	rE   rQ   executorr\   rm   futuresfutureeunhealthy_dbs	            rF   rR   %MultiDBClient._check_databases_health   s   
  C,@AX $(??#2KH  5 5x@#2  
*%@%@F;	 BA 6 
;'(zz5<\\,,2((C%&%9%9 ) 
 $$Q%9%9:: $
;   "J 1 BAsY   D-(BD-D0B DD-
DA"D DDDD**D--
D;rT   	old_state	new_statec                    U[         R                  :X  a  U R                  UR                  5        g U[         R                  :X  a;  U[         R
                  :X  a&  U R                  R                  [        [        U5        g g g rL   )
rX   	HALF_OPENri   r\   rY   r   rC   run_oncer   _half_open_circuit)rE   rT   r   r   s       rF   rV   /MultiDBClient._on_circuit_state_change_callback"  se     )))!!'"2"23&9+D''$&8' ,E&rI   c                 `    U R                   R                  R                  R                  5         g rL   )r>   rZ   clientcloserb   s    rF   r   MultiDBClient.close.  s     --44::<rI   )r9   rC   r;   rD   r(   r:   r7   r5   rB   r.   r3   r*   r>   r?   rL   )$__name__
__module____qualname____firstlineno____doc__r   rG   r^   r   rc   r   ro   ru   rs   r   r}   floatr   r   r   r   r   r   r   r   r   r   boolri   	ExceptionrR   r   rX   rV   r   __static_attributes__rM   rI   rF   r   r      s   
'} 'R! Fy 
L 
T 
2D\ DA(ACOAH HD| DU D&9_ 94K 4GS*t); < S	& $   )d9J0K  D
%
29
FM
=rI   r   rT   c                 .    [         R                  U l        g rL   )rX   r   rW   )rT   s    rF   r   r   2  s    %%GMrI   c                       \ rS rSrSrS\4S jrSS jrS rS r	S\
4S	 jrS\4S
 jrSS jrSS jrSS jrS rS\\   4S jrSrg)r   i6  z?
Pipeline implementation for multiple logical Redis databases.
r   c                     / U l         Xl        g rL   )_command_stack_client)rE   r   s     rF   rG   Pipeline.__init__;  s     rI   r`   c                     U $ rL   rM   rb   s    rF   	__enter__Pipeline.__enter__?      rI   c                 $    U R                  5         g rL   reset)rE   exc_type	exc_value	tracebacks       rF   __exit__Pipeline.__exit__B      

rI   c                 F     U R                  5         g ! [         a     g f = frL   r   r   rb   s    rF   __del__Pipeline.__del__E  s"    	JJL 		    
  c                 ,    [        U R                  5      $ rL   )r   r   rb   s    rF   __len__Pipeline.__len__K  s    4&&''rI   c                     g)z1Pipeline instances should always evaluate to TrueTrM   rb   s    rF   __bool__Pipeline.__bool__N  s    rI   Nc                     / U l         g rL   )r   rb   s    rF   r   Pipeline.resetR  s
     rI   c                 $    U R                  5         g)zClose the pipelineNr   rb   s    rF   r   Pipeline.closeU  s    

rI   c                 >    U R                   R                  X45        U $ )a:  
Stage a command to be executed when execute() is next called

Returns the current Pipeline object back so commands can be
chained together, such as:

pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

At some other point, you can then run: pipe.execute(),
which will execute all commands queued in the pipe.
)r   r   r   s      rF   pipeline_execute_command!Pipeline.pipeline_execute_commandY  s     	""D?3rI   c                 &    U R                   " U0 UD6$ )zAdds a command to the stack)r   rE   r   r   s      rF   r   Pipeline.execute_commandh  s    ,,d=f==rI   c                 (   U R                   R                  (       d  U R                   R                  5          U R                   R                  R	                  [        U R                  5      5      U R                  5         $ ! U R                  5         f = f)z0Execute all the commands in the current pipeline)r   r?   r^   r>   execute_pipelinetupler   r   rb   s    rF   r   Pipeline.executel  s_    ||''LL##%	<<00AAd))* JJLDJJLs   7A? ?B)r   r   )r`   r   r`   N)r   r   r   r   r   r   rG   r   r   r   intr   r   r   r   r   r   r   r   r   r   r   rM   rI   rF   r   r   6  s^    } ( ($ !>
c 
rI   r   c                       \ rS rSrSrS\4S jrSS jrSS jrSS	 jr	SS
 jr
\S\4S j5       rS rS rS rS rS rS rS r S S\S\4S jjr S S\S\4S jjr    S!S\S\S\\   S\SS4
S jjrSrg)"r   iy  z*
PubSub object for multi database client.
r   c                 \    Xl         U R                   R                  R                  " S0 UD6  g)zInitialize the PubSub object for a multi-database client.

Args:
    client: MultiDBClient instance to use for pub/sub operations
    **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
NrM   )r   r>   r   )rE   r   r   s      rF   rG   PubSub.__init__~  s$     %%,,6v6rI   r`   c                     U $ rL   rM   rb   s    rF   r   PubSub.__enter__  r   rI   Nc                 F     U R                  5         g ! [         a     g f = frL   r   rb   s    rF   r   PubSub.__del__  s$    	 JJL 		r   c                 L    U R                   R                  R                  S5      $ )Nr   r   r>   execute_pubsub_methodrb   s    rF   r   PubSub.reset  s    ||,,BB7KKrI   c                 $    U R                  5         g rL   r   rb   s    rF   r   PubSub.close  r   rI   c                 V    U R                   R                  R                  R                  $ rL   )r   r>   active_pubsub
subscribedrb   s    rF   r  PubSub.subscribed  s    ||,,::EEErI   c                 P    U R                   R                  R                  " S/UQ76 $ )Nr   r  rE   r   s     rF   r   PubSub.execute_command  s*    ||,,BB
 $
 	
rI   c                 V    U R                   R                  R                  " S/UQ70 UD6$ )a  
Subscribe to channel patterns. Patterns supplied as keyword arguments
expect a pattern name as the key and a callable as the value. A
pattern's callable will be invoked automatically when a message is
received on that pattern rather than producing a message via
``listen()``.

psubscriber  r   s      rF   r  PubSub.psubscribe  4     ||,,BB

#)
 	
rI   c                 P    U R                   R                  R                  " S/UQ76 $ )zR
Unsubscribe from the supplied patterns. If empty, unsubscribe from
all patterns.
punsubscriber  r  s     rF   r  PubSub.punsubscribe  ,    
 ||,,BB
!
 	
rI   c                 V    U R                   R                  R                  " S/UQ70 UD6$ )a"  
Subscribe to channels. Channels supplied as keyword arguments expect
a channel name as the key and a callable as the value. A channel's
callable will be invoked automatically when a message is received on
that channel rather than producing a message via ``listen()`` or
``get_message()``.
	subscriber  r   s      rF   r  PubSub.subscribe  s4     ||,,BB

"(
 	
rI   c                 P    U R                   R                  R                  " S/UQ76 $ )zQ
Unsubscribe from the supplied channels. If empty, unsubscribe from
all channels
unsubscriber  r  s     rF   r  PubSub.unsubscribe  s%    
 ||,,BB=XSWXXrI   c                 V    U R                   R                  R                  " S/UQ70 UD6$ )aJ  
Subscribes the client to the specified shard channels.
Channels supplied as keyword arguments expect a channel name as the key
and a callable as the value. A channel's callable will be invoked automatically
when a message is received on that channel rather than producing a message via
``listen()`` or ``get_sharded_message()``.

ssubscriber  r   s      rF   r  PubSub.ssubscribe  r  rI   c                 P    U R                   R                  R                  " S/UQ76 $ )z]
Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
all shard_channels
sunsubscriber  r  s     rF   r  PubSub.sunsubscribe  r  rI   ignore_subscribe_messagesr   c                 L    U R                   R                  R                  SUUS9$ )z
Get the next message if one is available, otherwise None.

If timeout is specified, the system will wait for `timeout` seconds
before returning. Timeout should be specified as a floating point
number, or None, to wait indefinitely.
get_messager!  r   r  rE   r!  r   s      rF   r#  PubSub.get_message  s0     ||,,BB&? C 
 	
rI   c                 L    U R                   R                  R                  SUUS9$ )z
Get the next message if one is available in a sharded channel, otherwise None.

If timeout is specified, the system will wait for `timeout` seconds
before returning. Timeout should be specified as a floating point
number, or None, to wait indefinitely.
get_sharded_messager$  r  r%  s      rF   r(  PubSub.get_sharded_message  s0     ||,,BB!&? C 
 	
rI   
sleep_timedaemonexception_handlersharded_pubsubr
   c                 P    U R                   R                  R                  UUUU US9$ )N)r+  r,  r   r-  )r   r>   execute_pubsub_run)rE   r*  r+  r,  r-  s        rF   run_in_threadPubSub.run_in_thread  s6     ||,,??/) @ 
 	
rI   )r   )r`   r   r   )F        )r2  FNF)r   r   r   r   r   r   rG   r   r   r   r   propertyr   r  r   r  r  r  r  r  r  r   r#  r(  r   r   r0  r   rM   rI   rF   r   r   y  s    	7} 	7L FD F F






Y


 IL
)-
@E
" IL
)-
@E
$  04$

 
 $H-	

 
 

 
rI   r   )0loggingr@   concurrent.futuresr   concurrent.futures.threadr   typingr   r   r   r   redis.backgroundr	   redis.clientr
   redis.commandsr   r   redis.multidb.circuitr   r   rX   redis.multidb.command_executorr   redis.multidb.configr   r   redis.multidb.databaser   r   r   redis.multidb.exceptionr   r   redis.multidb.failure_detectorr   redis.multidb.healthcheckr   r   redis.utilsr   	getLoggerr   r   r   r   r   r   rM   rI   rF   <module>rD     s      + 8 0 0 0 + < 0 2 A D D D X : D $			8	$ W=' W= W=t& &@"L @FU
 U
rI   