
    ȅi4                         S SK r S SKJrJr  S SKJr  S SKJrJr  S SK	r	S SK
Jr  / SQr " S S5      r " S S	\5      r " S
 S\5      r " S S5      rg)    N)ABCabstractmethod)TracebackType)Any
NamedTuple)JoinHookJoinableJoinc                   4    \ rS rSrSrS	S jrS\SS4S jrSrg)
r      a  
This defines a join hook, which provides two entry points in the join context manager.

Entry points : a main hook, which is called repeatedly while there exists a non-joined
process, and a post-hook, which is called once all processes have joined.

To implement a join hook for the generic join context manager, define a
class that inherits from :class:`JoinHook` and override ``main_hook()`` and
``post_hook()`` as appropriate.
returnNc                     g)zCall this hook while there exists a non-joined process to shadow collective communications in a training iteration.

Training iteration i.e., in one forward pass, backward pass, and optimizer step.
N selfs    [/home/james-whalen/.local/lib/python3.13/site-packages/torch/distributed/algorithms/join.py	main_hookJoinHook.main_hook           is_last_joinerc                     g)a  
Call hook after all processes have joined.

It is passed an additional ``bool`` argument ``is_last_joiner``, which indicates if the rank is one of the last to join.

Arguments:
    is_last_joiner (bool): ``True`` if the rank is one of the last to
        join; ``False`` otherwise.
Nr   )r   r   s     r   	post_hookJoinHook.post_hook    r   r   r   r   N)	__name__
__module____qualname____firstlineno____doc__r   boolr   __static_attributes__r   r   r   r   r      s    		 	 	r   r   c                      ^  \ rS rSrSr\S	U 4S jj5       r\S\4S j5       r\	\S\
R                  4S j5       5       r\	\S\4S j5       5       rSrU =r$ )
r	   ,   aC  
This defines an abstract base class for joinable classes.

A joinable class
(inheriting from :class:`Joinable`) should implement :meth:`join_hook`,
which returns a :class:`JoinHook` instance, in addition to
:meth:`join_device` and :meth:`join_process_group` that return device and
process group information, respectively.
r   c                 T   > [         TU ]  5         [        R                  5       U l        g N)super__init___JoinConfigconstruct_disabled_join_config_join_config)r   	__class__s    r   r(   Joinable.__init__7   s    'FFHr   c                     g)aV  
Return a :class:`JoinHook` instance for the given :class:`Joinable`.

Arguments:
    kwargs (dict): a :class:`dict` containing any keyword arguments
        to modify the behavior of the join hook at run time; all
        :class:`Joinable` instances sharing the same join context
        manager are forwarded the same value for ``kwargs``.
Nr   )r   kwargss     r   	join_hookJoinable.join_hook<   s     	r   c                     g)zeReturn the device from which to perform collective communications needed by the join context manager.Nr   r   s    r   join_deviceJoinable.join_deviceI        	r   c                     g)zfReturns the process group for the collective communications needed by the join context manager itself.Nr   r   s    r   join_process_groupJoinable.join_process_groupO   r5   r   )r+   r   )r   r   r   r   r    r   r(   r   r0   propertytorchdevicer3   r   r7   r"   __classcell__)r,   s   @r   r	   r	   ,   s     I I 
X 
 
 U\\    C   r   r	   c                   H    \ rS rSr% Sr\\S'   \\S'   \\S'   \S 5       rSr	g)	r)   V   zdThis includes all fields needed from a :class:`Joinable` instance for the join context manager side.enablethrow_on_early_terminationis_first_joinablec                      [        SSSS9$ )zReturn a :class:`_JoinConfig` instance indicating that join-related logic should be disabled.

e.g. if the caller is not in a join context manager.
Fr?   r@   rA   )r)   r   r   r   r*   *_JoinConfig.construct_disabled_join_config]   s     Ue
 	
r   r   N)
r   r   r   r   r    r!   __annotations__staticmethodr*   r"   r   r   r   r)   r)   V   s(    oL $$
 
r   r)   c                       \ rS rSrSr  SS\\   S\S\4S jjrSS jr	SS	 jr
S
 rS\\   S-  S\S-  S\S-  4S jrS rS r\S\4S j5       rSrg)r
   h   a
  
This class defines the generic join context manager, which allows custom hooks to be called after a process joins.

These hooks should shadow the
collective communications of non-joined processes to prevent hanging and
erroring and to ensure algorithmic correctness. Refer to :class:`JoinHook`
for details about the hook definition.

.. warning::
    The context manager requires each participating :class:`Joinable` to
    call the method :meth:`notify_join_context()` before its own per-
    iteration collective communications to ensure correctness.

.. warning::
    The context manager requires that all ``process_group`` attributes in
    the :class:`JoinHook` objects are the same. If there are multiple
    :class:`JoinHook` objects, then the ``device`` of the first is used.
    The process group and device information is used for checking for non-
    joined processes and for notifying processes to throw an exception if
    ``throw_on_early_termination`` is enabled, both of which using an all-
    reduce.

Arguments:
    joinables (List[Joinable]): a list of the participating
        :class:`Joinable` s; their hooks are iterated over in the given
        order.

    enable (bool): a flag enabling uneven input detection; setting to
        ``False`` disables the context manager's functionality and should
        only be set when the user knows the inputs will not be uneven
        (default: ``True``).

    throw_on_early_termination (bool): a flag controlling whether to throw an
        exception upon detecting uneven inputs (default: ``False``).

Example::

    >>> import os
    >>> import torch
    >>> import torch.distributed as dist
    >>> import torch.multiprocessing as mp
    >>> # xdoctest: +SKIP
    >>> import torch.nn.parallel.DistributedDataParallel as DDP
    >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO
    >>> from torch.distributed.algorithms.join import Join
    >>>
    >>> # On each spawned worker
    >>> def worker(rank):
    >>>     dist.init_process_group("nccl", rank=rank, world_size=2)
    >>>     model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
    >>>     optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01)
    >>>     # Rank 1 gets one more input than rank 0
    >>>     inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)]
    >>>     with Join([model, optim]):
    >>>         for input in inputs:
    >>>             loss = model(input).sum()
    >>>             loss.backward()
    >>>             optim.step()
    >>>     # All ranks reach here without hanging/erroring
	joinablesr?   r@   c                    [        U5      S:X  a  [        S5      eXl        U R                   Vs/ s H  oUR                  " S0 UD6PM     snU l        X l        X0l        U R                  5         U R                  5         g s  snf )Nr   z7The join context manager requires at least one joinabler   )	len
ValueError
_joinablesr0   _join_hooks_enable_throw_on_early_termination_set_joinable_configs_extract_dist_info)r   rI   r?   r@   r/   joinables         r   r(   Join.__init__   sw     y>QVWW#9=
9HX((
 +E(""$!
s   A?Nc                     [        U R                  5      S:  d   eSnU R                   H)  n[        U R                  U R                  US9Ul        SnM+     g)zESet the :class:`_JoinConfig` of each participating :class:`Joinable`.r   TrC   FN)rK   rM   r)   rO   rP   r+   )r   rA   rS   s      r   rQ   Join._set_joinable_configs   sU    4??#a''' H$/||+/+K+K"3%H!
 !& (r   c                 
   SnSnU R                    H>  nUc  UR                  nOXR                  :w  a  [        S5      eUb  M2  UR                  nM@     Xl        [
        R                  " U R                  5      U l        X l        g)as  
Extract the process group and device information from the joinables.

If there are multiple joinables, then the context manager uses the
first specified device.

Preconditions:
    ``self._joinables`` is not ``None`` and is non-empty.

Raises:
    ValueError
        If there are multiple conflicting ``process_group`` attributes
        among the ``Joinable`` objects.
Nz7Using join context manager with multiple process groups)	rM   r7   rL   r3   _process_groupdistget_rank_rank_device)r   process_groupr;   rS   s       r   rR   Join._extract_dist_info   s~     H$ ( ; ;"="== M  ~!-- ( ,]]4#6#67
r   c                     g r&   r   r   s    r   	__enter__Join.__enter__   s    r   typevalue	tracebackc           	         U R                   (       a  U(       a  gSnSnSnSn[        R                  " S5        U(       d  Xg:  a)  [        R                  " SU SU R                   S	U S
3SS9  U R                  5       nUS:X  a  SnOKU R                  (       a  U R                  5         U R                   H  n	U	R                  5         M     SnUS-  nU(       d  M  U R                   H  n	U	R                  U5        M     g)z
Repeatedly runs the main hooks until all processes join; then, runs the post-hooks.

Raises:
    RuntimeError
        If ``throw_on_early_termination=True``.
NFTr   i  oncez+Detected uneven input skew of greater than z. This means that rank z has at least zz fewer inputs than other currently-active ranks. This level of skew could lead to performance degradation during training.   )
stacklevel   )rO   warningssimplefilterwarnr[   _get_num_nonjoined_procsrP   _notify_procs_to_terminaterN   r   r   )
r   rb   rc   rd   all_procs_joinedr   iWARN_THRESHOLDnum_nonjoined_procsr0   s
             r   __exit__Join.__exit__   s     ||t f%"!A%&&=zzl.0@ A33  ! #'"?"?"A"a'#' 33335 "&!1!1I'') "2 "'Q1 #"6 ))I/ *r   c                     [         R                  " SU R                  S9n[        R                  " XR
                  S9  UR                  5       $ )zaReturn the number of non-joined processes by shadowing an all-reduce in the non-joined processes.ri   r;   group)r:   zerosr\   rY   
all_reducerX   item)r   rr   s     r   rm   Join._get_num_nonjoined_procs  s9    #kk!DLLA+3F3FG"''))r   c                     [         R                  " SU R                  S9n[        R                  " XR
                  S9  [        SU R                   S35      e)zSchedule an all-reduce to notify non-joined processes to terminate.

Also raise a ``RuntimeError`` indicating that the current process has exhausted its inputs.
ri   rv   rw   zRank z exhausted all inputs.)r:   onesr\   rY   rz   rX   RuntimeErrorr[   )r   r~   s     r   rn   Join._notify_procs_to_terminate   sC    
 zz!DLL1$7$78U4::,.DEFFr   rS   c                    [        U S5      (       d   S[        U 5       S35       eU R                  nUR                  (       a  UR                  (       d  gU R
                  nU R                  n[        R                  " SUS9n[        R                  " XCSS9nUR                  (       aK  [        R                  " SUS9n[        R                  " XcS	9  UR                  5       nU(       a  [        S
5      eU$ )a  
Notifies the join context manager that the calling process has not yet joined.

Then, if ``throw_on_early_termination=True``, checks if uneven inputs have been detected
(i.e. if one process has already joined) and throws an exception if so.

This method should be called from a :class:`Joinable` object before
its per-iteration collective communications. For example, this should
be called at the beginning of the forward pass in
:class:`DistributedDataParallel`.

Only the first :class:`Joinable` object passed into the context
manager performs the collective communications in this method, and
for the others, this method is vacuous.

Arguments:
    joinable (Joinable): the :class:`Joinable` object calling this
        method.

Returns:
    An async work handle for the all-reduce meant to notify the context
    manager that the process has not yet joined if ``joinable`` is the
    first one passed into the context manager; ``None`` otherwise.
r+   zCheck that the z/ constructor calls the ``Joinable`` constructorNri   rv   T)rx   async_oprw   zLDetected at least one rank that exhausted inputs. Throwing across all ranks.)hasattrrb   r+   rA   r?   r3   r7   r:   r~   rY   rz   r@   ry   r{   r   )rS   join_configr;   r]   r~   workry   should_throws           r   notify_join_contextJoin.notify_join_context)  s    4 x00 	
d8n- .' '	
0
 ++,,K4F4F%% 33 zz!F+t4H11KK&1EOOE7 ::<L"1  r   )r\   rO   rN   rM   rX   r[   rP   )TFr   )r   r   r   r   r    listr	   r!   r(   rQ   rR   r`   rb   BaseExceptionr   rs   rm   rn   rF   r   r"   r   r   r   r
   r
   h   s    ;@ +0	">" " %)	"$
&> 30=!D(30 t#30 !4'	30j*G 4h 4 4r   r
   )rj   abcr   r   typesr   typingr   r   r:   torch.distributeddistributedrY   __all__r   r	   r)   r
   r   r   r   <module>r      sM     #  "    + <'s 'T
* 
$v vr   