
    ph+                        S r SSKrSSKrSSKrSSKrSSKJrJr  SSKJ	r	  SSK
J
r
  SSKJr  \R                  " \5      rS r " S	 S
5      rSr\R&                  R(                  r " S S5      r " S S\R.                  5      rS rg)z%
Utilities for parallel computation.
    N)chainislice)	Traceback)tqdm   )configc                     [         R                  " 5       n [        R                  S:X  a  [	        S5      e[        R                  U :  a'  [
        R                  S[        R                  U 5        U $ [        R                  S:  a8  U [        R                  -   S-   nUS::  a  [	        SR                  X5      5      eU$ [        R                  $ )z2Return the number of processes to use in parallel.r   z,Invalid NUMBER_OF_CORES; value may not be 0.z&Requesting %s cores; only %s available   z[Invalid NUMBER_OF_CORES; negative value is too negative: requesting {} cores, {} available.)multiprocessing	cpu_countr   NUMBER_OF_CORES
ValueErrorloginfoformat)r   nums     P/home/james-whalen/.local/lib/python3.13/site-packages/pyphi/compute/parallel.pyget_num_processesr      s    ))+I":< 	< 	)9''	4!&00014!855;VC5KM M 
!!!    c                   $    \ rS rSrSrS rS rSrg)ExceptionWrapper1   zA picklable wrapper suitable for passing exception tracebacks through
instances of ``multiprocessing.Queue``.

Args:
    exception (Exception): The exception to wrap.
c                 `    Xl         [        R                  " 5       u    p#[        U5      U l        g )N)	exceptionsysexc_infor   tb)selfr   _r   s       r   __init__ExceptionWrapper.__init__9   s!    "<<>1B-r   c                 h    U R                   R                  U R                  R                  5       5      e)zRe-raise the exception.)r   with_tracebackr   as_traceback)r   s    r   reraiseExceptionWrapper.reraise>   s$    nn++DGG,@,@,BCCr   )r   r   N)__name__
__module____qualname____firstlineno____doc__r    r%   __static_attributes__ r   r   r   r   1   s     
Dr   r   c                       \ rS rSrSrSrS rS r\S 5       r	S r
SrS	 r\S
 5       rS rS rS rS rS rS rSS jrSrg)	MapReduceG   a  An engine for doing heavy computations over an iterable.

This is similar to ``multiprocessing.Pool``, but allows computations to
shortcircuit, and supports both parallel and sequential computations.

Args:
    iterable (Iterable): A collection of objects to perform a computation
        over.
    *context: Any additional data necessary to complete the computation.

Any subclass of ``MapReduce`` must implement three methods::

    - ``empty_result``,
    - ``compute``, (map), and
    - ``process_result`` (reduce).

The engine includes a builtin ``tqdm`` progress bar; this can be disabled
by setting ``pyphi.config.PROGRESS_BARS`` to ``False``.

Parallel operations start a daemon thread which handles log messages sent
from worker processes.

Subprocesses spawned by ``MapReduce`` cannot spawn more subprocesses; be
aware of this when composing nested computations. This is not an issue in
practice because it is typically most efficient to only parallelize the top
level computation.
 c                     Xl         X l        SU l        U R                  5       U l        S U l        S U l        S U l        S U l        S U l	        S U l
        S U l        S U l        g )NF)iterablecontextdoneinit_progress_barprogress
task_queueresult_queue	log_queue
log_thread	processesnum_processestaskscomplete)r   r3   r4   s      r   r    MapReduce.__init__g   s_     	..0  !
r   c                     [         e)z>Return the default result with which to begin the computation.NotImplementedError)r   r4   s     r   empty_resultMapReduce.empty_resultw   s    !!r   c                     [         e)z0Map over a single object from ``self.iterable``.rB   )objr4   s     r   computeMapReduce.compute{   s
     "!r   c                     [         e)a\  Reduce handler.

Every time a new result is generated by ``compute``, this method is
called with the result and the previous (accumulated) result. This
method compares or collates these two values, returning the new result.

Setting ``self.done`` to ``True`` in this method will abort the
remainder of the computation, returning this final result.
rB   )r   
new_result
old_results      r   process_resultMapReduce.process_result   s
     "!r   Fc                     [         R                  =(       d    [        R                  (       + nU(       a  SnO/[	        U R
                  5      U l        [        U R
                  5      n[        X!SU R                  S9$ )z%Initialize and return a progress bar.NF)totaldisableleavedesc)	r/   _forkedr   PROGRESS_BARSlistr3   lenr   description)r   rQ   rP   s      r   r6   MapReduce.init_progress_bar   s_     ##?6+?+?'? E /DM&E%))+ 	+r   c                 4    S[         l        [        R                  S5        [	        U5        [        UR                  [        5       Hs  nUR                  5       (       a  [        R                  S5          OH[        R                  SU5        UR                  U " U/UQ76 5        [        R                  SU5        Mu     UR                  [        5        [        R                  S5        g! [         a$  nUR                  [        U5      5         SnAgSnAff = f)z5A worker process, run by ``multiprocessing.Process``.TzWorker process starting...z&Worker received signal - exiting earlyzWorker got %szWorker finished %szWorker process exitingN)r/   rT   r   debugconfigure_worker_loggingitergetPOISON_PILLis_setput	Exceptionr   )rH   r8   r9   r:   r?   r4   rG   es           r   workerMapReduce.worker   s    	2 $III23$Y/JNNK8??$$IIFG		/3/  !7w!78		.4 9 [)II./ 	2-a011	2s   C&C) )
D3DDc           	         [        5       U l        [        R                  " [        S9U l        [        R                  " 5       U l        [        R                  " 5       U l        [        R                  " 5       U l	        U R                  U R
                  U R                  U R                  U R                  4U R                  -   n[        U R                  5       Vs/ s H#  n[        R                  " U R                  USS9PM%     snU l        U R                   H  nUR!                  5         M     [#        U R                  5      U l        U R$                  R!                  5         U R'                  5         gs  snf )zIInitialize all queues and start the worker processes and the log
thread.
)maxsizeT)targetargsdaemonN)r   r=   r   Queue
Q_MAX_SIZEr8   r9   r:   Eventr?   rH   r4   rangeProcessrd   r<   start	LogThreadr;   initialize_tasks)r   ri   iprocesss       r   start_parallelMapReduce.start_parallel   s    /0)//
C+113(..0 (--/doot/@/@/15> 4--.0. ##4;;T$O.0 ~~GMMO & $DNN30s   *E2c                    [        U R                  [        /U R                  -  5      U l        [        U R                  [        5       H4  n[        R                  SU5        U R                  R                  U5        M6     g)zLoad the input queue to capacity.

Overfilling causes a deadlock when `queue.put` blocks when
full, so further tasks are enqueued as results are returned.
Putting %s on queueN)r   r3   r_   r=   r>   r   rl   r   r[   r8   ra   r   tasks     r   rr   MapReduce.initialize_tasks   sY     4==;-$:L:L*LM
4::z2DII+T2OO% 3r   c                      [        U R                  5      n[        R                  SU5        U R                  R                  U5        g! [         a     gf = f)z0Enqueue the next task, if there are any waiting.rx   N)nextr>   r   r[   r8   ra   StopIterationry   s     r   maybe_put_taskMapReduce.maybe_put_task   sL    	&

#D II+T2OO%	  		s   A	 	
AAc                 0    U R                  5         U R                  " U R                  6 nU R                  S:  a  U R                  R                  5       nU R                  5         U[        L a  U =R                  S-  sl        O}[        U[        5      (       a  UR                  5         OWU R                  X!5      nU R                  R                  S5        U R                  (       a  U R                  R!                  5         U R                  S:  a  M  U R#                  5          [&        R)                  S5        U R                  R+                  5         U$ ! [$         a    e f = f! [&        R)                  S5        U R                  R+                  5         f = f)zsPerform the computation in parallel, reading results from the output
queue and passing them to ``process_result``.
r   r
   zRemoving progress bar)ru   rD   r4   r=   r9   r^   r   r_   
isinstancer   r%   rM   r7   updater5   r?   setfinish_parallelrb   r   r[   close)r   resultrs      r   run_parallelMapReduce.run_parallel   s8   	"!&&5F$$q(%%))+##%#&&!+&#344IIK "00;FMM((+ yy))+! $$q($   " II-.MM!  		 II-.MM!s   DE E E!!E$ $1Fc                    U R                    H  nUR                  5         M     [        R                  S5        U R                  R                  [        5        U R                  R                  5         U R                  R                  5         [        R                  S5        U R                  R                  5         U R                  R                  5         g)zOrderly shutdown of workers.zJoining log threadzClosing queuesN)r<   joinr   r[   r:   ra   r_   r;   r   r8   r9   )r   rt   s     r   r   MapReduce.finish_parallel  s    ~~GLLN & 			&';' 			"#!r   c                     U R                   " U R                  6 nU R                   H_  nU R                  " U/U R                  Q76 nU R	                  X15      nU R
                  R                  S5        U R                  (       d  M_    O    U R
                  R                  5         U$ ! [         a  nUeSnAff = f! U R
                  R                  5         f = f)z]Perform the computation sequentially, only holding two computed
objects in memory at a time.
r
   N)
rD   r4   r3   rH   rM   r7   r   r5   rb   r   )r   r   rG   r   rc   s        r   run_sequentialMapReduce.run_sequential  s    	"&&5F}}LL4t||4,,Q7$$Q' 999 % MM!  	G	 MM!s<   B B' B' B; B' 
B; '
B81B33B88B; ;Cc                 P    U(       a  U R                  5       $ U R                  5       $ )zPerform the computation.

Keyword Args:
    parallel (boolean): If True, run the computation in parallel.
        Otherwise, operate sequentially.
)r   r   )r   parallels     r   runMapReduce.run5  s%     $$&&""$$r   )r?   r4   r5   r3   r:   r;   r=   r<   r7   r9   r8   r>   N)T)r'   r(   r)   r*   r+   rX   r    rD   staticmethodrH   rM   rT   r6   rd   ru   rr   r   r   r   r   r   r,   r-   r   r   r/   r/   G   su    : K " " "
" G+  2 20 8
&&"H" ,	%r   r/   c                   2   ^  \ rS rSrSrU 4S jrS rSrU =r$ )rq   iB  zThread which handles log records sent from ``MapReduce`` processes.

It listens to an instance of ``multiprocessing.Queue``, rewriting log
messages to the PyPhi log handler.
c                 <   > Xl         [        TU ]	  5         SU l        g )NT)qsuperr    rj   )r   r   	__class__s     r   r    LogThread.__init__I  s    r   c                    [         R                  S5         U R                  R                  5       nU[        L a  O3[
        R                  " UR                  5      nUR                  U5        MW  [         R                  S5        g )NzLog thread startedzLog thread exiting)	r   r[   r   r^   r_   logging	getLoggernamehandle)r   recordloggers      r   r   LogThread.runN  s`    		&'VVZZ\F$&&v{{3FMM&!  			&'r   )rj   r   )	r'   r(   r)   r*   r+   r    r   r,   __classcell__)r   s   @r   rq   rq   B  s    
( (r   rq   c                 ^    [         R                  R                  SSSSU S.0SS/S.S.5        g	)
z<Configure a worker process to log all messages to ``queue``.r
   Fqueuezlogging.handlers.QueueHandler)classr   DEBUG)levelhandlers)versiondisable_existing_loggersr   rootN)r   r   
dictConfig)r   s    r   r\   r\   Y  sA    NN$)8
  	
 r   )r+   r   r   r   	threading	itertoolsr   r   tblibr   r   r1   r   r   r'   r   r   r   r_   synchronizeSEM_VALUE_MAXrl   r/   Threadrq   r\   r-   r   r   <module>r      s      
  #   !"2D D$ ((66
w% w%v(	   (.r   