
    ȅi+H                        S SK r S SKrS SKrS SKJr  S SKJr  S SKJr  S SK	J
r
Jr  S SKJr  S SKrS SKJr  S SKJr  S SKJr  S S	KJr  S S
KJr  S SKJr  S SKJr  S SKJr  S SK J!r!J"r"  S SK#J$r$J%r%J&r&  S SK'J(r(  S SK)J*r*J+r+  S SK,J-r-  SSK.J/r/J0r0J1r1  \(       a  S SK2J3r3  / SQr4 " S S\5      r5\" S\6S9    S2S\S\*S\Rn                  S-  S\8S \9S!\"S-  S"\4S# jj5       r:\" S$S%9\/SSSSSS$S&.S\S'\;\Rx                  -  S-  S\*S-  S!\"S-  S\Rn                  S-  S \9S(\9S"\4S) jj5       5       r=\ " S* S+5      5       r>\" S$S%9SSSS\5R~                  SSS$S,.S\S'\;\Rx                  -  S-  S\*S-  S!\"S-  S\Rn                  S-  S-\5S.\$S-  S \9S(\9S"\\>-  4S/ jj5       r@\" S$S%9S\S"\4S0 j5       rA     S3S\S\*S\Rn                  S-  S\8S \9S!\"S-  S(\9S"\4S1 jjrBg)4    N)Future)	dataclass)Enum)castTYPE_CHECKING)
deprecated)STATE_DICT_TYPE)$_ProcessBasedAsyncCheckpointExecutor)#_ThreadBasedAsyncCheckpointExecutor)_storage_setup)DefaultSavePlanner)_dcp_method_logger)Metadata)SavePlanSavePlanner)AsyncStagerDefaultStagerStagingOptions)Stateful)StorageWriterWriteResult)_get_default_group   )_api_bc_check_DistWrapper_profile)_AsyncCheckpointExecutor)save_state_dictsave
async_saveAsyncCheckpointerTypeAsyncSaveResponsec                        \ rS rSrSrSrSrSrg)r!   3   z!Enum for async checkpointer type.threadprocess N)__name__
__module____qualname____firstlineno____doc__THREADPROCESS__static_attributes__r'       g/home/james-whalen/.local/lib/python3.13/site-packages/torch/distributed/checkpoint/state_dict_saver.pyr!   r!   3   s    +FGr0   r!   za`save_state_dict` is deprecated and will be removed in future versions.Please use `save` instead.)categoryF
state_dictstorage_writerprocess_groupcoordinator_rankno_distplannerreturnc           	          UR                  5         [        5          [        U UUUUU5      sSSS5        $ ! , (       d  f       g= f)z3This method is deprecated. Please switch to 'save'.N)resetr   _save_state_dict)r3   r4   r5   r6   r7   r8   s         r1   r   r   :   s;      

 
s	   5
ATlog_exceptionscheckpoint_idr4   r8   r5   r7   use_collectivesr@   rA   c          
         [         R                  R                  S5        U=(       d;    [        R                  " 5       (       + =(       d    [        R
                  " 5       (       + nU(       a  [        R                  " SSS9  [        5          [        [        [        X!SS95      n[        [        U 5      UUUUUS9sSSS5        $ ! , (       d  f       g= f)	a+  
Save a distributed model in SPMD style.

This function is different from ``torch.save()`` as it handles
``ShardedTensor`` , and ``DTensor`` by having each rank only save their local shards.

For each ``Stateful`` object (having both a ``state_dict`` and a ``load_state_dict``),
save will call ``state_dict`` before serialization.

.. warning::
    There is no guarantees of Backwards Compatibility across PyTorch versions
    for saved state_dicts.

.. warning::
    If using the `process_group` argument, make sure that only its ranks
    call `save_state_dict` and that all data in state_dict belong to it.

.. note::
    When saving checkpoint for FSDP's `ShardingStrategy.HYBRID_SHARD`, only one of
    the shard_group should be calling `save_state_dict` and the corresponding process
    group needs to be passed in.

.. note::
    If no process group is available, this function assumes the intention is to save the
     state_dict in the local process.

.. note:
    Rank 0 is assumed to be the coordinator rank.


Args:
    state_dict (Dict[str, Any]): The state_dict to save.
    checkpoint_id (Union[str, os.PathLike, None]):
        The ID of this checkpoint instance. The meaning of the checkpoint_id
        depends on the storage. It can be a path to a folder or to a file.
        It can also be a key if the storage is a key-value store.
        (Default: ``None``)
    storage_writer (Optional[StorageWriter]):
        Instance of StorageWriter used to perform writes. If this is not
        specified, DCP will automatically infer the writer based on the
        checkpoint_id. If checkpoint_id is also None, an exception will
        be raised. (Default: ``None``)
    planner (Optional[SavePlanner]):
        Instance of SavePlanner. If this is not specified, the default
        planner will be used. (Default: ``None``)
    process_group (Optional[ProcessGroup]):
        ProcessGroup to be used for cross-rank synchronization.
        (Default: ``None``)
    no_dist (bool):
        If ``True``, this function will assume the intent is to load
        a checkpoint on a single rank/process.
        (Default: ``False``)
    use_collectives (bool): If ``False``, this function will assume the intent is to save
        a checkpoint without using cross-rank synchronization.
        (Default: ``True``)
        This configuration is experimental and should be used with caution.
        It will change the format of the saved checkpoint and may not be backward compatible.

Returns:
    Metadata: Metadata object for the saved checkpoint.

Example:
    >>> # xdoctest: +SKIP
    >>> my_model = MyModule()

    >>> state_dict = {"model": my_model}

    >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
    ...     "/checkpoint/1"
    ... )
    >>> torch.distributed.checkpoint.save(
    >>>     state_dict=state_dict,
    >>>     storage_writer=fs_storage_writer,
    >>> )

.. note::
    save_state_dict uses collectives to coordinate writes across ranks.
    For NCCL-based process groups, internal tensor representations of
    objects must be moved to the GPU device before communication takes place.
    In this case, the device used is given by ``torch.cuda.current_device()``
    and it is the user's responsibility to ensure that this is set so that
    each rank has an individual GPU, via ``torch.cuda.set_device()``.
z!torch.distributed.checkpoint.savezptorch.distributed is disabled, unavailable or uninitialized, assuming the intent is to save in a single process.   
stacklevelF)reader)r3   r4   r5   r7   r8   rA   N)torch_C_log_api_usage_oncedistis_availableis_initializedwarningswarnr   r   r   r   r<   _stateful_to_state_dict)r3   r@   r4   r8   r5   r7   rA   s          r1   r   r   V   s    ~ 
HH  !DEQd//11Q4;N;N;P7PG~	

 
>.PUV
  .z:)'+
 
s   
/C
Cc                   :    \ rS rSr% Sr\S   \S'   \S   \S'   Srg)r"      a	  This class contains futures for staging and upload completion.
It is returned by async_save().
staging_completion is a future that indicates when local copy
of state_dict is complete.
upload_completion is a future that indicates when a checkpoint
completed saving.
Nstaging_completionupload_completionr'   )r(   r)   r*   r+   r,   r   __annotations__r/   r'   r0   r1   r"   r"      s     t$d|#r0   r"   )r@   r4   r8   r5   async_checkpointer_typeasync_stagerr7   rA   rU   rV   c          
        ^ ^ [         R                  R                  S5        [        R                  " 5       (       a\  [        R
                  " 5       (       aB  U=(       d
    [        5       n	[         R                  " S5      U	R                  ;  a  [        S5      eTc2  Ub  [        U[        5      (       a  UmO[        [        SSSS5      5      m[        T 5      m [        SS9S[         ["           ["        -  4UU 4S	 jj5       n
U
" 5       nU[$        R&                  :X  a
  [)        5       O	[+        5       nUR-                  UUUUUUUS
9n[        U[         5      (       ak  Un[!        5       nU4S[         ["           S[         S   4S jjnUR/                  5       (       d  UR1                  U5        OUR3                  S5        [5        XS9$ [        SS9U4S j5       nU" 5         U$ )a,
  Asynchronous version of ``save``. This code first de-stages the state_dict on to the
staging storage (defaults to CPU memory), and then calls the `save` in a separate thread.

.. warning::
    This feature is experimental and subject to change.
    MUST CALL CLOSE AFTER LAST CHECKPOINT IS SAVED

Args:
    state_dict (Dict[str, Any]): The state_dict to save.
    checkpoint_id (Union[str, os.PathLike, None]):
        The ID of this checkpoint instance. The meaning of the checkpoint_id
        depends on the storage. It can be a path to a folder or to a file.
        It can also be a key if the storage is a key-value store.
        (Default: ``None``)
    storage_writer (Optional[StorageWriter]):
        Instance of StorageWriter used to perform 'stage' and  'save'. If
        this is not specified, DCP will automatically infer the writer based on the
        checkpoint_id. If checkpoint_id is also None, an exception will
        be raised. (Default: ``None``)
    planner (Optional[SavePlanner]):
        Instance of SavePlanner. If this is not specified, the default
        planner will be used. (Default: ``None``)
    process_group (Optional[ProcessGroup]):
        ProcessGroup to be used for cross-rank synchronization.
        (Default: ``None``)
    async_checkpointer_type (AsyncCheckpointerType):
        whether to do checkpoint in separate thread or process
        (Default: ``AsyncCheckpointerType.THREAD``)
    async_stager (AsyncStager):
        provides staging implementation. If storage_writer implements AsyncStager
        and async_stager is provided, async_stager will be used for staging
    no_dist (bool):
        If ``True``, this function will assume the intent is to save
        a checkpoint on a single rank/process.
        (Default: ``False``)
    use_collectives: If False, Save the checkpoint without rank coordination. (Default: ``True``)
        This configuration is experimental and should be used with caution.
        It will change the format of the saved checkpoint and may not be backward compatible.

Returns:
    Future: A future holding the resultant Metadata object from `save`.

Example:
    >>> # xdoctest: +SKIP
    >>> my_model = MyModule()

    >>> state_dict = {"model": my_model}

    >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
    ...     "/checkpoint/1"
    ... )
    >>> checkpoint_future = torch.distributed.checkpoint.async_save(
    >>>     state_dict=state_dict,
    >>>     storage_writer=fs_storage_writer,
    >>> )
    >>>
    >>> # ... do some work ...
    >>>
    >>> checkpoint_future.result()

z'torch.distributed.checkpoint.async_savecpuzfA CPU backend must be enabled for async save; try initializing process group with 'cpu:gloo,cuda:nccl'NFTr=   r9   c                  &   > T R                  T5      $ N)stage)rV   r3   s   r1   stage_state_dict$async_save.<locals>.stage_state_dict>  s    !!*--r0   r?   original_staging_futurereturn_staging_futurec                      U R                  5         UR                  S 5        g ! [         a  nUR                  U5         S nAg S nAff = frZ   )result
set_result	Exceptionset_exception)r^   r_   es      r1   callbackasync_save.<locals>.callbackX  sB    7'..0%006 7%33A667s   !$ 
A	AA	)rR   rS   c                  J   > T R                   (       a  T R                  5         g g rZ   ) should_synchronize_after_executesynchronize_staging)rV   s   r1   maybe_synchronize_staging-async_save.<locals>.maybe_synchronize_stagingm  s    <<002 =r0   )rG   rH   rI   rJ   rK   rL   r   device_device_typesAssertionError
isinstancer   r   r   rO   r   r   r	   r!   r.   r
   r   execute_savedoneadd_done_callbackrb   r"   )r3   r@   r4   r8   r5   rU   rV   r7   rA   pgr\   staging_future_or_state_dictupload_executorupload_futurestaging_futurer_   rf   rk   s   `     `           r1   r    r       s   T 
HH  !JKt2244202<<b&6&66 x  %*^[*Q*Q)L(	L )4Jt,.f_5G . -. $4#5  #&;&C&CC 	-.02  ,88$#%#' 9 M .775.4h 3H	7%+O%<	7#)$<	7 ""$$,,X6!,,T2 !4
 	

 
4	0	3 
1	3 	"#r0   c                     0 nU R                  5        H)  u  p#S nSU 3Ul        [        SS9" U5      " U5      X'   M+     U$ )z]Creates a shallow copy of `state_dict` where `state_dict` is called for each Stateful object.c                 P    [        U [        5      (       a  U R                  5       $ U $ rZ   )rp   r   r3   )elems    r1   _elem_to_state_dict4_stateful_to_state_dict.<locals>._elem_to_state_dict|  s     (24(B(B4??$LLr0   z_stateful_to_state_dict.Tr=   )itemsr(   r   )r3   stateful_state_dictkeyr{   r|   s        r1   rO   rO   v  s^     %%'		M *B#'G$#5T#J$

$  ( r0   c                   ^ ^^^^^^ [         R                  R                  S5        [        X$(       + U5      mTc
  [	        5       mTc  [        S5      eS m0 n[        TSS 5      =nb  XS'   TR                  US'   [        S0 UD6UUU UU4S j5       n	[        S0 UD6UUU4S j5       n
S mT(       a  TR                  SX5      mOU	" 5       nU
" U/5      nUS   m[        S0 UD6UUU4S	 j5       n[        S0 UD6UU4S
 j5       nT(       a  TR                  SX5      nU$ U" 5       nU" U/5      nTR                  5         U$ )Nz,torch.distributed.checkpoint.save_state_dictplanner is Noner@   r5   c                  Z  > Tc  [        S5      eTR                  5       n S[        R                  " TR                  5      R
                  ;  a2  [        R                  " SSS9  TR	                  TTR                  5        OTR	                  TU TR                  S9  S[        R                  " TR                  5      R
                  ;   a&  TR                  TR                  TR                  TS9  OTR                  TR                  5        TR                  5       nTR                  U5      nU$ )	Nr   storage_metazThe function definition for SavePlanner.set_up_planner has been updated to include the storage_meta argument. Please update your implementation to include this parameter.rC   rD   )r3   r   is_coordinatorkwargs)rankrA   )ro   r   inspect	signatureset_up_planner
parametersrM   rN   r   set_up_storage_writerr   create_local_planprepare_local_plan)r   
local_plandistWr8   r3   r4   rA   s     r1   
local_step$_save_state_dict.<locals>.local_step  s   ? !233%224!2!273I3I!J!U!UUMM. 	 "":u/C/CD""%)$33 #    !E!EFQQR 00$$ZZ / 1  001E1EF..0
#66zB
r0   c                 n   > Tc  [        S5      eTR                  U 5      u  n mTR                  U 5      n U $ )Nr   )ro   create_global_planprepare_global_plan)all_local_plansglobal_metadatar8   r4   s    r1   global_step%_save_state_dict.<locals>.global_step  sA     ? !233+2+E+Eo+V((<<_Mr0   planr   c                     > Tc  [        S5      eTc  [        S5      eTR                  T5      n TR                  U T5      nUR                  5         UR	                  5       $ )Nr   zcentral_plan is None)ro   finish_plan
write_datawaitvalue)final_local_plan
all_writescentral_planr8   r4   s     r1   r   $_save_state_dict.<locals>.write_data  sc    ? !233 !788"..|<#../?I
!!r0   c                 D   > Tc  [        S5      eTR                  TU S9  T$ )Nzglobal_metadata is None)metadataresults)ro   finish)all_resultsr   r4   s    r1   finish_checkpoint+_save_state_dict.<locals>.finish_checkpoint  s.    " !:;;Lr0   writer'   )rG   rH   rI   r   r   ro   getattrgroupr   reduce_scatter
all_reducebarrier)r3   r4   r5   r6   r7   r8   rA   ckpt_kwargsckpt_idr   r   r   global_planr   r   r   write_resultsr   r   r   s   ``   ``          @@@r1   r<   r<     sd    
HH  !OP5EFE$&.//OK>?DAAN'.O$',{{O$&+&! ! '!F &+& ' %)L++FJL)|
&1:,&?"1~&+&	" '	" &+& ' ##GZK O	 ,6<$m_5Or0   )Nr   FN)Nr   FNT)Cr   osrM   concurrent.futuresr   dataclassesr   enumr   typingr   r   typing_extensionsr   rG   torch.distributeddistributedrJ   #torch.distributed._state_dict_utilsr	   4torch.distributed.checkpoint._async_process_executorr
   3torch.distributed.checkpoint._async_thread_executorr   +torch.distributed.checkpoint._storage_utilsr   ,torch.distributed.checkpoint.default_plannerr   #torch.distributed.checkpoint.loggerr   %torch.distributed.checkpoint.metadatar   $torch.distributed.checkpoint.plannerr   r   $torch.distributed.checkpoint.stagingr   r   r   %torch.distributed.checkpoint.statefulr   $torch.distributed.checkpoint.storager   r   "torch.distributed.distributed_c10dr   utilsr   r   r   ,torch.distributed.checkpoint._async_executorr   __all__r!   FutureWarningProcessGroupintboolr   strPathLiker   r"   r-   r    rO   r<   r'   r0   r1   <module>r      s=    	  % !  & (    ? G K B : F 
 ; K A 8 8 UD  ! /3"&

!
 $$t+
 	

 
 4
 


. 4( /3+/"&.2 r
r
 $t+r
 "D(	r

 4r
 $$t+r
 r
 r
 r
  )r
j 
$ 
$ 
$ 4( /3+/"&.25J5Q5Q'+ WW $t+W "D(	W
 4W $$t+W 3W $W W W W )Wt 4( O  )& /3"& hh!h $$t+h 	h
 h 4h h hr0   