
    ȅi#                     h   % S r SSKrSSKrSSKJr  SSKJrJr  SSKJ	r	  SSK
JrJr  SSKJr  SSKJs  Js  Js  Jr  \R*                  " 5       r0 r\\\4   \S'   S\S	\4S
 jr\ " S S5      5       rS\S	\S   4S jr " S S\R>                  5      r \ " S S\ 5      5       r!\ " S S\ 5      5       r"g)a/  
Barrier implementations for synchronizing distributed checkpoint operations.

This module provides abstract and concrete barrier implementations that ensure
all ranks in a distributed training environment complete their checkpoint operations
before proceeding, which is essential for data consistency.
    N)Counter)	dataclassfield)	timedelta)AnyOptionalBARRIER_REGISTRYbarrier_classreturnc                 N    [        U S5      (       a  U [        U R                  '   U $ )z0Register a barrier class in the global registry.barrier_type)hasattrr	   r   )r
   s    m/home/james-whalen/.local/lib/python3.13/site-packages/torch/distributed/checkpoint/_experimental/barriers.pyregister_barrierr      s&    }n--7D334    c                   N    \ rS rSr% SrSr\S-  \S'   \" \	S9r
\	\\4   \S'   Srg)BarrierConfig"   a  
Configuration for barrier construction.

This class provides a flexible way to configure different barrier implementations
with their specific constructor arguments. The barrier type will be looked up
from a registry and instantiated with rank_info and barrier_args.

Attributes:
    barrier_type: A string identifying the barrier type (e.g., "tcp_store").
                 If None, no barrier will be used.
    barrier_args: Dictionary of arguments to pass to the barrier constructor.
                 rank_info will be automatically injected as the first argument.

Examples:
    # No barrier
    BarrierConfig()

    # TCPStore barrier
    BarrierConfig(
        barrier_type="tcp_store",
        barrier_args={
            'timeout_barrier_init_secs': 30,
            'barrier_prefix_list': ['checkpoint'],
            'use_checkpoint_barrier_tcpstore_libuv': False,
            'tcpstore_port': 12345,
            'master_address': 'localhost'
        }
    )
Nr   )default_factorybarrier_args )__name__
__module____qualname____firstlineno____doc__r   str__annotations__r   dictr   r   __static_attributes__r   r   r   r   r   "   s.    <  $L#*##(#>L$sCx.>r   r   barrier_configBarrierc           	          U R                   c  gU R                   [        ;  a6  [        SU R                    S[        [        R	                  5       5       35      e[        U R                      nU" S0 U R
                  D6$ )a  
Create a barrier instance from BarrierConfig.

Args:
    barrier_config: Configuration for barrier construction.

Returns:
    Barrier instance or None if no barrier type is configured.

Raises:
    ValueError: If the barrier_type is not found in the registry.
NzUnknown barrier type: z. Available types: r   )r   r	   
ValueErrorlistkeysr   )r!   r
   s     r   create_barrier_from_configr'   F   s     ""*""*::$^%@%@$A B  $%5%:%:%< =>@
 	

 %^%@%@AM7>6677r   c                   v    \ rS rSrSr\R                  S\\\	4   4S j5       r
\R                  SS j5       rSrg)	r"   b   a  
Abstract base class for synchronization barriers.

A barrier ensures that all ranks in a distributed environment reach a certain
point in execution before any rank proceeds further, which is essential for
coordinating operations like checkpointing across multiple processes.
kwargsc                     g)z
Initialize a barrier.

Args:
    **kwargs: Keyword arguments for specific barrier implementations.
             Common arguments may include rank information, barrier prefixes,
             timeout settings, and other barrier-specific configuration.
Nr   )selfr*   s     r   __init__Barrier.__init__k       r   Nc                     g)z
Execute a synchronization barrier.

This method uses the barrier_prefix provided during initialization to
coordinate synchronization across processes.
Nr   r,   s    r   execute_barrierBarrier.execute_barrierw   r/   r   r   r   N)r   r   r   r   r   abcabstractmethodr   r   r   r-   r2   r    r   r   r   r"   r"   b   sJ     	c3h   	 r   c                   4    \ rS rSrSrSr  SS jrSS jrSrg)	DistBarrier   a  
A barrier implementation using PyTorch's distributed barrier for synchronization.

This barrier uses the built-in torch.distributed.barrier() function to coordinate
synchronization across multiple processes. It's simpler than TCPStoreBarrier but
requires an initialized process group.
dist_barrierNc                 N    [         R                  " 5       (       d  [        S5      eg)z
Initialize a DistBarrier.

This barrier requires an initialized PyTorch distributed process group.
No additional arguments are needed as it uses the current process group.

Raises:
    AssertionError: If the distributed process group is not initialized.
z2DistBarrier requires an initialized process group.N)distis_initializedAssertionErrorr1   s    r   r-   DistBarrier.__init__   s$     ""$$ !UVV %r   c                 .    [         R                  " 5         g)zT
Execute a synchronization barrier using the prefix provided during initialization.
N)r<   barrierr1   s    r   r2   DistBarrier.execute_barrier   s     	r   r   r4   )	r   r   r   r   r   r   r-   r2   r    r   r   r   r8   r8      s"     "LW	Wr   r8   c                   P    \ rS rSrSrSrS\S\S\S\S\S	\S
\S\4S jr	SS jr
Srg)TCPStoreBarrier   a;  
A barrier implementation using PyTorch's TCPStore for synchronization.

This barrier uses a TCP-based distributed key-value store to coordinate
synchronization across multiple processes. It uses a single TCP store
for all barrier operations, with different prefixes to distinguish between
different barrier types.
	tcp_storeglobal_rankglobal_world_sizebarrier_prefixtimeout_barrier_init_secs%use_checkpoint_barrier_tcpstore_libuvtcpstore_portmaster_addresstimeout_secsc	                    [         R                  SUUUUUUUU5	        [        5       U l        X0l        Xl        X l        Xl        [        R                  " U[        U5      U R                  [        US9U R
                  S:H  S9U l        g)a~  
Initialize a TCPStoreBarrier.

Args:
    global_rank: The rank of the current process in the distributed environment.
    global_world_size: The total number of processes in the distributed environment.
    barrier_prefix: A string prefix to identify this specific barrier.
    timeout_barrier_init_secs: Timeout in seconds for initializing the TCPStore.
    use_checkpoint_barrier_tcpstore_libuv: Whether to use libuv for the TCPStore.
    tcpstore_port: Port number for the TCPStore.
    master_address: Address of the master node for the TCPStore.
    timeout_secs: Maximum time in seconds to wait for all ranks to reach the barrier.
zInitializing TCPStore master_address=%s tcpstore_port=%s rank=%s world_size=%s barrier_prefix=%s timeout_barrier_init_secs=%s use_checkpoint_barrier_tcpstore_libuv=%s timeout_secs=%s)secondsr   )
world_sizetimeout	is_masterN)loggerinfor   _tcp_store_barrier_seq_barrier_prefix_global_rank_global_world_size_timeout_secsr<   TCPStoreintr   
_tcp_store)	r,   rG   rH   rI   rJ   rK   rL   rM   rN   s	            r   r-   TCPStoreBarrier.__init__   s    0 	G %1	
 07y#- ("3) --..&?@((A-
r   Nc           	         U R                   n[        R                  SUU R                  5        S[        S[
        4S jnU R                  R                  U" U R                  5      [        U R                  U   5      5        [        R                  " U R                  U R                  U[        U R                  U   5      -   S9  U R                  U==   S-  ss'   g)a  
Execute a synchronization barrier using the prefix provided during initialization.

The implementation uses a sequence number that is incremented every time
a barrier is reached. The sequence number is per barrier prefix to allow
different barriers to operate concurrently.
z3Executing barrier barrier_prefix=%s timeout_secs=%srankr   c                     SU  3$ )Nr`   r   )r`   s    r   	_rank_key2TCPStoreBarrier.execute_barrier.<locals>._rank_key   s    $= r   )storerQ   
key_prefix   N)rW   rT   rU   rZ   r\   r   r]   setrX   rV   
store_utilrA   rY   )r,   rI   rb   s      r   r2   TCPStoreBarrier.execute_barrier   s     --A	
	!C 	!C 	! 	d''(++N;<	
 	//..T%@%@%P!QQ		
 	##N3q83r   )rW   rX   rY   r]   rV   rZ   r4   )r   r   r   r   r   r   r\   r   boolr-   r2   r    r   r   r   rD   rD      sg     L6
6
 6
 	6

 $'6
 046
 6
 6
 6
p!9r   rD   )#r   r5   loggingcollectionsr   dataclassesr   r   datetimer   typingr   r   torch.distributeddistributedr<   %torch.distributed.elastic.utils.storeelasticutilsrd   rh   	getLoggerrT   r	   r   r   typer   r   r   r'   ABCr"   r8   rD   r   r   r   <module>rx      s       (      : : 
			 %' $sDy/ &D T   ?  ?  ?F8!8i88cgg >  '    F e9g e9 e9r   