
    izW                       S SK Jr  S SKrS SKrS SKrS SKrS SKJrJrJ	r	J
r
JrJrJr  S SKJrJr  S SKJr  S SKJr  S SKJrJr  S SKJrJrJr  S S	KJr  S S
KJr  S SK J!r!  S SK"J#r#J$r$  S SK%J&r&  S SK'J(r(  SSK)J*r*J+r+  SSK,J-r-J.r.J/r/  \(       a  S SK0J1r1  \" S\S9r2\3\4\5\   4   r6 " S S\75      r8\Rr                  " S\85         " S S\
\-   5      r:\Rv                  " SS9SS j5       r<\Rv                  " SS9SS j5       r=g)    )annotationsN)TYPE_CHECKINGAnyAsyncGeneratorGenericTypeTypeVarcast)	BaseModelValidationError)SerializedContext)
StepConfig)ContextSerdeErrorWorkflowRuntimeError)Event
StartEvent	StopEvent)BrokerState)WorkflowBroker)basic_runtime)PluginWorkflowRuntime)
RunResultT)WorkflowHandler   )BaseSerializerJsonSerializer)MODEL_T	DictStateInMemoryStateStore)WorkflowT)boundc                      \ rS rSrSrg)UnserializableKeyWarning5    N)__name__
__module____qualname____firstlineno____static_attributes__r'       S/home/james-whalen/.local/lib/python3.13/site-packages/workflows/context/context.pyr%   r%   5   s    r-   r%   oncec                     \ rS rSr% SrSrS\S'   S\S'   S\S	'   S
\S'   SS\4         SS jjr\	S S j5       r
 S!     S"S jjr  S#       S$S jjrS%S jr\	S&S j5       r\	S'S j5       rS!S(S jjr\ S!       S)S jj5       rS*S jr S!       S+S jjrS!S,S jjr    S-           S.S jjrS/S jrS0S jrS1S jr\	S2S j5       rSrg)3Context<   a  
Global, per-run context for a `Workflow`. Provides an interface into the
underlying broker run, for both external (workflow run oberservers) and
internal consumption by workflow steps.

The `Context` coordinates event delivery between steps, tracks in-flight work,
exposes a global state store, and provides utilities for streaming and
synchronization. It is created by a `Workflow` at run time and can be
persisted and restored.

Args:
    workflow (Workflow): The owning workflow instance. Used to infer
        step configuration and instrumentation.
    previous_context: A previous context snapshot to resume from.
    serializer: A serializer to use for serializing and deserializing the current and previous context snapshots.

Attributes:
    is_running (bool): Whether the workflow is currently running.
    store (InMemoryStateStore[MODEL_T]): Type-safe, async state store shared
        across steps. See also
        [InMemoryStateStore][workflows.context.state_store.InMemoryStateStore].

Examples:
    Basic usage inside a step:

    ```python
    from workflows import step
    from workflows.events import StartEvent, StopEvent

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        await ctx.store.set("query", ev.topic)
        ctx.write_event_to_stream(ev)  # surface progress to UI
        return StopEvent(result="ok")
    ```

    Persisting the state of a workflow across runs:

    ```python
    from workflows import Context

    # Create a context and run the workflow with the same context
    ctx = Context(my_workflow)
    result_1 = await my_workflow.run(..., ctx=ctx)
    result_2 = await my_workflow.run(..., ctx=ctx)

    # Serialize the context and restore it
    ctx_dict = ctx.to_dict()
    restored_ctx = Context.from_dict(my_workflow, ctx_dict)
    result_3 = await my_workflow.run(..., ctx=restored_ctx)
    ```


See Also:
    - [Workflow][workflows.Workflow]
    - [Event][workflows.events.Event]
    - [InMemoryStateStore][workflows.context.state_store.InMemoryStateStore]
)memoryInMemoryStateStore[MODEL_T]_state_storezWorkflowBroker[MODEL_T] | None_broker_runr   _pluginr!   	_workflowNc           	        U=(       d
    [        5       U l        S U l        X@l        Xl        U=(       d
    [        5       nUb/   [
        R                  " U5      n[        R                  " XQU5        O
[        5       nXPl        [        5       nUR                  5       R                  5        Ht  u  pU	R                  n
U
R                   c  M   U
R                   ["        :w  d  M6  [%        U
R                   [&        5      (       d  MW  U
R                   nUR)                  U5        Mv     [+        U5      S:  a7  [-        SSR/                  U Vs/ s H  oR0                  PM     sn5      -   5      eU(       a  UR3                  5       O["        nUR4                  (       a}  [6        R8                  " UR4                  U5      nUR:                  U:w  a/  [-        SUR0                   SUR:                  R0                   35      e[=        [6        [>           U5      U l         g  [=        [>        U" 5       5      n[7        U5      U l         g ! [         a  n[        SU 35      UeS nAff = fs  snf ! [B         a  n[E        SU SU 35      UeS nAff = f)	Nz-Context dict specified in an invalid format: r   zqMultiple state types are not supported. Make sure that each Context[...] has the same generic state type. Found: z, z/State type mismatch. Workflow context expected z, got z#Failed to initialize state of type zB. Does your state define defaults for all fields? Original error:
)#r   _serializerr6   r7   r8   r   from_dict_autor   from_serializedr   r   _init_snapshotset
_get_stepsitems_step_configcontext_state_typer   
issubclassr   addlen
ValueErrorjoinr(   popstater    	from_dict
state_typer
   r   r5   	Exceptionr   )selfworkflowprevious_context
serializerpluginprevious_context_parsedestate_types_	step_funcstep_configrK   store_statestate_instances                 r.   __init__Context.__init__   sl    &9)9!  3>#3
'*;*J*J$+' +++z '8&9#5
 -0E$//1779LA&/&<&<K..:22i?{==yII(;;

+ : {a D));O;Z00;OPQ  +6[__&9
"((,66'--zK %%3 EjFYFYEZZ`alawaw  bA  bA  aB  C  !%%7%@+ ND!%gz|!<$6~$F!U # 'CA3G4 P"  *9*  FI  JK  IL  Ms6   -H9 I%I 9
III
J )I;;J c                t    U R                   c  U R                  R                  $ U R                   R                  $ )z*Whether the workflow is currently running.)r6   r=   
is_runningrM   s    r.   r]   Context.is_running   s4     #&&111##...r-   c                    U R                   b  [        S5      eU=(       d7    U R                  R                  [	        [
        R                  " 5       5      5      n[        UU UU R                  S9U l         U R                   $ )NzBroker already initialized)rN   contextruntimerQ   )r6   r   r7   new_runtimestruuiduuid4r   )rM   rN   rQ   rb   s       r.   _init_brokerContext._init_broker   sk     '&'CDD#)#XT\\-E-Ec$**,FW-X)<<	
 r-   c                (  ^^ SmU R                   b  U R                   mSU l         U R                  U5      U l         SUU4S jjnSU4S jjn[        R                  " U R                  XR
                  5      nU R                   R                  UUUUUS9$ )z:
called by package internally from the workflow to run it
Nc                    >#    T b   T R                  5       I S h  vN   Tb  TR                  5       I S h  vN   g g  N!! [         a     N+f = f N7fN)shutdownrL   acquire)prev_broker	semaphores   r.   before_start+Context._workflow_run.<locals>.before_start   s^     &%..000 $''))) % 1   *s;   A? =? AAA? 
A	AAAc                 6   >#    T b  T R                  5         g g 7frk   )release)ro   s   r.   after_complete-Context._workflow_run.<locals>.after_complete   s     $!!# %s   )rN   previousstart_eventrp   rt   returnNone)r6   rg   r   r<   r=   r:   start)rM   rN   rw   ro   rp   rt   rI   rn   s      `   @r.   _workflow_runContext._workflow_run   s     7;'**K#D,,X6	* 	*	$ +++;+;
 %%#%) & 
 	
r-   c                8    U R                   R                  5         g)z>
Called internally from the handler to cancel a context's run
N)_running_broker
cancel_runr^   s    r.   _workflow_cancel_runContext._workflow_cancel_run
  s     	'')r-   c                J    U R                   c  [        S5      eU R                   $ )Nz}Workflow run is not yet running. Make sure to only call this method after the context has been passed to a workflow.run call.)r6   r   r^   s    r.   r   Context._running_broker  s/    #& P  r-   c                    U R                   $ )zTyped, process-local state store shared across steps.

If no state was initialized yet, a default
[DictState][workflows.context.state_store.DictState] store is created.

Returns:
    InMemoryStateStore[MODEL_T]: The state store instance.
)r5   r^   s    r.   storeContext.store  s        r-   c                l   U=(       d    U R                   n0 nU R                  b  U R                  R                  U5      nU R                  b  U R                  R                  nO,[
        R                  " U R                  U R                  U5      nUR                  U5      nX$l
        UR                  SS9$ )a  Serialize the context to a JSON-serializable dict.

Persists the global state store, event queues, buffers, accepted events,
broker log, and running flag. This payload can be fed to
[from_dict][workflows.context.context.Context.from_dict] to resume a run
or carry state across runs.

Args:
    serializer (BaseSerializer | None): Value serializer used for state
        and event payloads. Defaults to
        [JsonSerializer][workflows.context.serializers.JsonSerializer].

Returns:
    dict[str, Any]: A dict suitable for JSON encoding and later
    restoration via `from_dict`.

See Also:
    - [InMemoryStateStore.to_dict][workflows.context.state_store.InMemoryStateStore.to_dict]

Examples:
    ```python
    ctx_dict = ctx.to_dict()
    my_db.set("key", json.dumps(ctx_dict))

    ctx_dict = my_db.get("key")
    restored_ctx = Context.from_dict(my_workflow, json.loads(ctx_dict))
    result = await my_workflow.run(..., ctx=restored_ctx)
    ```
python)mode)r:   r5   to_dictr6   _stater   r<   r=   r8   to_serializedrI   
model_dump)rM   rP   
state_databroker_statera   s        r.   r   Context.to_dict$  s    <  34#3#3
 
(**22:>J '++22L '66##T^^ZL ,,Z8"!!x!00r-   c                R     U " XUS9$ ! [          a  nSn[        U5      UeSnAff = f)aS  Reconstruct a `Context` from a serialized payload.

Args:
    workflow (Workflow): The workflow instance that will own this
        context.
    data (dict[str, Any]): Payload produced by
        [to_dict][workflows.context.context.Context.to_dict].
    serializer (BaseSerializer | None): Serializer used to decode state
        and events. Defaults to JSON.

Returns:
    Context[MODEL_T]: A context instance initialized with the persisted
        state and queues.

Raises:
    ContextSerdeError: If the payload is missing required fields or is
        in an incompatible format.

Examples:
    ```python
    ctx_dict = ctx.to_dict()
    my_db.set("key", json.dumps(ctx_dict))

    ctx_dict = my_db.get("key")
    restored_ctx = Context.from_dict(my_workflow, json.loads(ctx_dict))
    result = await my_workflow.run(..., ctx=restored_ctx)
    ```
)rO   rP   zRError creating a Context instance: the provided payload has a wrong or old format.N)KeyErrorr   )clsrN   datarP   rS   msgs         r.   rJ   Context.from_dictX  s7    F	0x:NN 	0fC#C(a/	0s   	 
&!&c                R   #    U R                   R                  5       I Sh  vN $  N7f)zReturn the list of currently running step names.

Returns:
    list[str]: Names of steps that have at least one active worker.
N)r   running_stepsr^   s    r.   r   Context.running_steps  s"      ))779999s   '%'c                :    U R                   R                  XU5      $ )a8  
Buffer events until all expected types are available, then return them.

This utility is helpful when a step can receive multiple event types
and needs to proceed only when it has a full set. The returned list is
ordered according to `expected`.

Args:
    ev (Event): The incoming event to add to the buffer.
    expected (list[Type[Event]]): Event types to collect, in order.
    buffer_id (str | None): Optional stable key to isolate buffers across
        steps or workers. Defaults to an internal key derived from the
        task name or expected types.

Returns:
    list[Event] | None: The events in the requested order when complete,
    otherwise `None`.

Examples:
    ```python
    @step
    async def synthesize(
        self, ctx: Context, ev: QueryEvent | RetrieveEvent
    ) -> StopEvent | None:
        events = ctx.collect_events(ev, [QueryEvent, RetrieveEvent])
        if events is None:
            return None
        query_ev, retrieve_ev = events
        # ... proceed with both inputs present ...
    ```

See Also:
    - [Event][workflows.events.Event]
)r   collect_events)rM   evexpected	buffer_ids       r.   r   Context.collect_events  s    J ##222KKr-   c                8    U R                   R                  X5      $ )ag  Dispatch an event to one or all workflow steps.

If `step` is omitted, the event is broadcast to all step queues and
non-matching steps will ignore it. When `step` is provided, the target
step must accept the event type or a
[WorkflowRuntimeError][workflows.errors.WorkflowRuntimeError] is raised.

Args:
    message (Event): The event to enqueue.
    step (str | None): Optional step name to target.

Raises:
    WorkflowRuntimeError: If the target step does not exist or does not
        accept the event type.

Examples:
    It's common to use this method to fan-out events:

    ```python
    @step
    async def my_step(self, ctx: Context, ev: StartEvent) -> WorkerEvent | GatherEvent:
        for i in range(10):
            ctx.send_event(WorkerEvent(msg=i))
        return GatherEvent()
    ```

    You also see this method used from the caller side to send events into the workflow:

    ```python
    handler = my_workflow.run(...)
    async for ev in handler.stream_events():
        if isinstance(ev, SomeEvent):
            handler.ctx.send_event(SomeOtherEvent(msg="Hello!"))

    result = await handler
    ```
)r   
send_event)rM   messagesteps      r.   r   Context.send_event  s    L ##..w==r-   c                X   #    U R                   R                  XX4U5      I Sh  vN $  N7f)a0  Wait for the next matching event of type `event_type`.

Optionally emits a `waiter_event` to the event stream once per `waiter_id` to
inform callers that the workflow is waiting for external input.
This helps to prevent duplicate waiter events from being sent to the event stream.

Args:
    event_type (type[T]): Concrete event class to wait for.
    waiter_event (Event | None): Optional event to write to the stream
        once when the wait begins.
    waiter_id (str | None): Stable identifier to avoid emitting multiple
        waiter events for the same logical wait.
    requirements (dict[str, Any] | None): Key/value filters that must be
        satisfied by the event via `event.get(key) == value`.
    timeout (float | None): Max seconds to wait. `None` means no
        timeout. Defaults to 2000 seconds.

Returns:
    T: The received event instance of the requested type.

Raises:
    asyncio.TimeoutError: If the timeout elapses.

Examples:
    ```python
    @step
    async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
        response = await ctx.wait_for_event(
            HumanResponseEvent,
            waiter_event=InputRequiredEvent(msg="What's your name?"),
            waiter_id="user_name",
            timeout=60,
        )
        return StopEvent(result=response.response)
    ```
N)r   wait_for_event)rM   
event_typewaiter_event	waiter_idrequirementstimeouts         r.   r   Context.wait_for_event  s2     X ))88iw
 
 	
 
s   !*(*c                :    U R                   R                  U5        g)a  Enqueue an event for streaming to [WorkflowHandler]](workflows.handler.WorkflowHandler).

Args:
    ev (Event | None): The event to stream. `None` can be used as a
        sentinel in some streaming modes.

Examples:
    ```python
    @step
    async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
        ctx.write_event_to_stream(ev)
        return StopEvent(result="ok")
    ```
N)r   write_event_to_stream)rM   r   s     r.   r   Context.write_event_to_stream  s     	2226r-   c                    [        5         U R                  R                  c  [        S5      eU R                  R                  R	                  5       $ )a  Return the final result of the workflow run.

Deprecated:
    This method is deprecated and will be removed in a future release.
    Prefer awaiting the handler returned by `Workflow.run`, e.g.:
    `result = await workflow.run(..., ctx=ctx)`.

Examples:
    ```python
    # Preferred
    result = await my_workflow.run(..., ctx=ctx)

    # Deprecated
    result_agent = ctx.get_result()
    ```

Returns:
    RunResultT: The value provided via a `StopEvent`.
zWorkflow handler is not set)_warn_get_resultr   _handlerr   resultr^   s    r.   
get_resultContext.get_result  sC    ( 	((0&'DEE##,,3355r-   c                6    U R                   R                  5       $ )z8The internal queue used for streaming events to callers.)r   stream_published_eventsr^   s    r.   stream_eventsContext.stream_events2  s    ##;;==r-   c                
  ^ ^ [        5         [        R                  " 5       mSUU 4S jjn [        R                  " U" 5       5        T$ ! [         a/    [        R
                  " 5       nUR                  U" 5       5         T$ f = f)zDeprecated queue-based event stream.

Returns an asyncio.Queue that is populated by iterating this context's
stream_events(). A deprecation warning is emitted once per process.
c                    >#    TR                  5         S h  vN n TR                  U 5      I S h  vN   [        U [        5      (       d  M9    g  N6 N
 g 7frk   )r   put
isinstancer   )r   qrM   s    r.   _pump&Context.streaming_queue.<locals>._pump@  sE      ..0 beeBib),, 1s;   AAAAAAAAAAArx   )_warn_streaming_queueasyncioQueuecreate_taskRuntimeErrorget_event_loop)rM   r   loopr   s   `  @r.   streaming_queueContext.streaming_queue6  sp     	")--/	 		&(   	&))+DUW%	&s   A	 	5BB)r6   r=   r7   r:   r5   r8   )
rN   r!   rO   dict[str, Any] | NonerP   BaseSerializer | NonerQ   r   ry   rz   )ry   boolrk   )rN   r!   rQ   zWorkflowRuntime | Nonery   WorkflowBroker[MODEL_T])NN)rN   r!   rw   zStartEvent | Nonero   zasyncio.Semaphore | Nonery   r   rx   )ry   r   )ry   r4   )rP   r   ry   dict[str, Any])rN   z
'Workflow'r   r   rP   r   ry   z'Context[MODEL_T]')ry   z	list[str])r   r   r   zlist[Type[Event]]r   
str | Nonery   zlist[Event] | None)r   r   r   r   ry   rz   )NNNi  )r   zType[T]r   Event | Noner   r   r   r   r   zfloat | Nonery   r"   )r   r   ry   rz   )ry   r   )ry   zAsyncGenerator[Event, None])ry   zasyncio.Queue)r(   r)   r*   r+   __doc__known_unserializable_keys__annotations__r   rZ   propertyr]   rg   r|   r   r   r   r   classmethodrJ   r   r   r   r   r   r   r   r   r,   r'   r-   r.   r1   r1   <   s   9z !, .-//O
 37,0&FF 0F *	F
 F 
FP / / DH   *@ 	  & *..2	&
&
 '&
 ,	&

 
&
P*     	! 	!21h 
 -1	&0&0 &0 *	&0
 
&0 &0P: OS%L%L#4%LAK%L	%LN&>V &* $.2 $.
.
 #.
 	.

 ,.
 .
 
.
`7"62>  r-   r1   )maxsizec                 8    [         R                  " S[        SS9  g )NzContext.get_result() is deprecated and will be removed in a future release. Prefer awaiting the WorkflowHandler returned by Workflow.run: `result = await workflow.run(..., ctx=ctx)`.   
stacklevelwarningswarnDeprecationWarningr'   r-   r.   r   r   N  s    MMI 	r-   c                 8    [         R                  " S[        SS9  g )NzContext.streaming_queue is deprecated and will be removed in a future release. Prefer iterating Context.stream_events(): `async for ev in ctx.stream_events(): ...`r   r   r   r'   r-   r.   r   r   [  s    MM9 	r-   rx   )>
__future__r   r   	functoolsre   r   typingr   r   r   r   r   r	   r
   pydanticr   r   workflows.context.context_typesr   workflows.decoratorsr   workflows.errorsr   r   workflows.eventsr   r   r   &workflows.runtime.types.internal_stater   workflows.runtime.brokerr   workflows.plugins.basicr   workflows.runtime.types.pluginr   r   workflows.typesr   workflows.handlerr   serializersr   r   state_storer   r   r    	workflowsr!   r"   dictrd   listEventBufferWarningr%   simplefilterr1   	lru_cacher   r   r'   r-   r.   <module>r      s    #       0 = + 
 ? 3 1 B & - 7 ? ? " Cu3U#$	w 	 	  f6 7Ogg Od Q	  	 Q	  	r-   