
    oi"                     B    S SK r SS/rS r " S S5      r " S S5      rg)    NWorkerExtensionColocateWorkerExtensionc                 N    SSK Jn  SSKJn  UR	                  U UUUS9nU" XtS9nU$ )a'  
vLLM provides `StatelessProcessGroup` to create a process group
without considering the global process group in torch.distributed.
It is recommended to create `StatelessProcessGroup`, and then initialize
the data-plane communication (NCCL) between external (train processes) 
and vLLM workers.
r   )PyNcclCommunicator)StatelessProcessGroup)hostportrank
world_size)device),vllm.distributed.device_communicators.pyncclr   vllm.distributed.utilsr   create)	master_addressmaster_portr
   r   r   r   r   pgpynccls	            U/home/james-whalen/.local/lib/python3.13/site-packages/unsloth_zoo/vllm_rlhf_utils.pystateless_init_process_groupr      s<     P<		%	%>+6+/1; 
& 
=B  2FM    c                   *    \ rS rSrSrS rS rS rSrg)r   *   a[  
The class for vLLM's worker to inherit from.
By defining an extension class, the code can work no matter what is
the underlying worker class. This way, the code can be compatible
with both vLLM V0 and V1.
NOTE: we define this class in a separate module, and the main module
should pass the full qualified name as `worker_extension_cls` argument.
c                 t    SSK Jn  U" 5       R                  U-   n[        UUUUU R                  5      U l        g )Nr   )get_world_group)vllm.distributed.parallel_stater   r
   r   r   model_update_group)selfr   r   rank_offsetr   r   r
   s          r   init_weight_update_group(WorkerExtension.init_weight_update_group4   s8    C %%3">KK#
r   c                     [         R                  " X2SS9nU R                  R                  US[         R                  R                  5       S9  U R                  R                  R                  X4/S9  Ag )Ncuda)dtyper   r   )srcstreamweights)	torchemptyr   	broadcastr"   current_streammodel_runnermodelload_weights)r   namer#   shapeweights        r   update_weightWorkerExtension.update_weight@   sh    U?))&./161J1J1L 	* 	N 	,,tn5E,Fr   c                     SnU R                   R                  R                  5        H9  u  p#U=(       a+    [        R                  " U[        R
                  " U5      5      nM;     U$ z(
Check if the weights are updated to 0.
Tr,   r-   named_parametersr(   allclose
zeros_liker   weights_updatedr/   ps       r   check_weights_changed%WorkerExtension.check_weights_changedJ   X     ((..??AGD- (%..5##A&3(O B r   )r   N)	__name__
__module____qualname____firstlineno____doc__r   r2   r=   __static_attributes__ r   r   r   r   *   s    

r   c                   >    \ rS rSrSrS\4S jrS rS rS r	S r
S	rg
)r   U   at  
The class for vLLM's worker to inherit from, in the colocate setting.
By defining an extension class, the code can work no matter what is
the underlying worker class. This way, the code can be compatible
with both vLLM V0 and V1.
NOTE: we define this class in a separate module, and the main module
should pass the full qualified name as `worker_extension_cls` argument.
returnc                 z    SSK Jn  UR                  U R                  R                  5      U l        U R
                  $ )Nr   )current_platform)vllm.platformsrK   get_device_uuidr   indexdevice_uuid)r   rK   s     r   report_device_id(ColocateWorkerExtension.report_device_id_   s.    3+;;DKK<M<MNr   c                 X   XR                      nU R                  R                  n/ nUR                  5        H/  u  pVUu  px[	        U5      n	X9S'   U" U	6 n
UR                  XZ45        M1     U R                  R                  R                  US9  [        R                  R                  5         g )N   r&   )rO   r   rN   itemslistappendr,   r-   r.   r(   r"   synchronize)r   ipc_handleshandles	device_idr'   r/   handlefuncargs	list_argstensors              r   update_weights_from_ipc_handles7ColocateWorkerExtension.update_weights_from_ipc_handlesd   s    ../KK%%	#MMOLDJDT
I %aL9%FNND>* , 	,,W,=

 r   c                 <   U R                   R                  n/ / p2/ / pT/ nUR                  R                   H  n[        UR                  R
                  R                  S   5        UR                  UR                  R
                  R                  S   5        UR                  UR                  R
                  R                  S   5        UR                  UR                  R
                  R                  S   5        M     [        R                  R                  5         U$ )Nr         )r,   r-   layersprint	self_attnqkv_projlora_a_stackedrV   r(   r"   rW   )r   
vllm_modelmodel_loras_Amodel_loras_Bvllm_loras_Avllm_loras_B
parametersv_layers           r   get_model_runner(ColocateWorkerExtension.get_model_runners   s    &&,,
')2}')2|
!''..G'##,,;;A>?  !2!2!;!;!J!J1!MN  !2!2!;!;!J!J1!MN  !2!2!;!;!J!J1!MN	 / 	

 r   c                     SnU R                   R                  R                  5        H9  u  p#U=(       a+    [        R                  " U[        R
                  " U5      5      nM;     U$ r5   r6   r:   s       r   r=   -ColocateWorkerExtension.check_weights_changed   r?   r   c                     SSK Jn  0 nU R                  R                  nUR	                  5        H  u  pEU" UR                  5       5      X$'   M     U R                  U0$ )Nr   )reduce_tensor) torch.multiprocessing.reductionsrv   r,   r-   r7   detachrO   )r   rv   datarj   r/   r<   s         r   get_weight_ipc_handles.ColocateWorkerExtension.get_weight_ipc_handles   sW    B&&,,
!224GD
 'qxxz2DJ 5   $''r   )rO   N)r@   rA   rB   rC   rD   strrP   r`   rq   r=   rz   rE   rF   r   r   r   r   U   s'     #  
!
(r   )r(   __all__r   r   r   rF   r   r   <module>r~      s3   " 
&( (VA( A(r   