
    ΅i                       % S SK r S SKrS SKrS SKrS SKrS SKrS SKrS SKrS SKrS SK	r	S SK
r
S SKrS SKrS SKrS SKrS SKrS SKJr  S SKJr  S SKJr  S SKJr  S SKJr  S SKJrJrJr  S SKJr  S S	KJ r J!r!J"r"J#r#  S S
K$J%r%  S SK&r&S SK'r&S SK(r&S SK)J*r+  S SK,J-r-  S SK.J/r/  S SK0J1r1  S SK2J3r3  S SK4J5r5  S SK6J7r7J8r8J9r9J:r:J;r;J<r<J=r=J>r>J?r?J@r@JArAJBrBJCrC  S SKDJErEJFrFJGrG  \R                  " \I5      rJ\JR                  \R                  5        / SQrMSS/rN\>=(       d    \?=(       d    \BrO " S S\!5      rP0 S\P" SS5      _S\P" SS5      _S\P" SS5      _S\P" S S!5      _S"\P" S#S$5      _S%\P" S&S'5      _S(\P" S)S*5      _S+\P" S,S-5      _S.\P" S/S05      _S1\P" S2S35      _S4\P" S5S65      _S7\P" S8S95      _S:\P" S;S<5      _S=\P" S>S?5      _S@\P" SASB5      _SC\P" SDSE5      _SF\P" SGSH5      _SI\P" SJSK5      0ErQ\ " SL SM5      5       rRSN rSSO rTSP rUSQ rVSR rWSS rXST rYSU\Z4SV jr[SW r\SX\]4SY jr^SZ\ S[\]SU\]4S\ jr_S] r`S^ raS_ rbS` rcSa rdSb reSc rfSd rgSe rhSf riSSg jrjSh rkSi rl\:" Sj 5      rm\Z\nSk'   Sl roSm\p\qSn4   4So jrrSSp jrsSq rtSr\&R                  Ss\]St\]SU\Z4Su jrv\;SvSwSx\" SySz9SxS{Sx4S| j5       rw\A(       a  S}rxO\]" \R                  " S~S5      5      rxSS0rz\@(       a  S\zS'   SS\Z4S jjr{SU\]4S jr|\S 5       r}SS\]S\]S\]4S jjr~S\]S\q4S jrSq\"\
GR                     \nS'   SS\"\q   SUS4S jjrSS jrSr " S S\C5      r " S S\5      rS\\q\\    4   S\S\ 4S jr\GR                  SU\Z4S j5       rS rS\x\4S jr " S S\C5      r " S S\-GR                  5      r " S S\-GR                  5      r\ SS j5       r " S S\&GR&                  GR(                  R                  5      r " S S\5      r " S S\C5      rg)    N)Callable)contextmanager)	dataclass)	timedelta)Enum)partialreducewraps)StringIO)Any
NamedTupleOptionalUnion)patch)
DeviceType)_SymmetricMemory)	trace_log)common_utils)FILE_SCHEMAfind_free_portIS_SANDCASTLELazyValretry_on_connect_failuresskip_but_pass_in_sandcastleskip_but_pass_in_sandcastle_if	TEST_CUDATEST_HPUTEST_WITH_ROCMTEST_WITH_TSANTEST_XPUTestCase)_install_threaded_pg_uninstall_threaded_pgProcessLocalGroupncclxcclhcclcudaxpuc                   *    \ rS rSr% \\S'   \\S'   Srg)TestSkipD   	exit_codemessage N)__name__
__module____qualname____firstlineno__int__annotations__str__static_attributes__r0       d/home/james-whalen/.local/lib/python3.13/site-packages/torch/testing/_internal/common_distributed.pyr,   r,   D   s    NLr9   r,   backend_unavailableH   z5Skipped because distributed backend is not available.small_worldsizeI   z Skipped due to small world size.odd_worldsizeW   zSkipped due to odd world size.no_cudaJ   zCUDA is not available.zmulti-gpu-1K   zNeed at least 1 CUDA devicezmulti-gpu-2M   zNeed at least 2 CUDA deviceszmulti-gpu-3P   zNeed at least 3 CUDA deviceszmulti-gpu-4Q   zNeed at least 4 CUDA deviceszmulti-gpu-5R   zNeed at least 5 CUDA deviceszmulti-gpu-6S   zNeed at least 6 CUDA deviceszmulti-gpu-7T   zNeed at least 7 CUDA deviceszmulti-gpu-8U   zNeed at least 8 CUDA devicesr&   L   z#c10d not compiled with NCCL support
skipIfRocmN   zTest skipped for ROCmno_peer_accessO   z'Test skipped because no GPU peer accessgenericV   zHTest skipped at subprocess level, look at subprocess log for skip reasonimporterrorX   z"Test skipped due to missing importno_acceleratorY   zaccelerator is not available.c                       \ rS rSr0 r1 Sk\S'   \" 5       \S'   1 Sk\S'   1 Sk\S'   0 r1 Sk\S	'   1 Sk\S
'   1 Sk\S'   1 Sk\S'   \" 5       \S'   \(       a  S1\S'   \(       a	  S1\S'   Sr	gSr	g)DistTestCasesc   >   mpiuccr&   r'   allgather_coalescedr	   >   rZ   r&   r'   zsendrecv anysourcezcpu barrier>   rZ   gloor&   gpur)   ddpsubgrouppluginr(   hpur'   r*   r0   N)
r1   r2   r3   r4   skip_collectivesetbackend_featurer   r    r8   r0   r9   r:   rW   rW   c   s     O-KO)* #OH,CO()%<OM" O4OE5OF4OE"9OJ #OH"("( r9   rW   c                     U [         ;   $ N)DDP_RANK_DEVICES)devices    r:   requires_ddp_rankri   y   s    %%%r9   c                 0   ^  [        T 5      U 4S j5       nU$ )zSkips if the world size exceeds the number of GPUs, ensuring that if the
test is run, each rank has its own GPU via ``torch.cuda.device(rank)``.c                    > [         (       d=  [        (       d2  [        (       d'  [        R                  " [
        S   R                  5        [        [        R                  S   5      n[         (       aL  [        R                  R                  5       U:  a*  [        R                  " [
        SU 3   R                  5        [        (       aL  [        R                  R                  5       U:  a*  [        R                  " [
        SU 3   R                  5        [        (       aL  [        R                  R                  5       U:  a*  [        R                  " [
        SU 3   R                  5        T" U 0 UD6$ )NrA   
WORLD_SIZE
multi-gpu-)r   r   r    sysexit
TEST_SKIPSr.   r5   osenvirontorchr)   device_countra   r*   )argskwargs
world_sizefuncs      r:   wrapperskip_if_no_gpu.<locals>.wrapper   s    	XXHHZ	*445L12
9002Z?HHZ*ZL 9:DDE8		..0:=HHZ*ZL 9:DDE8		..0:=HHZ*ZL 9:DDET$V$$r9   r
   rx   ry   s   ` r:   skip_if_no_gpur}   }   s"     4[% % Nr9   c                 0   ^  [        T 5      U 4S j5       nU$ )Nc                     > [         R                  S   S:w  aG  [        [         R                  S   5      S:  a'  [        R                  " [
        S   R                  5        T" U 0 UD6$ )NBACKENDrY   rl      r=   rq   rr   r5   rn   ro   rp   r.   ru   rv   rx   s     r:   ry   (skip_if_small_worldsize.<locals>.wrapper   sR    JJy!U*BJJ|4L0MPQ0QHHZ 12<<=T$V$$r9   r{   r|   s   ` r:   skip_if_small_worldsizer           
4[% % Nr9   c                 0   ^  [        T 5      U 4S j5       nU$ )Nc                     > [         R                  S   S:w  aJ  [        [         R                  S   5      S-  S:X  a'  [        R                  " [
        S   R                  5        T" U 0 UD6$ )Nr   rY   rl         r?   r   r   s     r:   ry   &skip_if_odd_worldsize.<locals>.wrapper   sW    JJy!U*BJJ|4L0MPQ0QUV0VHHZ0::;T$V$$r9   r{   r|   s   ` r:   skip_if_odd_worldsizer      r   r9   c                    ^ ^ UU 4S jnU$ )Nc                 6   >^  [        T 5      UU U4S j5       nU$ )Nc                     > TS:X  aM  [         R                  R                  5       T:  a+  [        R                  " [
        ST 3   R                  5        g T" U 0 UD6$ Nr&   rm   )rs   r)   rt   rn   ro   rp   r.   )ru   rv   backendrx   ns     r:   ry   Crequire_n_gpus_for_nccl_backend.<locals>.decorator.<locals>.wrapper   sM    & UZZ%<%<%>%Bj$45??@T,V,,r9   r{   )rx   ry   r   r   s   ` r:   	decorator2require_n_gpus_for_nccl_backend.<locals>.decorator   s     	t	- 
	- r9   r0   )r   r   r   s   `` r:   require_n_gpus_for_nccl_backendr      s     r9   c                      S n U $ )Nc                 0   ^  [        T 5      U 4S j5       nU$ )Nc                     >  SSK JnJn  T" U 0 UD6$ ! [         a*    [        R
                  " [        S   R                  5         g f = f)Nr   )AutoModelForMaskedLM
BertConfigrR   )transformersr   r   ImportErrorrn   ro   rp   r.   )ru   rv   r   r   rx   s       r:   ry   ?import_transformers_or_skip.<locals>.decorator.<locals>.wrapper   sA    >IT,V,, >M2<<=>s    1AAr{   r|   s   ` r:   r   .import_transformers_or_skip.<locals>.decorator   s     	t	> 
	> r9   r0   )r   s    r:   import_transformers_or_skipr      s    
 r9   c                    [         (       a#  [        R                  R                  5       U :  a  g[        (       a#  [        R
                  R                  5       U :  a  g[        (       a#  [        R                  R                  5       U :  a  ggNTF)r   rs   r)   rt   r   ra   r    r*   )xs    r:   at_least_x_gpur      sY    yUZZ,,.!3xEII**,1xEII**,1r9   returnc                 Z    [        U S   SS 5      n[        U 5      S:X  d  Uc  gU" U5        g)Nr   _handle_test_skipFT)getattrlen)ru   msgr   s      r:   _maybe_handle_skip_if_lt_x_gpur      s5    Q)<dC
4yA~*2cr9   c                    ^  U 4S jnU$ )Nc                 4   >^  [        T 5      U U4S j5       nU$ )Nc                    > [         R                  R                  5       (       a*  [         R                  R                  5       T:  a  T" U 0 UD6$ [        (       a*  [         R
                  R                  5       T:  a  T" U 0 UD6$ [        (       a*  [         R                  R                  5       T:  a  T" U 0 UD6$ [        ST 3   n[        XR                  5      (       d!  [        R                  " UR                  5        g g )Nrm   )rs   r)   is_availablert   r   ra   r    r*   rp   r   r/   rn   ro   r.   )ru   rv   	test_skiprx   r   s      r:   ry   4skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper   s    zz&&((UZZ-D-D-F!-KT,V,,xEII2249T,V,,xEII2249T,V,,"Zs#34I1$8I8IJJ,,- Kr9   r{   )rx   ry   r   s   ` r:   r   #skip_if_lt_x_gpu.<locals>.decorator   s     	t		. 
		. r9   r0   )r   r   s   ` r:   skip_if_lt_x_gpur      s     r9   r   c                    ^  U 4S jnU$ )a2  
Decorator to request a specific world size for a test. The test harness can
read this attribute to set the number of ranks to spawn. If there are fewer
than `n` CUDA devices available, the test should be skipped by the harness.

Usage:
    @require_world_size(3)
    def test_something(self):
        ...
c                    > TU l         [        R                  R                  5       n[        R
                  " UT:  ST SU 35      " U 5      $ )Nz	requires z GPUs, found )_required_world_sizers   r)   rt   unittest
skipUnless)rx   	availabler   s     r:   r   &requires_world_size.<locals>.decorator   sN    $%!JJ++-	""Nis-	{C

 	r9   r0   )r   r   s   ` r:   requires_world_sizer      s     r9   objdefaultc                      [        U S5      (       a*  [        U R                  5      (       a  U R                  5       OU R                  n[	        X5      nUR
                  n[        U5      $ ! [         a    Us $ f = f)z
Returns the requested world size for the currently running unittest method on `obj`
if annotated via `@require_world_size(n)`, else returns `default`.
_current_test_name)hasattrcallabler   _testMethodNamer   r   r5   	Exception)r   r   	test_namefnvalues        r:   get_required_world_sizer     sx    
 s011hs?U?U6V6V ""$$$ 	
 S$''5z s   A(A+ +A:9A:c                    ^ ^ U U4S jnU$ )Nc                 6   >^  [        T 5      UU U4S j5       nU$ )Nc                  J  > TS:w  a  T" U 0 UD6$ [         R                  R                  5       (       a*  [         R                  R                  5       T:  a  T" U 0 UD6$ [        ST 3   n[        XR                  5      (       d!  [        R                  " UR                  5        g g r   )
rs   r)   r   rt   rp   r   r/   rn   ro   r.   )ru   rv   r   r   rx   r   s      r:   ry   9nccl_skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper  s    & T,V,,zz&&((UZZ-D-D-F!-KT,V,,"Zs#34I1$8I8IJJ,,- Kr9   r{   )rx   ry   r   r   s   ` r:   r   (nccl_skip_if_lt_x_gpu.<locals>.decorator  s     	t	. 
	. r9   r0   )r   r   r   s   `` r:   nccl_skip_if_lt_x_gpur     s     r9   c                     U R                  5       nSU;   d   eSU;   d   eSU;   d   eUS   nUR                  S5      S:X  a  UOUR                  S5      S   nXC;   d   SU SU 35       eg )	N	iteration	has_errorerrorz
Exception raised from r   zDid not find expected z in ddp logging data error: )_get_ddp_logging_datafindsplit)	model_DDP
err_substrddp_logging_datalogging_erractuals        r:   verify_ddp_error_loggedr   ,  s     668********&&&&"7+K ??56"< 	89!< 
   
 (D[MR r9   c                 0   ^  [        T 5      U 4S j5       nU$ )a6  
Convenience decorator to set/unset TORCH_NCCL_BLOCKING_WAIT flag. Note that use of
this decorator will override the setting of TORCH_NCCL_ASYNC_ERROR_HANDLING for
the particular test. After the test, both TORCH_NCCL_BLOCKING_WAIT and
TORCH_NCCL_ASYNC_ERROR_HANDLING will be restored to their original values.
c                    >  [         R                  S   n[         R                  S	  [         R                  S   nS[         R                  S'    T" U 0 UD6nUUb  U[         R                  S'   Ub  U[         R                  S'   $ $ ! [         a    S n Nmf = f! [         a    S n Nkf = f! S[         R                  S'   f = f! Ub  U[         R                  S'   Ub  U[         R                  S'   f f = f)NTORCH_NCCL_ASYNC_ERROR_HANDLINGTORCH_NCCL_BLOCKING_WAIT1)rq   rr   KeyError)ru   rv    cached_nccl_async_error_handlingcached_nccl_blocking_waitretrx   s        r:   ry   (with_nccl_blocking_wait.<locals>.wrapperF  s   	4AC1B, 

<=	9:<***;% 69BJJ12	S''C 0;4 

<= )49R

56 51  	4/3,	4  	-(,%	- 69BJJ12 0;4 

<= )49R

56 5s@   $B B 	C BBB'$B* &B''B* *B?/C1r{   r|   s   ` r:   with_nccl_blocking_waitr   >  s%     4[ S  SD Nr9   c                    ^  U 4S jnU$ )zC
Runs a test for each distributed debug level specified in levels.
c                 4   >^  [        T 5      U U4S j5       nU$ )Nc                    > [         R                  R                  SS 5      nT H`  nU[         R                  S'   [        R                  " 5         T" U 0 UD6n[        R
                  " 5         Uc  MM  U[         R                  S'   Mb     W$ )NTORCH_DISTRIBUTED_DEBUG)rq   rr   getc10dset_debug_level_from_envbarrier)ru   rv   	old_levellevelr   rx   levelss        r:   ry   :with_dist_debug_levels.<locals>.decorator.<locals>.wrapperr  sq    

'@$GI8=

45--/D+F+(<EBJJ89   Jr9   r{   )rx   ry   r   s   ` r:   r   )with_dist_debug_levels.<locals>.decoratorq  s     	t	 
	 r9   r0   )r   r   s   ` r:   with_dist_debug_levelsr   l  s    
$ r9   c                  J    [        [        R                  " 5       (       + S5      $ )Nz+c10d was not compiled with the Gloo backend)r   r   is_gloo_availabler0   r9   r:   requires_gloor     !    )""$$5 r9   c           	      0   [         (       d  S $ [        R                  " 5       (       d  [        S5      $ [	        [
        R                  R                  R                  5       U :  SU  S[
        R                  R                  R                  5        SU 35      $ )Nc                     U $ rf   r0   )fs    r:   <lambda>'requires_nccl_version.<locals>.<lambda>  s    r9   +c10d was not compiled with the NCCL backendz0Requires NCCL version greater than or equal to: z	, found: z
, reason: )	r   r   is_nccl_availabler   r   rs   r)   r&   version)r   r   s     r:   requires_nccl_versionr    s    9!!##*9
 	
 .JJOO##%/>wiyQVQ[Q[Q`Q`QhQhQjPkkuvyuz{
 	
r9   c                      [        SS5      $ )zC
Require NCCL shrink support (NCCL available and version >= 2.27).
)r      z Need NCCL 2.27+ for shrink_group)r  r0   r9   r:   requires_nccl_shrinkr    s     !*LMMr9   c                  J    [        [        R                  " 5       (       + S5      $ )Nr   )r   r   r   r0   r9   r:   requires_ncclr    r   r9   c                  J    [        [        R                  " 5       (       + S5      $ )Nz*c10d was not compiled with the UCC backend)r   r   is_ucc_availabler0   r9   r:   requires_uccr	    !    )!!##4 r9   c                  J    [        [        R                  " 5       (       + S5      $ )Nz*c10d was not compiled with the MPI backend)r   r   is_mpi_availabler0   r9   r:   requires_mpir    r
  r9   c                 `    U c  [         n [        S U  5       5      n[        U(       + SU  35      $ )a  
Decorator to skip tests if no accelerator communication backend (NCCL, XCCL, HCCL) is available.

Args:
    backends (Optional[List[str]]): Specific accelerator backends to check (e.g., ["nccl", "xccl", "hccl"]).
                                   If None, checks all supported accelerator backends (NCCL, XCCL, HCCL).

Returns:
    callable: A decorator that skips the test if no specified accelerator backend is available.
c              3      #    U  H>  n[         R                  [         R                  S  S.R                  US 5      " 5       v   M@     g7f)c                      [         $ rf   )r   r0   r9   r:   r   =requires_accelerator_dist_backend.<locals>.<genexpr>.<lambda>  s    Hr9   r%   c                      gNFr0   r0   r9   r:   r   r    s    ur9   N)r   r   is_xccl_availabler   ).0r   s     r:   	<genexpr>4requires_accelerator_dist_backend.<locals>.<genexpr>  sO        G	 ****$	
 #g}
%		& 	( 	(
  s   AAz5No accelerator communication backend available among )ACCELERATOR_DIST_BACKENDSanyr   )backendsbackend_availables     r:   !requires_accelerator_dist_backendr    sH     ,     *
?zJ r9   c                      [         R                  R                  5       =(       a%    [        R                  " [
        R                  S5      n [        U (       + S5      $ )Nr   z"multicast support is not available)rs   r)   r   r   has_multicast_supportr   CUDAr   )r  s    r:   requires_multicast_supportr     sI    

! 	G22:??AF  *!!, r9   c                      [         (       aI  [        (       a=  SS/n U  H2  nU[        R                  R	                  S5      R
                  ;   d  M2    g   ggg)Ngfx942gfx950r   TF)r   r   rs   r)   get_device_propertiesgcnArchName)	arch_listarchs     r:   #evaluate_platform_supports_symm_memr(    sK    y>!8,I!5::;;A>JJJ " r9   c                      [        5       $ rf   )r(  r0   r9   r:   r   r     s    /1r9   PLATFORM_SUPPORTS_SYMM_MEMc                 f    [         R                  " [        [        S   R                  5      " U 5      $ )z&Skips a test for ROCm multiprocess UTsrL   )r   skipIfr   rp   r/   )rx   s    r:   skip_if_rocm_multiprocessr-    s#    ??>:l+C+K+KLTRRr9   r'  .c                    ^  U 4S jnU$ )z4Skips a test for given ROCm archs - multiprocess UTsc                    > S n[         (       aF  [        R                  R                  S5      R                  R                  S5      S   nUT;   a  ST 3n[        R                  " US LU5      " U 5      $ )Nr   :z0skip_if_rocm_arch_multiprocess: test skipped on )r   rs   r)   r$  r%  r   r   r,  )rx   reasonpropr'  s      r:   r   1skip_if_rocm_arch_multiprocess.<locals>.decorator  se    >::33A6BBHHMaPDt|KD6RvT16:4@@r9   r0   )r'  r   s   ` r:   skip_if_rocm_arch_multiprocessr4    s    A r9   c                    ^  U 4S jnU$ )z:Skips a test for ROCm based on ROCm ver - multiprocess UTsc                 F  > S n[         (       au  [        [        R                  R                  5      nUR                  SSS9S   n[        S UR                  S5       5       5      nUb  Tb  U[        T5      :  a	  SU ST S	3n[        R                  " US LU5      " U 5      $ )
N-r   maxsplitr   c              3   8   #    U  H  n[        U5      v   M     g 7frf   )r5   )r  r   s     r:   r  Lskip_if_rocm_ver_lessthan_multiprocess.<locals>.decorator.<locals>.<genexpr>  s     &O7N!s1vv7Ns   .z-skip_if_rocm_ver_lessthan_multiprocess: ROCm z is available but z	 required)	r   r7   rs   r   hipr   tupler   r,  )rx   r1  rocm_versionrocm_version_tupler   s       r:   r   9skip_if_rocm_ver_lessthan_multiprocess.<locals>.decorator	  s    >u}}001L'--cA->qAL!&&O|7I7I#7N&O!O"*?%g6HI[H\\novnw  xA  BvT16:4@@r9   r0   )r   r   s   ` r:   &skip_if_rocm_ver_lessthan_multiprocessrB    s    A r9   c                  <    [        [        R                  S:H  S5      $ )Nwin32z8This unit test case is not supported on Windows platform)r   rn   platformr0   r9   r:   skip_if_win32rF    s    )B r9   rh   majorminorc                     U R                   S:w  a  g[        R                  R                  b  g[        R                  R                  U 5      X4:  $ )z
Returns True if the device's compute capability is (major, minor) or higher.
Error out if the device is not a CUDA device.
Returns False if device is a RoCM device.
Returns True if device is a non-CUDA device.
r)   TF)typers   r   r=  r)   get_device_capability)rh   rG  rH  s      r:   sm_is_or_higher_thanrL  "  sB     {{f}}$::++F3~EEr9   	localhostr   T   )minutesFc           	          [        5       nU(       a@  [        U[        SS9-  5      n[        R                  R
                  R                  XXU5      $ [        R                  " U UUUUUS9$ )zD
Creates a TCP store. Retries if the chosen port is already in use.
r   )milliseconds)wait_for_workers	use_libuv)r   r5   r   rs   classes	dist_c10dTCPStorer   )	addrrw   	is_mastertimeoutrR  	jit_classrS  porttimeout_milliseconds	            r:   create_tcp_storer]  3  sn     D!'I1,E"EF}}&&//
/B
 	
 }}-
 	
r9   i  !DISTRIBUTED_TESTS_DEFAULT_TIMEOUT300test_ddp_uneven_inputsi     test_join_kwargs	lazy_initc                     [         R                  S:X  d  U c  [        R                  R	                  SUS9$ [        R                  R	                  XS9$ )NrD  z	127.0.0.1)hostnamerc  	interfacerc  )rn   rE  r   ProcessGroupGloocreate_devicerf  s     r:   ri  ri  ^  sY    
||w)"3$$22 I 3 
 	
 $$22 3 
 	
r9   c                 Z    [         R                  U R                  S5      S   [        5      $ Nr<  r   )TIMEOUT_OVERRIDEr   r   TIMEOUT_DEFAULT)test_ids    r:   get_timeoutro  i  s#    c 22 6HHr9   c               #   D  #    [        5       [        5       p[        R                  [        R                  p2 Xs[        l        [        l        [        R                  [        R                  4v   X#s[        l        [        l        g ! X#s[        l        [        l        f = f7frf   )r   rn   stdoutstderr)new_outnew_errold_outold_errs       r:   captured_outputrw  m  sa     z8:Wzz3::W2!(
CJjj#**$$!(
CJ
CJs   3B 8B .B BB rankrw   
num_inputsc                    SS[         S[         S[         S[         4S jjnS[         4S jn[        USS9[        US	S9[        US
S9[        USS9[        US	S9[        US
S94 VVs/ s HQ  n[        U5       Vs/ s H  nU" X -  U-   X!-  5      PM     sn[        U5       Vs/ s H  od" XRU-  5      PM     sn4PMS     snn$ s  snf s  snf s  snnf )z
Generate a number of basic test cases for sparse reduction.
These cover tensors with a varying number of sparse dimensions and a varying
number of dense dimensions. The only reduction operation we support is sum.
r   rx  rw   sparse_dims
dense_dimsc           	         [         R                  " [         R                  " U S-   5      SU S-   45      nU/[        U5       Vs/ s H  nSPM     sn-   n[        US-
  5       HD  n[         R                  " U[         R
                  " SU S-   5      45      nUR                  U5        MF     [         R                  " U S-   /[        U5       Vs/ s H  nSPM     sn-   5      n[         R                  " XGU5      $ s  snf s  snf )Nr   r   )	rs   reshapearangerangecatzerosappendonessparse_coo_tensor)rx  rw   r{  r|  indices_shapevaluess           r:   generate,simple_sparse_reduce_tests.<locals>.generate  s     --TAX 6D1HF5+<=+<a+<=={Q'Aii%++a*B CDGLL$ ( TAXJU:5F)G5F!5F)GGH&&w>>  > *Hs   C8C=
c           
      ~    [        [        R                  [        U5       Vs/ s H
  o " X!5      PM     sn5      $ s  snf rf   )r	   operatoraddr  )r   rw   rx  s      r:   compute_sum/simple_sparse_reduce_tests.<locals>.compute_sum  s6    LLE*<MN<MD2d/<MN
 	
Ns   :
)r{  r      )r|  )r   r   )r5   r   r  )rx  rw   ry  r  r  r   is          r:   simple_sparse_reduce_testsr  x  s    
?s 
? 
?# 
?s 
?
C 
 H!,H!,H!,H+H+H+

B	 z**A :$q(**AB* @EZ?PQ?P![*45?PQ	

  Rs$   &C9CC"C6C
Cr   c           
      |   [         R                  R                  5       n[        (       a  [         R                  R                  5       n[
        (       a  [         R                  R                  5       n[        U5      nSnX:  a  X -  n[        U 5       Vs0 s H  nU[        X5U-  US-   U-   5      _M     nnU$ s  snf )zMultigpu tests are designed to simulate the multi nodes with multi
GPUs on each node. Nccl backend requires equal #GPUs in each process.
On a single node, all visible GPUs are evenly
divided to subsets, each process only uses a subset.
r   )	rs   r)   rt   r   ra   r    r*   r  list)rw   r   nGPUsvisible_devicesnGPUs_per_processr  rank_to_GPUs          r:   init_multigpu_helperr    s     JJ##%Ex		&&(x		&&(ElO !/ z""A 	
4$5 5QBS8STUU"   	s   !B9tmp_dirinit_methodc                    [         R                  " 5       q[        R                  [        R
                  S'   [        R                  " [        R                  R                  [        R                  S5      5        [        R                  " [        R                  R                  [        R                  S5      5        [        R                  R                  [        R                  S5      n[        R                  " U5        U b  U [        R
                  S'   g [        [        R                  R                  US5      -   [        R
                  S'   g )NTEMP_DIRr   test_dirinit_dirINIT_METHODshared_init_file)
tempfileTemporaryDirectoryr  namerq   rr   mkdirpathjoinr   )r  init_dir_paths     r:   initialize_temp_directoriesr    s    ))+G$\\BJJzHHRWW\\',,	23HHRWW\\',,
34GGLLz:MHH]$/

=!$/"'',,-3
 %


=!r9   c                  <    [         b  [         R                  5         g g rf   )r  cleanupr0   r9   r:   cleanup_temp_dirr    s     r9      c            	       f  ^  \ rS rSrSrSrS\4S jr\S\4S j5       r	\S\
4S j5       rS r SS	\S
\SS4U 4S jjjrS U 4S jjrS U 4S jjrS\4S jrS S jrS S jr " S S\5      r\S\
4S j5       r\S\
S\S\SS4S j5       rS\SS4S jrS S jrS S jrS S jr\S\4S j5       rSrU =r $ )!MultiProcessTestCasei  r   
   r   c                     gr  r0   selfs    r:   _should_stop_test_suite,MultiProcessTestCase._should_stop_test_suite  s    r9   c                     g)NTr0   r  s    r:   destroy_pg_upon_exit)MultiProcessTestCase.destroy_pg_upon_exit  s    r9   c                     [         $ rf   DEFAULT_WORLD_SIZEr  s    r:   rw   MultiProcessTestCase.world_size      !!r9   c                 X   ^ [        T5      U4S j5       n[        R                  " X 5      $ )Nc                 l   > U R                   U R                  :X  a  U R                  T5        g T" 5         g rf   )rx  MAIN_PROCESS_RANK_join_processesr  r   s    r:   ry   1MultiProcessTestCase.join_or_run.<locals>.wrapper  s(    yyD222$$R(r9   r
   types
MethodTyper  r   ry   s    ` r:   join_or_run MultiProcessTestCase.join_or_run  ,    	r	 
	 ..r9   method_name
methodNameNc                    > US:w  a  Un[         TU ]  U5         [        X5      n[        XU R	                  U5      5        g ! [
         a,  nUS:w  a  [        SU R                   SU 35      Ue S nAg S nAff = fNrunTestzno such test method in : super__init__r   setattrr  AttributeError
ValueError	__class__r  r  r  r   er  s        r:   r  MultiProcessTestCase.__init__      
 "$K%		+BDt'7'7';< 	Y& !-dnn-=R
|L '	   &A 
A7"A22A7c                    > [         TU ]  5         0 U l        / U l        / U l        U R
                  U l        [        R                  " SS9 nUR                  U l
        S S S 5        0 U l        g ! , (       d  f       N= f)NFdelete)r  setUpspecial_return_code_checksskip_return_code_checks	processesr  rx  r  NamedTemporaryFiler  	file_namepid_to_pipe)r  r   r  s     r:   r  MultiProcessTestCase.setUp  sg     13' .0$**	((6!VVDN 7  76s   
A,,
A:c                 v   > [         TU ]  5         U R                   H  nUR                  5         M     / U l        g rf   )r  tearDownr  	terminate)r  pr  s     r:   r  MultiProcessTestCase.tearDown1  s.    AKKM   r9   c                 F    U R                  5       R                  S5      S   $ rk  idr   r  s    r:   r   'MultiProcessTestCase._current_test_name;  s    wwys#B''r9   c                    / U l         [        [        U R                  5      5       H  n[        R
                  R                  5       u  p4U" U R                  R                  S[        U5      -   UU R                  5       U R                  U4S[        U SS5      0S9nUR                  5         [        R                  SX%R                   5        X0R"                  UR                   '   U R                   R%                  U5        M     g )Nprocess fake_pgF)targetr  ru   rv   Started process %s with pid %s)r  r  r5   rw   rs   multiprocessingPiper  _runr7   r   r  r   startloggerinfopidr  r  )r  procrx  parent_conn
child_connprocesss         r:   _start_processes%MultiProcessTestCase._start_processes?  s    #doo./D&+&;&;&@&@&B#K~~**#d)+++-NN	 wtY>G MMOKK8$L,7W[[)NN!!'*% 0r9   c                      [         R                  R                  S5        [         R                  R	                  S5      R
                  nU R                  U5        g ! [         a     NGf = f)Nspawn)rs   r  set_start_methodRuntimeErrorget_contextProcessr  )r  r  s     r:   _spawn_processes%MultiProcessTestCase._spawn_processesU  s[    	!!227; $$009AAd#	  		s   A 
A)(A)c                       \ rS rSrSrSrg)MultiProcessTestCase.Eventi^  r   r0   N)r1   r2   r3   r4   GET_TRACEBACKr8   r0   r9   r:   Eventr  ^  s    r9   r  rx  c                    [         R                  SU5         [        R                  R	                  X/5      nX;   a  U R
                  (       a  [         R                  SU5        g U R                  5       n[         R                  SXB5        U[        R                  R                  :X  a  [        R                  " SS9 n[        R                  " U5        UR                  5         UR!                  S5        U R#                  UR%                  5       5        [         R                  SU5        S S S 5        X;   a  g GM#  ! , (       d  f       N= f)Nz*Starting event listener thread for rank %sz:Pipe closed for process %s, stopping event listener threadzReceived event %s on process %szr+)moder   zProcess %s sent traceback)r  debugr  
connectionwaitclosedrecvr  r  r  r  r  r  faulthandlerdump_tracebackflushseeksendread)parent_pipesignal_piperx  ready_pipeseventtmp_files         r:   _event_listener$MultiProcessTestCase._event_listenera  s    A4H)4499;:TUK)%%LLT #((*=uK066DDD!44$?8$33H= ( a(#((9$?F @ )5   @?s   =A-D;;
E	r   r  c                 N    U " U5      nXl         X6l        UR                  X$5        g rf   )rx  r  run_testclsrx  r   r  r  rv   r  s          r:   r  MultiProcessTestCase._run  s#     9~	"i-r9   c           	      @   [         R                  R                  SS9u  p4[        R                  " [
        R                  X#U R                  4SS9nUR                  5         [        R                  S:w  a3  [        R                  S:w  a  [         R                  R                  S5        S[        R                  S'   [        R                   " 5          [#        X5      " 5         Ub  UR?                  S 5        Uc   eURA                  5         URC                  5         U RD                  (       a   [F        RH                  " 5         g g ! [$        R&                   a\  n[(        R+                  S	U R                  U[-        U5      5        [        R.                  " [0        S
   R2                  5         S nANS nAf[4         a    [(        R7                  S[8        R:                  " 5       U R                  [
        R<                  5        UR?                  [8        R:                  " 5       5        [        R.                  " [
        R<                  5         GNhf = f! Ub  UR?                  S 5        Uc   eURA                  5         URC                  5         f = f! [J        [L        4 a     g f = f)NF)duplexT)r  ru   daemonrD  darwinr   TORCH_SHOW_CPP_STACKTRACESz4Process %s skipping test %s for following reason: %srP   z;Caught exception: 
%s exiting process %s with exit code: %s)'rs   r  r  	threadingThreadr  r"  rx  r  rn   rE  _C'_set_print_stack_traces_on_fatal_signalrq   rr   r   set_rng_seedr   r   SkipTestr  r  r7   ro   rp   r.   r   r   	traceback
format_excTEST_ERROR_EXIT_CODEr  r  closer  r   destroy_process_groupAssertionErrorr  )r  r   r  signal_recv_pipesignal_send_pipeevent_listener_threadses          r:   r%  MultiProcessTestCase.run_test  s   -2-B-B-G-Gu-G-U* ) 0 0'77;!

 	##%<<7"s||x'? HH<<TB36

/0!!#	 D$&(  + %%d+(444!&&($$ **,	 %7    	6KKF		B	 HHZ	*4455 		@LLN$$&		$99	 Y1134HH)>>?		@  + %%d+(444!&&( #J/ sJ   E +J
 I	AF-(I -BI	I I		I ;J
JJc                    / n[        U R                  5       Hi  u  p#UR                  b  M  U R                  UR                     n UR                  [        R                  R                  5        UR                  X$45        Mk     U H  u  pT UR                  S5      (       aQ  UR                  (       a  [        R                  SU5        ME  UR!                  5       n[        R#                  SXV5        Mm  [        R#                  SU5        M     g ! [         a    [        R                  SU5         GM  f = f! [         a    [        R                  SU5         M  f = f)Nz>Encountered error while trying to get traceback for process %srN  z5Pipe closed for process %s, cannot retrieve tracebackz)Process %s timed out with traceback: 

%sz6Could not retrieve traceback for timed out process: %s)	enumerater  exitcoder  r  r  r  r  r  r  ConnectionErrorr  	exceptionpollr  r  r  r   )r  pipesr  r  piperx  r4  s          r:   _get_timedout_process_traceback4MultiProcessTestCase._get_timedout_process_traceback  s$   #DNN3JA'''4II288FFGLL!+ 4  JD99Q<<{{S  ! $		ILLEt LLPRV!   ' $$X 4 #   Ts0   ;D=D7&D74D7 D43D47 EEc                    [        U R                  5       5      n[        R                  " 5       nSn  [        U R                  5       Hz  u  pVUR
                  [        R                  :X  d  M%  [        SU SUR
                   S35        [        R                  R                  5       nU H  nUR                  5         M     Sn  O   U(       a  O[        S U R                   5       5      (       a  Oy[        R                  " 5       U-
  n	X:  aC  U R                  5         [        SU S35        U R                   H  nUR                  5         M     O[        R                  " S	5        GM6  [        R                  " 5       U-
  n
U R!                  X5        U R"                  R%                  5        H  nUR'                  5         M     g ! U R"                  R%                  5        H  nUR'                  5         M     f = f)
NFTProcess z terminated with exit code z", terminating remaining processes.c              3   <   #    U  H  oR                   S Lv   M     g 7frf   )rA  )r  r  s     r:   r  7MultiProcessTestCase._join_processes.<locals>.<genexpr>  s     F~!zz-~s   zTiming out after z" seconds and killing subprocesses.g?)ro  r  timer@  r  rA  r  r6  printrs   r  active_childrenr  allrG  sleep_check_return_codesr  r  r7  )r  r   rY  
start_timesubprocess_errorr  r  rO  acelapsedelapsed_timerF  s               r:   r  $MultiProcessTestCase._join_processes  s   dggi(YY[
 &	%dnn5DA zz%9%N%NN&qc)DQZZLPrs +0*?*?*O*O*Q"1BLLN #2+/( 6 $Ft~~FFF))+
2$88:+G94VW "^^ ,

3= @  99;3L$$R6 ((//1

 2((//1

 2s   7G -D%G 3G7c           
      B   U R                   (       d  [        R                  S5        gU R                   S   n[        U R                   5       VVs/ s H(  u  pEUR                  [
        R                  :X  d  M%  XE4PM*     nnnU(       a\  SnU HI  u  pHU R                  UR                     R                  5       n	USU S[
        R                   SU	 S3-  nMK     [        U5      e[        U R                   5       H$  u  pEUR                  b  M  [        SU S	U S
35      e   XR                  ;   a  g[        R                  5        Hy  n
UR                  U
R                  :X  d  M  [        (       a1  [        R!                  SU R#                  5       U
R$                  5          g[&        R(                  " U
R$                  5      e   SnXR*                  ;   a  U R*                  U   nU R-                  UR                  USU SUR                   SUR                   3S9  gs  snnf )z
Checks that the return codes of all spawned processes match, and skips
tests if they returned a return code indicating a skipping condition.
z<Note: no subprocesses were spawned, test was likely skipped.Nr    rJ  z exited with error code z and exception:

 terminated or timed out after  seconds6Skipping %s on sandcastle for the following reason: %szExpected exit code z	 but got z
 for pid: )r   )r  r  warningr@  rA  r  r6  r  r  r  r  r  rp   r  r.   r   r  r  r/   r   r3  r  assertEqual)r  r   rW  first_processr  r  errored_processesr   r  error_messageskipexpected_return_codes               r:   rR  (MultiProcessTestCase._check_return_codes  s    ~~NNN q) "$..1
1zz1FFF QF1 	 

 E/
 $ 0 0 = B B Dqc!9:N:c:c9d e''4oR9 0 u%% dnn-DAzz!"qc!@hW  . ---%%'D%%7 =
 KKP	
 "++DLL99 ("  ! 000#'#B#B2#F "" %&:%;9]E[E[D\\fgtgxgxfyz 	 	
g
s   $H7Hc                      U R                   S:H  $ )Nr   rx  r  s    r:   rX  MultiProcessTestCase.is_mastera  s    yyA~r9   )r  r  r  rx  r  r  r  r  r   N)!r1   r2   r3   r4   r  r6  boolr  propertyr  r5   rw   r  r7   r  r  r  r   r  r
  r   r  staticmethodr"  classmethodr  r%  rG  r  rR  rX  r8   __classcell__r  s   @r:   r  r    s=   
   d   "C " "/ ?H8;	 &$(C (+,$    < ..#&.36.	. .6# 6t 6p&P*XJ
X 4  r9   r  c                   L   ^  \ rS rSrU 4S jrS rS\4S jrS	S jrS r	Sr
U =r$ )
DistributedTestBaseik  c                    > [         TU ]  5         [        U R                  5      [        R
                  S'   U R                  5         g )Nrl   )r  r  r7   rw   rq   rr   r
  r  r  s    r:   r  DistributedTestBase.setUpl  s/    #&t#7

< r9   c                      [         R                  R                  5          [        R
                  " U R                  5        g ! [         a     N.f = f! [         a     g f = frf   )rs   distributedr8  r9  rq   remover  OSErrorr  s    r:   r  DistributedTestBase.tearDownq  sU    	335	IIdnn%  		  		s"   A  A 
AA
AAr   c                 .    SU;   a  gSU;   a  gSU;   a  gg)Nr)   r&   ra   r(   r*   r'   r\   r0   )r  rh   s     r:   r   DistributedTestBase.backend{  s$    Vf_f_r9   c                 $   Uc  U R                   n[        R                  " U5      R                  5       n[        R                  R                  U R                  U5      n[        R                  R                  U R                  U5      UU R                  US9  SU R                  U5      ;   d  SU R                  U5      ;   a)  [        R                  R                  U R                  5        [        R                  R                  R                  5       $ )Nr   rw   rx  storer&   r'   )rw   rs   get_device_modulert   rx  	FileStorer  init_process_groupr   rx  acceleratorset_device_indexdistributed_c10d_get_default_group)r  rh   rw   num_visible_devicesr  s        r:   	create_pgDistributedTestBase.create_pg  s    J#55f=JJL!!++DNN<OP,,LL(!	 	- 	
 T\\&))Vt||F7K-K..tyy9  11DDFFr9   c                     [         R                  " U5      R                  5       n[        U R                  5       Vs0 s H  o3X2-  /_M
     sn$ s  snf rf   )rs   r  rt   r  rw   )r  rh   r  r  s       r:   rank_to_device"DistributedTestBase.rank_to_device  sH    #55f=JJL6;DOO6LM6LA+,,6LMMMs   Ar0   rf   )r1   r2   r3   r4   r  r  r7   r   r  r  r8   rp  rq  s   @r:   rs  rs  k  s+     
 GN Nr9   rs  subtest_configtest_fntest_kwargsc           	         [        UR                  5       5      nU Vs/ s H  ofS   PM	     nnU Vs/ s H  ofS   PM	     nn[        R                  " U6  H  n	[	        [        XySS95      n
U R                  " S0 U
D6   [        R                  R                  5         U" U0 UDU
D6  [        R                  R                  5         SSS5        [        R                  " 5         M     gs  snf s  snf ! , (       d  f       N2= f)a0  
Runs a test function given by ``test_fn`` as a subtest according to the
configurations specified by ``subtest_config``. This amortizes the
costly setup overhead (including process spawn and initializing the
process group) over the subtests.

Args:
    subtest_config (Dict[str, List[Any]]): A mapping from subtest
        keyword argument name to a list of its possible values.
    test_fn (Callable): A callable that runs the actual test.
    test_args: Positional arguments to pass to ``test_fn``.
    test_kwargs: Keyword arguments to pass to ``test_fn``.
r   r   T)strictNr0   )r  items	itertoolsproductdictzipsubTestrs   _dynamoresetr   r   )cls_instr  r  	test_argsr  subtest_config_itemsitemsubtest_config_keyssubtest_config_valuesr  subtest_kwargss              r:   run_subtestsr    s    * 9=^=Q=Q=S8T:N%O:N$1g:N%OBV-WBV$1gBV-W##%:;c"5dKL//MM!Y@+@@MM! 0 	 < &P-W 0/s   C&C+=AC00
C>	c                  l     [         R                  " / SQSS9R                  S:H  $ ! [         a     gf = f)z
If shell command `fi_info -p efa -t FI_EP_RDM` returns exit code 0 then we assume that the machine has
Libfabric EFA interfaces and EFA software components installed,
see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html.
)fi_infoz-pefaz-t	FI_EP_RDMF)checkr   )
subprocessrun
returncodeFileNotFoundErrorr0   r9   r:   has_efar    sA    NN;5j	
  s   #& 
33c                  ,    [        5       (       a  SS/$ S$ )aw  
If the machine has Libfabric EFA interfaces and EFA software components installed it may cause
'RuntimeError: In operator() at tensorpipe/common/ibv.h:172 "": Operation not supported' if tensorpipe
uses InfiniBand transport, so we exclude it from tensorpipe transports,
see https://github.com/pytorch/pytorch/issues/73885 and https://github.com/pytorch/pytorch/issues/65022
shmuvN)r  r0   r9   r:   tp_transportsr    s     $IIE4=/4/r9   c                 b   ^ ^^ T c  [        [        UTS9$ S m[        T 5      UU U4S j5       nU$ )z#
Wrapper to use with a test method
)rY  rw   c                    ^ ^^^ [        5       m[        R                  " 5       nU4S jmUUU 4S jn/ n[        T 5       H;  n[        R
                  " X5TU4S9nUR                  5         UR                  U5        M=     U$ )Nc                  >   > T [         R                  R                  :H  $ rf   r   r  _worldworlds   r:   world_is_validaspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.world_is_valid      D118888r9   c                   > [         R                  " SU TUS9   T" 5         T" 5       (       a  [         R                  " 5         g g ! [         aT  n[        R                  R                  U [        R                  " 5       45        [        R                  " U5         S nAN|S nAff = f! T" 5       (       a  [         R                  " 5         f f = f)Nthreadedr   rx  rw   r  )r   r  BaseExceptionMultiThreadedTestCaseexception_queueputrn   exc_infor$   exception_handler8  )rx  world_pgr  excallbackr  rw   s       r:   workerYspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.worker  s    ##"*E
1
 "##..0 $ ! %55994:PQ!22  "##..0 $s*   A 
B"A
BB% B""B% %$C	r  ru   )r"   r   	HashStorer  r.  r/  r  r  )	rw   r  global_storer  threadsrx  tr  r  s	   ``     @@r:   #_run_test_method_with_multi_threadsIspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads  sj    $&~~'	9	1  *%D  E<5PQAGGINN1 &
 r9   c                 Z  >^ ^^ [         R                  R                  R                  S5         T" TUUUU 4S j5      n[        R                  UT5        [         R                  R                  R                  S5        g ! [         R                  R                  R                  S5        f = f)NTc                     > T" T/T Q70 TD6$ rf   r0   )ru   rx   rv   r  s   r:   r   ?spawn_threads_and_init_comms.<locals>.wrapper.<locals>.<lambda>	  s    D$?$?$?r9   F)rs   r0  _distributed_c10d_set_thread_isolation_moder  _join_threads)r  ru   rv   r  r  rx   rw   s   ``` r:   ry   -spawn_threads_and_init_comms.<locals>.wrapper  sv     	""==dC	I9?G "//>HH&&AA%HEHH&&AA%Hs   &A? ?+B*)r   spawn_threads_and_init_commsr
   )rx   rY  rw   ry   r  s   ` ` @r:   r  r    sD     |('j
 	
> 4[
I 
I Nr9   c                   &  ^  \ rS rSrSr\R                  " 5       rSrS r	 SS\
S\
SS4U 4S	 jjjrS
 rS rSU 4S jjrU 4S jrS r\S 5       rS r\S 5       r\S 5       r\S\4S j5       r\S\
4S j5       rSSS.S jjrSSS.S jjrSrU =r$ )r  i  a  
Test runner that runs all tests with the in-proc process group using
multiple threads with the threaded process group.

Each test spawns world_size threads and run the test method in each thread.

Difference from regular MultiProcess test runner:
Must explicitly defines SetUp and call self._spawn_threads() to run the tests.
Cannot use setUp / tearDown (must use perThreadSetup / perThreadShutdown)
    to set up / tear down each thread when running each test.
No global state possible
    How bad of a limitation is this?
r   c                 X   ^ [        T5      U4S j5       n[        R                  " X 5      $ )Nc                    > U R                   U R                  :X  a  U R                  U R                  T5        g T" 5         g rf   )rx  MAIN_THREAD_RANKr  r  r  s    r:   ry   2MultiThreadedTestCase.join_or_run.<locals>.wrapper'  s.    yyD111""4<<4r9   r  r  s    ` r:   r  !MultiThreadedTestCase.join_or_run&  r  r9   r  r  r   Nc                    > US:w  a  Un[         TU ]  U5         [        X5      n[        XU R	                  U5      5        g ! [
         a,  nUS:w  a  [        SU R                   SU 35      Ue S nAg S nAff = fr  r  r  s        r:   r  MultiThreadedTestCase.__init__0  r  r  c                     g rf   r0   r  s    r:   perThreadSetUp$MultiThreadedTestCase.perThreadSetUpC  s    r9   c                     g rf   r0   r  s    r:   perThreadTearDown'MultiThreadedTestCase.perThreadTearDownG  s    r9   c                 x   > [         TU ]  5         U R                  U l        / U l        S[
        R                  S'   g)zy
setUp only set up things in the main thread, if you want to configure things
in the spawned threads, use perThreadSetUp
r   r-  N)r  r  r  rx  r  rq   rr   ru  s    r:   r  MultiThreadedTestCase.setUpJ  s1    
 	))	36

/0r9   c                 0   > [         TU ]  5         / U l        g)z
tearDown only set up things in the main thread, if you want to configure things
in the spawned threads, use perThreadTearDown
N)r  r  r  ru  s    r:   r  MultiThreadedTestCase.tearDownU  s    
 	r9   c                   ^ [         R                  R                  R                  S5        U R                  n[        5       m[        R                  " 5       U R                  l	        U4S jnU" 5       (       d  [        S5      e[        U R                  5       Hc  n[        R                  " U R                  R                  XU R                  4S9nUR!                  5         U R"                  R%                  U5        Me     g)z[
class method to spawn threads and run test, use this method in the SetUp of your TestCase
Tc                  >   > T [         R                  R                  :H  $ rf   r  r  s   r:   r  <MultiThreadedTestCase._spawn_threads.<locals>.world_is_validg  r  r9   zInvalid worldr  N)rs   r0  r  r  r   r"   r   r  r  r  r  r  rw   r.  r/  r  r  r  r  )r  r   r  rx  r  r  s        @r:   _spawn_threads$MultiThreadedTestCase._spawn_threads]  s     	""==dC++	$&&*nn&6#	9 //$//*D  ~~**)4??1SA GGILL" +r9   c                    U " U5      nX%l         [        US5      (       aX  [        R                  " 5       Ul        [
        R                  UR                  l        [
        R                  UR                  l	        UR                  XU5        g )N_tls)rx  r   r.  localr  r!   
_precision	precision_rel_tolrel_tolrun_test_with_threaded_pg)r'  r   rx  rw   rv   r  s         r:   r  MultiThreadedTestCase._runt  sa    9~	 4  !)DI"*"5"5DII ( 1 1DII&&y
Cr9   c                    [         R                  " SUUU R                  R                  S9  U R	                  5          [        X5      " 5         [         R                  " 5         U R                  5         g! [         aP  nU R                  R                  U[        R                  " 5       45        [        R                  " U5         SnAN{SnAff = f! [         R                  " 5         U R                  5         f = f)zT
Run the current test associated with `test_name` using the threaded process group.
r  r  N)r   r  r  r  r  r   r  r  r  rn   r  r$   r  r8  r  )r  r   rx  rw   r  s        r:   r  /MultiThreadedTestCase.run_test_with_threaded_pg  s     	!..--		
 			%D$& &&(""$  	  $$dCLLN%;<.. 	 &&(""$s*   A3 3
C=ACC CC 'C7c           
         [         n [        U5       Hl  u  pEUR                  [        SU5      5        UR	                  5       (       d  M7  [
        R                  R                  U[        [        SU S35      S 445        Mn     [        R                  " 5         / nU R                  R                  5       (       dL  U R                  R                  5       nUR                  U5        U R                  R                  5       (       d  ML  [        5         [        R                   R"                  R%                  S5        U R'                  XcU5        g ! [        5         [        R                   R"                  R%                  S5        f = f)Nr   zRank failed to join in under r]  F)rm  r@  r  maxis_aliver  r  r  TimeoutErrorr$   r  emptyr   r  r#   rs   r0  r  r  rR  )r'  r  r   rY  idxthreadfailed_ranksfailures           r:   r  #MultiThreadedTestCase._join_threads  s-   !	I(1C7O,??$$)99== , ,&CG9H$U!" !%	  2 ##%L))//11--113##G, ))//11 #$HH&&AA%Hr: #$HH&&AA%Hs   >E 
B9E 5F c           	         SnSnU GHN  u  pgUS   n[        U[        R                  5      (       a>  [        R	                  SUU[        U5      5        US:  a  [        S   R                  nMf  Mh  [        U[        5      (       a)  SU SU S	3n	[        R                  U	5        [        U	5      e[        U[        5      (       aG  SR                  [        R                  " U6 5      n	[        R                  S
X5        USU SU	 S3-  nGM  [        U[        5      (       d  GM  [!        UR"                  5      [$        L d  GM9  US:  d  GMB  UR"                  nGMQ     ['        U5      S:  a  [        U5      eUS:  ay  [        R)                  5        H`  n
XZR                  :X  d  M  [*        (       a#  [        R	                  SUU
R,                  5          g [        R                  " U
R,                  5      e   g g )NrZ  r   r   z3Thread %s skipping test %s for following reason: %sr   rP   zThread r\  z	 seconds
z'Caught exception: 
%s exiting thread %sz exited with exception:
r[  r^  )
isinstancer   r3  r  r  r7   rp   r.   r  r   r  r   r  r4  format_exception
SystemExitrJ  coder5   r   r  r   r/   )r'  r  rY  r   	error_msg	skip_coderx  r  excr   rd  s              r:   rR  )MultiThreadedTestCase._check_return_codes  s    		*ND1+C#x0011IH	 q= *9 5 ? ?I !C..v%DWIZXS!"3''C++ggi88(CDGSwtf,EcU"MM	C,,>S(Y] #I+ +0 y>Ay))q="))+.$}T LL
 &//== , r9   c                     [         $ rf   r  r  s    r:   rw    MultiThreadedTestCase.world_size  r  r9   c                 F    U R                  5       R                  S5      S   $ rk  r  r  s    r:   r   (MultiThreadedTestCase._current_test_name  s     wwys#B''r9   r   rh  c                J    U R                   U:X  a  U R                  XU5        gg)z
The reason why we have this util function instead of
self.assertEqual is all threads are sharing one CPU RNG
so the assertion result is only reliable on rank 0
N)rx  r`  r  r   yr   rx  s        r:   assertEqualOnRank'MultiThreadedTestCase.assertEqualOnRank  s%     99Q3' r9   c                H    U R                   U:X  a  U R                  X5        g g rf   )rx  assertNotEqualr  s        r:   assertNotEqualOnRank*MultiThreadedTestCase.assertNotEqualOnRank  s!    99% r9   )r  rx  r  rj  rk  rf   )r1   r2   r3   r4   __doc__queueQueuer  r  r  r7   r  r  r  r  r  r  ro  r  r  r  rR  rm  r5   rw   r   r  r  r8   rp  rq  s   @r:   r  r    s     kkmO/ ?H8;	 &	7#. D D%. ; ;: /> />b "C " " (C ( (( (&1 & &r9   r  c                      ^  \ rS rSrS\\R                  \R                  4   S\	SS4U 4S jjr
S\R                  S\R                  4S jrS	rU =r$ )
SaveForwardInputsModulei  forward_inputscast_forward_inputsr   Nc                 r   > [         TU ]  5         [        R                  " SS5      U l        Xl        X l        g )Nd   )r  r  nnLinearlr  r   r  r  r   r  s      r:   r   SaveForwardInputsModule.__init__  s.    
 	3$,#6 r9   r   c                     XR                   U '   U R                  U R                  (       a3  UR                  U R                  R                  R
                  5      5      $ U5      $ rf   )r  r%  r   toweightdtyper  r   s     r:   forwardSaveForwardInputsModule.forward  sG    $%D!vv43K3Kadd466==../SSQRSSr9   )r   r  r%  r1   r2   r3   r4   r  r#  Modulers   Tensorrl  r  r-  r8   rp  rq  s   @r:   r  r    sZ    7RYY457 "7 
	7T T%,, T Tr9   r  c                      ^  \ rS rSrS\\R                  \R                  4   S\	SS4U 4S jjr
S\R                  S\R                  4S jrS	rU =r$ )
SaveForwardInputsModeli  r  r   r   Nc                 n   > [         TU ]  5         [        X5      U l        [        X5      U l        Xl        g rf   )r  r  r  c1c2r  r&  s      r:   r  SaveForwardInputsModel.__init__  s.    
 	).N).N,r9   r   c                 ^    XR                   U '   U R                  U R                  U5      5      $ rf   )r  r6  r5  r,  s     r:   r-  SaveForwardInputsModel.forward  s'    $%D!wwtwwqz""r9   )r5  r6  r  r/  rq  s   @r:   r3  r3    sV    -RYY45- "- 
	-# #%,, # #r9   r3  c              #     #    U(       d  [         R                  R                  U 5        [         R                  R                  5       =n(       a  UR                  OSnUc  [
        R                  " U5      nS[        R                  S'   S[        R                  S'   U(       ap  U(       aT  [         R                  R                  R                  R                  R                  5       n[
        R                  " SUU US9  O[
        R                  " X US9  [         R                  R!                  5         [         R                  R"                  R$                  R'                  5          S v   [         R                  R!                  5         [         R                  R"                  R$                  R'                  5         U(       a  [
        R(                  " 5         g g ! [         R                  R!                  5         [         R                  R"                  R$                  R'                  5         U(       a  [
        R(                  " 5         f f = f7f)	NcpurM  MASTER_ADDR6789MASTER_PORTfaker  )r   rx  rw   )rs   r  r  current_acceleratorrJ  r   get_default_backend_for_devicerq   rr   testing	_internalrx  r  	FakeStorer  r  r  utilscountersclearr8  )rx  rw   r   init_pgr  accdevice_typer  s           r:   _dynamo_dist_per_rank_initrK  "  s     **40 "--AACCSC%  55kB +BJJ} &BJJ}MM++77??IIKE##%	 ##G:V	MM	MM  &&()$$**,&&(  	$$**,&&( s    E I#G 'A.IA/IIc                   L   ^  \ rS rSrSr\U 4S j5       r\U 4S j5       rSrU =r	$ )#DynamoDistributedSingleProcTestCaseiI  z
Test harness for single-process dynamo distributed tests,
initializes dist process group.

Prefer this for simple tests, as it's easier to debug.
c                   > [         TU ]  5         U R                  R                  [        R
                  " [        R                  SSS.5      5        SU l        [        R                  R                  5       R                  nU SU R                   3U l        XR                  ;   a  S OU R                  /U l        [        R                   " [        R"                  " U5      U R                  SS9  g )NrM  12355)r<  r>  r   r0  r   rx  rw   )r  
setUpClass_exit_stackenter_contextr   r  rq   rr   rx  rs   r  r@  rJ  rh   
device_idsr   r  rA  )r'  rh   r  s     r:   rQ  .DynamoDistributedSingleProcTestCase.setUpClassQ  s    %%JJ

#.#*	
 ""668==xq
+
!'::!5CHH://7chhST	
r9   c                 L   > [         R                  " 5         [        TU ]  5         g rf   )r   r8  r  tearDownClass)r'  r  s    r:   rW  1DynamoDistributedSingleProcTestCase.tearDownClassf  s    ""$r9   r0   )
r1   r2   r3   r4   r  ro  rQ  rW  r8   rp  rq  s   @r:   rM  rM  I  s0     
 
(    r9   rM  c            	       T    \ rS rSrSr\S\4S j5       r\S\S\	S\	SS4S	 j5       r
S
rg)"DynamoDistributedMultiProcTestCaseil  a  
Use this for tests that actually run on multiple GPUs.

Decorate tests with @skip_if_lt_x_gpu(ngpu)

Note: MultiProcTestCase spawns processes per test and is slow.
Prefer MultiThreadedTestCase for most tests. Perhaps use this one
sparingly for integration tests.
r   c                 >    [         R                  R                  5       $ rf   )rs   r  rt   r  s    r:   rw   -DynamoDistributedMultiProcTestCase.world_sizew  s      --//r9   rx  r   r  Nc                     [         R                  " [        R                  " 5       5        U " U5      nXl        X6l        UR                  X$5        g rf   )r   
addHandlerloggingNullHandlerrx  r  r%  r&  s          r:   r  'DynamoDistributedMultiProcTestCase._run{  s<     	W0023 9~	"i-r9   )r  rx  )r1   r2   r3   r4   r  rm  r5   rw   ro  r7   r  r8   r0   r9   r:   rZ  rZ  l  sV     0C 0 0 	.	.#&	.36	.		. 	.r9   rZ  c                   t  ^  \ rS rSr% SrSr\\S'   Sr\\S'   Sr	\
\   \S'   \" SS	9r\\S
'   Sr\\S'   \S\
\   4S j5       r\S\4S j5       r\SS j5       r\S 5       r\S\SS4S j5       r\S 5       r\SS j5       r\U 4S j5       r\U 4S j5       rSU 4S jjrS r S S\S\SS4U 4S jjjrSrU =r$ )!MultiProcContinuousTesti  r   rw   rx  N	rdvz_filex   )secondsrY  Fpoison_pillr   c                     g)z
ProcessGroup backend str.
To be customized by sub test classes, e.g. "nccl".
Otherwise we return None -- lazily decided by tensor.
Nr0   )r'  s    r:   backend_str#MultiProcContinuousTest.backend_str       r9   c                 ^    [         R                  R                  5       nUc  gUR                  $ )Nr;  )rs   r  r@  rJ  )r'  curr_devices     r:   rJ  #MultiProcContinuousTest.device_type  s+    '';;=r9   c                     g)zs
ProcessGroup init options.
To be customized by sub test classes, e.g. ProcessGroupNCCLOpTest
Here we return None.
Nr0   )r'  high_priority_streams     r:   optsMultiProcContinuousTest.opts  rl  r9   c           	      6   Uc   e[        U5      [        R                  S'   [        R                  " X25      n[        R
                  " U R                  5       UUUU R                  5       U R                  S9  [        R                  R                  5       U l        g )N
LOCAL_RANK)r   rw   rx  r  
pg_optionsrY  )r7   rq   rr   r   r  r  rj  rr  rY  r  r  pg)r'  rx  rw   re  r  s        r:   _init_pg MultiProcContinuousTest._init_pg  sy    $$$ $'t9

< y5OO%!xxzKK	
 &&99;r9   rn  c                     UR                  SSS9S   nU " U5      nU R                  Ul        U R                  Ul        [        XC5      n[        R
                  " 5         U" S0 UD6  g )Nr<  r   r8  r   r0   )rsplitrx  rw   r   r   r2  )r'  rn  rv   r   r  r  s         r:   _run_test_given_id*MultiProcContinuousTest._run_test_given_id  s_     NN3N3B7	9~HH	..$* 	!!# 	&r9   c                   ^ SnSUs=::  a  U:  d   e   eXl         X l        U R                  XU5        [        R	                  S5         UR                  5       n[        R	                  SU 35        Uc  O% U R                  U5        UR                  U5        MQ  [        R	                  S
5        U(       d  [2        R4                  " 5         g g ! [         a  n[        U[        5      (       ar  [        USS 5      m[        U4S j[        R                  5        5       S 5      n	U	(       a6  UR                  [        R                   " U	R"                  5      5         S nAGM  Sn[$        R&                  " 5       n
SR)                  [*        R,                  " U
6 5      n[/        S	U 35      nXl        UR                  U5         S nAGN*S nAff = f)NFr   zSetup completeTz	Got test r  c              3   J   >#    U  H  oR                   T:X  d  M  Uv   M     g 7frf   )r.   )r  vr.   s     r:   r  7MultiProcContinuousTest._worker_loop.<locals>.<genexpr>  s     T$7q;;);S$7s   #	#rZ  zException in worker process:
zTerminating ...)rx  rw   rx  r  r  r   r|  r  r  r  r  r   nextrp   r  r   r3  r/   rn   r  r  r4  r  r  	__cause__r   r8  )r'  rx  rw   re  
task_queuecompletion_queueraised_exceptionrn  r  
skip_entryr  tb_strenhanced_exr.   s                @r:   _worker_loop$MultiProcContinuousTest._worker_loop  s    D%:%%%%%# 	Ty1 	%&  nn&GLL9WI./2&&w/ $$W- L 	&'
  &&(  A ! 2b*-- 'FD 9I "&TJ$5$5$7T"J "(,,X->->z?Q?Q-RS #' <<>!;!;X!FG*-KF8+TU(*% $$[1112s    7"C 
GB GAGGc                 J   / U l         / U l        / U l        [        R                  " SS9 nUR
                  U l        S S S 5         [        R                  R                  S5        [        [        U5      5       GH  n[        R                  R                  5       n[        R                  R                  5       n[        R                  R                  U R                  S[!        U5      -   SX1U R                  XE4S9nUR#                  5         U R                   R%                  U5        U R                  R%                  U5        U R                  R%                  U5        [&        R)                  SX6R*                  5        GM     g ! , (       d  f       GNM= f! [         a     GN=f = f)NFr  r  r  T)r  r  r+  ru   r  )r  task_queuescompletion_queuesr  r  r  re  rs   r  r  r  r  r5   r  r	  r  r7   r  r  r  r  r  )r'  rw   r   rx  r  r  r  s          r:   r
  (MultiProcContinuousTest._spawn_processes  sC    "((6!FFCM 7
	!!227;
 #j/*D..446J$44::<++33''#d)+zT	 4 G MMOMM  )OO"":.!!(()9:LL94M + 76  		s   FF 
F
F"!F"c                   > [         TU ]  5         U R                  5       nU R                  S:X  aS  [        R
                  " U5      R                  5       U l        U R                  S:X  a  [        R                  " SU S35      e[        R                  SU R                   SU R                   SU 35        U R                  U R                  5        g)	zm
Class-scope test fixture. Run once for entire test class, before any test starts.
Set up the process group.
rd  r   zNo z devices availablezTesting class z on  N)r  rQ  rJ  rw   rs   r  rt   r   r3  r  r  r1   r
  )r'  rJ  r  s     r:   rQ  "MultiProcContinuousTest.setUpClass1  s     	 oo'>>R"44[ANNPCN~~"''#k]:L(MNNS\\N$s~~.>a}M	
 	S^^,r9   c                   > [         R                  SU R                   S35        U R                   H  nUR	                  S5        M     U R
                   H  nUR                  5         M      [        R                  " U R                  5        [         R                  SU R                   S35        [        TU ]9  5         g! [         a     N>f = f)zp
Class-scope test fixture. Run once for entire test class, after all tests finish.
Tear down the process group.
zJoining z workersNzClass z	 finished)r  r  rw   r  r  r  r  rq   ry  re  rz  r  r1   r  rW  )r'  r  r  r  s      r:   rW  %MultiProcContinuousTest.tearDownClassG  s     	x/x89//JNN4  * }}GLLN %	IIcmm$ 	fS\\N)45	  		s   - B? ?
CCc                   > [         TU ]  5         U R                  U l        U R                  R
                  (       a'  [        R                  " SU R                  5        35      e[        U R                  5       HM  u  p[        R                  SU SU R                  5        35        UR                  U R                  5       5        MO     g)z%
Test fixture. Run before each test.
zPrevious test failed, skipping zSending Rank r  N)r  r  r  rx  r  rh  r   r3  r  r@  r  r  r  r  )r  r  r  r  s      r:   r  MultiProcContinuousTest.setUp_  s     	 **	 >>%%##&Edggi[$QRR 't'7'78MALL=2dggi[9:NN4779% 9r9   c                 X   ^ [        T5      U4S j5       n[        R                  " X 5      $ )Nc           	        > U R                   U R                  :X  Ga  [        R                  SU R	                  5        35        [        U R                  5       H  u  pUR                  5       n[        U[        R                  5      (       a  Ue[        U[        5      (       aS  [        R                  SU SU R	                  5        SU R                  R                   35        SU R                  l        UeX0R	                  5       :X  d   e[        R                  SU SU R	                  5        35        M     g T" 5         g )NzWaiting for workers to finish zDetected failure from Rank z in: z(, skipping rest of tests in Test class: TzMain proc detected rank z
 finished )rx  r  r  r  r  r@  r  r   r  r   r3  r  r_  r  r1   rh  )r  r  r  rvr   s       r:   ry   >MultiProcContinuousTest._worker_run_main_wait.<locals>.wrapperr  s   yyD222=dggi[IJ+4T5K5K+L'A)--/B!"h&7&788 !"m449!E$'') MEEI^^E\E\D]_ 6:2  ?*?LL21#Z	{K# ,M, r9   r  r  s    ` r:   _worker_run_main_wait-MultiProcContinuousTest._worker_run_main_waitq  s,    	r	 
	8 ..r9   r  r  c                    > US:w  a  Un[         TU ]  U5         [        X5      n[        XU R	                  U5      5        g ! [
         a,  nUS:w  a  [        SU R                   SU 35      Ue S nAg S nAff = fr  )r  r  r   r  r  r  r  r  r  s        r:   r   MultiProcContinuousTest.__init__  s    
 "$K%		+BDt'A'A"'EF 	Y& !-dnn-=R
|L '	r  rP  )Frk  rj  )r1   r2   r3   r4   r  rw   r5   r6   rx  re  r   r7   r   rY  rh  rl  ro  rj  rJ  rr  rx  r|  r  r
  rQ  rW  r  r  r  r8   rp  rq  s   @r:   rc  rc    sK   JD#N#Ix}#"3/GY/KHSM    C       < <$  4   ;) ;)z N N> - -*    .&$/J ?H8;	 r9   rc  rf   r  )r   rk  r   )r  	functoolsr  r_  r  r  rq   r  r  rn   r  r.  rM  r4  r  r   collections.abcr   
contextlibr   dataclassesr   datetimer   enumr   r   r	   r
   ior   typingr   r   r   r   unittest.mockr   rs   torch._dynamo.test_casetorch.cuda.nccltorch.distributedrx  r   torch.nnr#  torch._C._autogradr   torch._C._distributed_c10dr   torch._logging._internalr   torch.testing._internalr   $torch.testing._internal.common_utilsr   r   r   r   r   r   r   r   r   r   r   r    r!   5torch.testing._internal.distributed.multi_threaded_pgr"   r#   r$   	getLoggerr1   r  setLevelINFOr  rg   HAS_ACCELERATORr,   rp   rW   ri   r}   r   r   r   r   r   rl  r   r   r5   r   r   r   r   r   r   r   r  r  r  r	  r  r  r   r(  r*  r6   r-  r>  r7   r4  rB  rF  rh   rL  r]  rm  getenvrl  ri  ro  rw  r  r  r  r  r  r  r  r  rs  r  r  r  cacher  r  r  r  r0  r  r3  rK  r  	test_caserM  rZ  rc  r0   r9   r:   <module>r     sp         	   
       $ % !   , ,  3 3        ) 7 . 0     
		8	$  4 E? 3x38z 
8
C x$FG	
 Xb"BC x45 8B => 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? HR>?  (267!" hr#LM#$ x
V%* 8B DE+, hr#BC-
4 * * **&6  &3 , s s ("$+\4
N: $+1$ D 
S
sCx *F Fc F# F$ F" 	a 
 
: O"))$GOPO,c2  +.'(
T 
IC I 2 2(S (c (s (XS 3 2 26(--	. 5
Xc] 
d 
"  ~8 ~L+N. +N\d3i( 
 D   &0 
3E7tl&H l&^Tbii T #RYY #  :?#) #)L  %--*A*A*J*J   F.)< .8^h ^r9   