
    <i!                     
   S SK r S SKrS SKJr  S SKJr  S SKJrJr  S SK	J
r
  S SKJr  S SKJr  S SKJr  S S	KJrJrJr  S
rSr " S S\\5      r " S S5      r SS\\   S\S\S\S\S\\\4   S-  SS4S jjr " S S5      rg)    N)defaultdict)Enum)Queueget_context)BaseContext)BaseProcess)Synchronized)Empty)AnyIterableTypeiX     c                        \ rS rSrSrSrSrSrg)QueueSignals   stopconfirmerror N)__name__
__module____qualname____firstlineno__r   r   r   __static_attributes__r       Z/home/james-whalen/.local/lib/python3.13/site-packages/qdrant_client/parallel_processor.pyr   r      s    DGEr   r   c                   R    \ rS rSr\S\S\SS 4S j5       rS\\   S\\   4S jrSr	g	)
Worker   argskwargsreturnc                     [        5       eNNotImplementedError)clsr    r!   s      r   startWorker.start   s    !##r   itemsc                     [        5       er$   r%   )selfr*   s     r   processWorker.process   s    !##r   r   N)
r   r   r   r   classmethodr   r(   r   r-   r   r   r   r   r   r      sD    $# $ $ $ $$Xc] $x} $r   r   worker_classinput_queueoutput_queuenum_active_workers	worker_idr!   r"   c                 "  ^ Uc  0 n[         R                  " SU S[        R                  " 5        35         U R                  " S	0 UD6nS[
        [           4U4S jjnUR                  U" 5       5       H  nUR                  U5        M     TR                  5         UR                  5         TR                  5         UR                  5         UR                  5          U=R                   S-  sl        SSS5        [         R                  " SU S35        g! [         a?  n	[         R                  " U	5        UR                  [        R                  5         Sn	A	NSn	A	ff = f! , (       d  f       Nu= f! TR                  5         UR                  5         TR                  5         UR                  5         UR                  5          U=R                   S-  sl        SSS5        O! , (       d  f       O= f[         R                  " SU S35        f = f)
z
A worker that pulls data pints off the input queue, and places the execution result on the output queue.
When there are no data pints left on the input queue, it decrements
num_active_workers to signal completion.
NzReader worker: z PID: r"   c               3   d   >#     TR                  5       n U [        R                  :X  a  g U v   M+  7fr$   )getr   r   )itemr1   s    r   input_queue_iterable%_worker.<locals>.input_queue_iterable7   s0     "(<,,,
	 s   -0   zReader worker z	 finishedr   )logginginfoosgetpidr(   r   r   r-   put	Exception	exceptionr   r   closejoin_threadget_lockvalue)
r0   r1   r2   r3   r4   r!   workerr9   processed_itemes
    `        r   _workerrJ   !   s    ~LL?9+VBIIK=AB!<##-f-	hsm 	 %nn-A-CDN^, E 	!  "((*$$)$ + 	~i[	:;)  -!++,,-" +* 	!  "((*$$)$ +** 	~i[	:;sO   AD E!
E5EE2 EE2 !
E/2AHG"	H"
G0,"Hc            	           \ rS rSrS\4S\S\\   S\S-  S\4S jjr	S\
S	S4S
 jrS\\
   S\
S\
S	\\
   4S jrS\\
   S\
S\
S	\\
   4S jrS\\
   S\
S\
S	\\
   4S jrSS jrSS\S-  S	S4S jjrSS jrSS jrSrg)ParallelWorkerPoolW   Nnum_workersrG   start_methodmax_internal_batch_sizec                     X l         Xl        S U l        S U l        [	        U5      U l        / U l        U R                  U-  U l        SU l        S U l	        g )NF)
r0   rN   r1   r2   r   ctx	processes
queue_sizeemergency_shutdownr3   )r,   rN   rG   rO   rP   s        r   __init__ParallelWorkerPool.__init__X   sV     #&)-*. +L 9,.**-DD"'48r   r!   r"   c                    U R                   R                  U R                  5      U l        U R                   R                  U R                  5      U l        U R                   R                  SU R                  5      n[        U[        5      (       d   eX l	        [        SU R                  5       H  n[        U R                   S5      (       d   eU R                   R                  [        U R                  U R                  U R                  U R                  UUR                  5       4S9nUR!                  5         U R"                  R%                  U5        M     g )Nir   Process)targetr    )rR   r   rT   r1   r2   ValuerN   
isinstance	BaseValuer3   rangehasattrrZ   rJ   r0   copyr(   rS   append)r,   r!   	ctx_valuer4   r-   s        r   r(   ParallelWorkerPool.starti   s   88>>$//: HHNN4??;HHNN3(8(89	)Y////"+q$"2"23I488Y////hh&&%%$$%%++KKM ' 
G MMONN!!'* 4r   streamr    c              /   f  #     U R                   " S	0 UD6  U R                  c   S5       eU R                  c   S5       eSnSnU H  nU R                  5         XE-
  U R                  :  a   U R                  R                  5       nO U R                  R                  [        S9nUb8  U[        R                  :X  a  U R                  5         [        S5      eUv   US-  nU R                  R                  U5        US-  nM     [        U R                  5       H,  n	U R                  R                  [        R                   5        M.     XT:  a\  U R                  R                  [        S9nU[        R                  :X  a  U R                  5         [        S5      eUv   US-  nXT:  a  M\  U R                  c   S5       eU R                  c   S5       eU R#                  5         U R                  R%                  5         U R                  R%                  5         U R&                  (       a5  U R                  R)                  5         U R                  R)                  5         g U R                  R+                  5         U R                  R+                  5         g ! [         a    S n GNf = f! [         a  nU R                  5         UeS nAff = f! U R                  c   S5       eU R                  c   S5       eU R#                  5         U R                  R%                  5         U R                  R%                  5         U R&                  (       a5  U R                  R)                  5         U R                  R)                  5         f U R                  R+                  5         U R                  R+                  5         f = f7f)
NzInput queue was not initializedz Output queue was not initializedr   timeoutzThread unexpectedly terminatedr;   zInput queue is NonezOutput queue is Noner   )r(   r1   r2   check_worker_healthrT   
get_nowaitr
   r7   processing_timeoutjoin_or_terminater   r   RuntimeErrorr@   r_   rN   r   joinrC   rU   cancel_join_threadrD   )
r,   re   r    r!   pushedreadr8   out_itemrI   _s
             r   unordered_map ParallelWorkerPool.unordered_map   s.    4	0JJ  ##/R1RR/$$0T2TT0FD((*=4??2(#'#4#4#?#?#A #'#4#4#8#8AS#8#T
 '<#5#55..0*+KLL"NAID  $$T*!+ . 4++,  $$\%6%67 - -,,009K0L|111**,&'GHH	 - ##/F1FF/$$0H2HH0IIK""$##%&&  335!!446  ,,.!!--/O ! (#'(
 !  ..0 0 ##/F1FF/$$0H2HH0IIK""$##%&&  335!!446  ,,.!!--/sh   N1A%K *JK J"#DK (C'N1JK JK "
K,J>>KK C(N..N1c                 @    U R                   " [        U5      /UQ70 UD6$ r$   )rt   	enumerate)r,   re   r    r!   s       r   semi_ordered_map#ParallelWorkerPool.semi_ordered_map   s"    !!)F"3EdEfEEr   c              /      #    [        [        5      nSnU R                  " U/UQ70 UD6 H/  u  pgXtU'   XT;   d  M  UR                  U5      v   US-  nXT;   a  M  M1     g 7f)Nr   r;   )r   intrx   pop)r,   re   r    r!   buffernext_expectedidxr8   s           r   ordered_mapParallelWorkerPool.ordered_map   sd     S!..vGGGIC3K)jj//"  ) Hs   4AAAc                     U R                    Hf  nUR                  5       (       a  M  UR                  S:w  d  M,  SU l        U R	                  5         [        SUR                   SUR                   35      e   g)z:
Checks if any worker process has terminated unexpectedly
r   TzWorker PID: z# terminated unexpectedly with code N)rS   is_aliveexitcoderU   rl   rm   pidr,   r-   s     r   ri   &ParallelWorkerPool.check_worker_health   sl     ~~G##%%'*:*:a*?*.'&&(""7;;-/RSZScScRde 	 &r   rh   c                     SU l         U R                   H9  nUR                  US9  UR                  5       (       d  M)  UR	                  5         M;     U R                  R                  5         g)z-
Emergency shutdown
@param timeout:
@return:
Trg   N)rU   rS   rn   r   	terminateclear)r,   rh   r-   s      r   rl   $ParallelWorkerPool.join_or_terminate   sU     #'~~GLLL)!!!!# & 	r   c                 ~    U R                    H  nUR                  5         M     U R                   R                  5         g r$   )rS   rn   r   r   s     r   rn   ParallelWorkerPool.join   s)    ~~GLLN &r   c                 x    U R                    H*  nUR                  5       (       d  M  UR                  5         M,     g)a  
Terminate processes if the user hasn't joined. This is necessary as
leaving stray processes running can corrupt shared state. In brief,
we've observed shared memory counters being reused (when the memory was
free from the perspective of the parent process) while the stray
workers still held a reference to them.
For a discussion of using destructors in Python in this manner, see
https://eli.thegreenplace.net/2009/06/12/safely-using-destructors-in-python/.
N)rS   r   r   r   s     r   __del__ParallelWorkerPool.__del__   s-     ~~G!!!!# &r   )	rR   rU   r1   r3   rN   r2   rS   rT   r0   )r"   N)r;   )r   r   r   r   MAX_INTERNAL_BATCH_SIZEr{   r   r   strrV   r   r(   r   rt   rx   r   ri   rl   rn   r   r   r   r   r   rL   rL   W   s    
 $('>99 V9 Dj	9
 "%9"+c +d +050HSM 50# 50 50QYZ]Q^ 50nFx} FS FC FT\]`Ta F#(3- # #s #xX[} #
t D 
$r   rL   r$   )r<   r>   collectionsr   enumr   multiprocessingr   r   multiprocessing.contextr   multiprocessing.processr   multiprocessing.sharedctypesr	   r^   queuer
   typingr   r   r   rk   r   r   r   r   r{   dictrJ   rL   r   r   r   <module>r      s     	 #  . / / B  & &   3 $ $ %)3<v,3<3< 3< "	3<
 3< cNT!3< 
3<lX$ X$r   