
    h8E                    z   S r SSKJr  SSKJrJr  SSKJr  SSKJ	r	  SSK
rSSKJr  SSKJrJrJr  SS	KJr  SS
KJrJr  SSKJrJrJrJr  SSKJrJr  SSKJ r    " S S\5      r! " S S\5      r" " S S\"5      r# " S S\"5      r$ " S S\"5      r% " S S\"5      r& " S S\"5      r' " S S\"5      r( " S S\"5      r)g) z9Vectorizes observation wrappers to works for `VectorEnv`.    )annotations)CallableSequence)deepcopy)AnyN)Space)ActTypeEnvObsType)warn)	VectorEnvVectorObservationWrapper)batch_spaceconcatenatecreate_empty_arrayiterate)	ArrayTypeAutoresetMode)transform_observationc                  P   ^  \ rS rSrSr  S       SU 4S jjjrSS jrSrU =r$ )	TransformObservation   a  Transforms an observation via a function provided to the wrapper.

This function allows the manual specification of the vector-observation function as well as the single-observation function.
This is desirable when, for example, it is possible to process vector observations in parallel or via other more optimized methods.
Otherwise, the ``VectorizeTransformObservation`` should be used instead, where only ``single_func`` needs to be defined.

Example - Without observation transformation:
    >>> import gymnasium as gym
    >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync")
    >>> obs, info = envs.reset(seed=123)
    >>> obs
    array([[ 0.01823519, -0.0446179 , -0.02796401, -0.03156282],
           [ 0.02852531,  0.02858594,  0.0469136 ,  0.02480598],
           [ 0.03517495, -0.000635  , -0.01098382, -0.03203924]],
          dtype=float32)
      >>> envs.close()

Example - With observation transformation:
    >>> import gymnasium as gym
    >>> from gymnasium.spaces import Box
    >>> def scale_and_shift(obs):
    ...     return (obs - 1.0) * 2.0
    ...
    >>> import gymnasium as gym
    >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync")
    >>> new_obs_space = Box(low=envs.observation_space.low, high=envs.observation_space.high)
    >>> envs = TransformObservation(envs, func=scale_and_shift, observation_space=new_obs_space)
    >>> obs, info = envs.reset(seed=123)
    >>> obs
    array([[-1.9635296, -2.0892358, -2.055928 , -2.0631256],
           [-1.9429494, -1.9428282, -1.9061728, -1.9503881],
           [-1.9296501, -2.00127  , -2.0219676, -2.0640786]], dtype=float32)
    >>> envs.close()
c                p  > [         TU ]  U5        Uc$  Ub   X@l        [        X@R                  5      U l        OX0l        Ub  X@l        U R
                  [        U R                  U R                  5      :w  a<  [        SU SUR
                   S[        U R                  U R                  5       35        X l        g)a  Constructor for the transform observation wrapper.

Args:
    env: The vector environment to wrap
    func: A function that will transform the vector observation. If this transformed observation is outside the observation space of ``env.observation_space`` then provide an ``observation_space``.
    observation_space: The observation spaces of the wrapper. If None, then it is computed from ``single_observation_space``. If ``single_observation_space`` is not provided either, then it is assumed to be the same as ``env.observation_space``.
    single_observation_space: The observation space of the non-vectorized environment. If None, then it is assumed the same as ``env.single_observation_space``.
NzFor zl, the observation space and the batched single observation space don't match as expected, observation_space=z#, batched single_observation_space=)	super__init__single_observation_spacer   num_envsobservation_space_single_observation_spacer   func)selfenvr    r   r   	__class__s        i/home/james-whalen/.local/lib/python3.13/site-packages/gymnasium/wrappers/vector/vectorize_observation.pyr   TransformObservation.__init__8   s     	$'30H-)4,mm*& &7"'31I.!![))4==&
 
 se  H  IL  I^  I^  H_  _B  CN  OS  Ol  Ol  nr  n{  n{  C|  B}  ~ 	    c                $    U R                  U5      $ )z)Apply function to the vector observation.)r    r!   observationss     r$   r)   !TransformObservation.observations]   s    yy&&r&   )r   r    r   r   )NN)r"   r   r    zCallable[[ObsType], Any]r   Space | Noner   r+   r)   r   returnr   )	__name__
__module____qualname____firstlineno____doc__r   r)   __static_attributes____classcell__r#   s   @r$   r   r      sL    !N +/15## '# (	#
 #/# #J' 'r&   r   c                  n   ^  \ rS rSrSr " S S\5      r      S	U 4S jjr    S
S jrSS jr	Sr
U =r$ )VectorizeTransformObservationb   a  Vectorizes a single-agent transform observation wrapper for vector environments.

Most of the lambda observation wrappers for single agent environments have vectorized implementations,
it is advised that users simply use those instead via importing from `gymnasium.wrappers.vector...`.
The following example illustrate use-cases where a custom lambda observation wrapper is required.

Example - The normal observation:
    >>> import gymnasium as gym
    >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync")
    >>> obs, info = envs.reset(seed=123)
    >>> envs.close()
    >>> obs
    array([[ 0.01823519, -0.0446179 , -0.02796401, -0.03156282],
           [ 0.02852531,  0.02858594,  0.0469136 ,  0.02480598],
           [ 0.03517495, -0.000635  , -0.01098382, -0.03203924]],
          dtype=float32)

Example - Applying a custom lambda observation wrapper that duplicates the observation from the environment
    >>> import numpy as np
    >>> import gymnasium as gym
    >>> from gymnasium.spaces import Box
    >>> from gymnasium.wrappers import TransformObservation
    >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync")
    >>> old_space = envs.single_observation_space
    >>> new_space = Box(low=np.array([old_space.low, old_space.low]), high=np.array([old_space.high, old_space.high]))
    >>> envs = VectorizeTransformObservation(envs, wrapper=TransformObservation, func=lambda x: np.array([x, x]), observation_space=new_space)
    >>> obs, info = envs.reset(seed=123)
    >>> envs.close()
    >>> obs
    array([[[ 0.01823519, -0.0446179 , -0.02796401, -0.03156282],
            [ 0.01823519, -0.0446179 , -0.02796401, -0.03156282]],
    <BLANKLINE>
           [[ 0.02852531,  0.02858594,  0.0469136 ,  0.02480598],
            [ 0.02852531,  0.02858594,  0.0469136 ,  0.02480598]],
    <BLANKLINE>
           [[ 0.03517495, -0.000635  , -0.01098382, -0.03203924],
            [ 0.03517495, -0.000635  , -0.01098382, -0.03203924]]],
          dtype=float32)
c                  "    \ rS rSrSrSS jrSrg)(VectorizeTransformObservation._SingleEnv   z@Fake single-agent environment used for the single-agent wrapper.c                    Xl         g)z%Constructor for the fake environment.Nr   )r!   r   s     r$   r   1VectorizeTransformObservation._SingleEnv.__init__   s    %6"r&   r=   N)r   r   )r.   r/   r0   r1   r2   r   r3    r&   r$   
_SingleEnvr:      s
    N	7r&   r@   c                z  > [         TU ]  U5        SUR                  ;  a%  [        SU S35        [        R
                  U l        O8[        UR                  S   [        5      (       d   eUR                  S   U l        U" U R                  U R                  R                  5      40 UD6U l        U R                  R                  U l
        [        U R                  U R                  5      U l        U R                  U R                  R                  :H  U l        [!        U R                  U R                  5      U l        g)zConstructor for the vectorized transform observation wrapper.

Args:
    env: The vector environment to wrap.
    wrapper: The wrapper to vectorize
    **kwargs: Keyword argument for the wrapper
autoreset_modezVector environment (z+) is missing `autoreset_mode` metadata key.N)r   r   metadatar   r   	NEXT_STEPrB   
isinstancer@   r"   r   wrapperr   r   r   same_outr   out)r!   r"   rF   kwargsr#   s       r$   r   &VectorizeTransformObservation.__init__   s    	3<</&se+VW #0"9"9Dcll+;<mLLLL"%,,/?"@DOODHH==>
BH
 )-(F(F%!,))4=="
 ..$((2L2LL%d&C&CT]]Sr&   c                P   U R                   R                  U5      u  p#pEnU R                  U5      nU R                  [        R
                  :X  aT  SU;   aN  US   n[        [        XvS   5      5       H.  u  nu  pU
(       d  M  U R                  R                  U	5      Xx'   M0     X#XEU4$ )zoSteps through the vector environments, transforming the observation and for final obs individually transformed.	final_obs
_final_obs)
r"   stepr)   rB   r   	SAME_STEP	enumerateziprF   observation)r!   actionsobsrewardsterminationstruncationsinfosrL   isub_obshas_final_obss              r$   rN   "VectorizeTransformObservation.step   s     :>w9O6l$-"9"99kU>Rk*I/8I\230++G !=#'<<#;#;G#DIL	0 \==r&   c                X  ^  T R                   (       a>  [        T R                  [        U 4S j[	        T R
                  U5       5       5      U5      $ [        [        T R                  [        U 4S j[	        T R                  R
                  U5       5       5      T R                  5      5      $ )zIterates over the vector observations applying the single-agent wrapper ``observation`` then concatenates the observations together again.c              3  Z   >#    U  H   nTR                   R                  U5      v   M"     g 7fNrF   r    .0rT   r!   s     r$   	<genexpr>=VectorizeTransformObservation.observations.<locals>.<genexpr>   s*      L LL%%c**L   (+c              3  Z   >#    U  H   nTR                   R                  U5      v   M"     g 7fr_   r`   ra   s     r$   rc   rd      s*      #TC ))#..#Tre   )	rG   r   r   tupler   r   r   r"   rH   r(   s   ` r$   r)   *VectorizeTransformObservation.observations   s    ==-- &t'='=|L    11 #*488+E+E|#T  HH	 	r&   )rB   r   rH   rG   r   rF   )r"   r   rF   z0type[transform_observation.TransformObservation]rI   r   )rS   r	   r-   z?tuple[ObsType, ArrayType, ArrayType, ArrayType, dict[str, Any]]r,   )r.   r/   r0   r1   r2   r
   r@   r   rN   r)   r3   r4   r5   s   @r$   r7   r7   b   s[    &P7S 7!T!T B!T 	!TF>>	H>$ r&   r7   c                  0   ^  \ rS rSrSrSU 4S jjrSrU =r$ )FilterObservation   a|  Vector wrapper for filtering dict or tuple observation spaces.

Example - Create a vectorized environment with a Dict space to demonstrate how to filter keys:
    >>> import numpy as np
    >>> import gymnasium as gym
    >>> from gymnasium.spaces import Dict, Box
    >>> from gymnasium.wrappers import TransformObservation
    >>> from gymnasium.wrappers.vector import VectorizeTransformObservation, FilterObservation
    >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync")
    >>> make_dict = lambda x: {"obs": x, "junk": np.array([0.0])}
    >>> new_space = Dict({"obs": envs.single_observation_space, "junk": Box(low=-1.0, high=1.0)})
    >>> envs = VectorizeTransformObservation(env=envs, wrapper=TransformObservation, func=make_dict, observation_space=new_space)
    >>> envs = FilterObservation(envs, ["obs"])
    >>> obs, info = envs.reset(seed=123)
    >>> envs.close()
    >>> obs
    {'obs': array([[ 0.01823519, -0.0446179 , -0.02796401, -0.03156282],
           [ 0.02852531,  0.02858594,  0.0469136 ,  0.02480598],
           [ 0.03517495, -0.000635  , -0.01098382, -0.03203924]],
          dtype=float32)}
c                @   > [         TU ]  U[        R                  US9  g)zConstructor for the filter observation wrapper.

Args:
    env: The vector environment to wrap
    filter_keys: The subspaces to be included, use a list of strings or integers for ``Dict`` and ``Tuple`` spaces respectivesly
)filter_keysN)r   r   r   rj   )r!   r"   rm   r#   s      r$   r   FilterObservation.__init__   s%     	&88k 	 	
r&   r?   )r"   r   rm   zSequence[str | int]r.   r/   r0   r1   r2   r   r3   r4   r5   s   @r$   rj   rj      s    ,	
 	
r&   rj   c                  0   ^  \ rS rSrSrSU 4S jjrSrU =r$ )FlattenObservationi  a  Observation wrapper that flattens the observation.

Example:
    >>> import gymnasium as gym
    >>> envs = gym.make_vec("CarRacing-v3", num_envs=3, vectorization_mode="sync")
    >>> obs, info = envs.reset(seed=123)
    >>> obs.shape
    (3, 96, 96, 3)
    >>> envs = FlattenObservation(envs)
    >>> obs, info = envs.reset(seed=123)
    >>> obs.shape
    (3, 27648)
    >>> envs.close()
c                B   > [         TU ]  U[        R                  5        g)zConstructor for any environment's observation space that implements ``spaces.utils.flatten_space`` and ``spaces.utils.flatten``.

Args:
    env:  The vector environment to wrap
N)r   r   r   rq   )r!   r"   r#   s     r$   r   FlattenObservation.__init__  s     	3FFGr&   r?   )r"   r   ro   r5   s   @r$   rq   rq     s    H Hr&   rq   c                  4   ^  \ rS rSrSrSSU 4S jjjrSrU =r$ )GrayscaleObservationi  a  Observation wrapper that converts an RGB image to grayscale.

Example:
    >>> import gymnasium as gym
    >>> envs = gym.make_vec("CarRacing-v3", num_envs=3, vectorization_mode="sync")
    >>> obs, info = envs.reset(seed=123)
    >>> obs.shape
    (3, 96, 96, 3)
    >>> envs = GrayscaleObservation(envs)
    >>> obs, info = envs.reset(seed=123)
    >>> obs.shape
    (3, 96, 96)
    >>> envs.close()
c                @   > [         TU ]  U[        R                  US9  g)zConstructor for an RGB image based environments to make the image grayscale.

Args:
    env: The vector environment to wrap
    keep_dim: If to keep the channel in the observation, if ``True``, ``obs.shape == 3`` else ``obs.shape == 2``
)keep_dimN)r   r   r   ru   )r!   r"   rw   r#   s      r$   r   GrayscaleObservation.__init__+  s%     	&;;h 	 	
r&   r?   )F)r"   r   rw   boolro   r5   s   @r$   ru   ru     s    	
 	
r&   ru   c                  0   ^  \ rS rSrSrSU 4S jjrSrU =r$ )ResizeObservationi7  a  Resizes image observations using OpenCV to shape.

Example:
    >>> import gymnasium as gym
    >>> envs = gym.make_vec("CarRacing-v3", num_envs=3, vectorization_mode="sync")
    >>> obs, info = envs.reset(seed=123)
    >>> obs.shape
    (3, 96, 96, 3)
    >>> envs = ResizeObservation(envs, shape=(28, 28))
    >>> obs, info = envs.reset(seed=123)
    >>> obs.shape
    (3, 28, 28, 3)
    >>> envs.close()
c                @   > [         TU ]  U[        R                  US9  g)zConstructor that requires an image environment observation space with a shape.

Args:
    env: The vector environment to wrap
    shape: The resized observation shape
shapeN)r   r   r   r{   r!   r"   r~   r#   s      r$   r   ResizeObservation.__init__G  s      	3EEUSr&   r?   )r"   r   r~   ztuple[int, ...]ro   r5   s   @r$   r{   r{   7  s    T Tr&   r{   c                  0   ^  \ rS rSrSrSU 4S jjrSrU =r$ )ReshapeObservationiQ  a  Reshapes array based observations to shapes.

Example:
    >>> import gymnasium as gym
    >>> envs = gym.make_vec("CarRacing-v3", num_envs=3, vectorization_mode="sync")
    >>> obs, info = envs.reset(seed=123)
    >>> obs.shape
    (3, 96, 96, 3)
    >>> envs = ReshapeObservation(envs, shape=(9216, 3))
    >>> obs, info = envs.reset(seed=123)
    >>> obs.shape
    (3, 9216, 3)
    >>> envs.close()
c                @   > [         TU ]  U[        R                  US9  g)zConstructor for env with Box observation space that has a shape product equal to the new shape product.

Args:
    env: The vector environment to wrap
    shape: The reshaped observation space
r}   N)r   r   r   r   r   s      r$   r   ReshapeObservation.__init__a  s      	3FFeTr&   r?   )r"   r   r~   zint | tuple[int, ...]ro   r5   s   @r$   r   r   Q  s    U Ur&   r   c                  <   ^  \ rS rSrSr      SU 4S jjrSrU =r$ )RescaleObservationik  a  Linearly rescales observation to between a minimum and maximum value.

Example:
    >>> import gymnasium as gym
    >>> envs = gym.make_vec("MountainCar-v0", num_envs=3, vectorization_mode="sync")
    >>> obs, info = envs.reset(seed=123)
    >>> obs.min()
    np.float32(-0.46352962)
    >>> obs.max()
    np.float32(0.0)
    >>> envs = RescaleObservation(envs, min_obs=-5.0, max_obs=5.0)
    >>> obs, info = envs.reset(seed=123)
    >>> obs.min()
    np.float32(-0.90849805)
    >>> obs.max()
    np.float32(0.0)
    >>> envs.close()
c                B   > [         TU ]  U[        R                  UUS9  g)zConstructor that requires the env observation spaces to be a :class:`Box`.

Args:
    env: The vector environment to wrap
    min_obs: The new minimum observation bound
    max_obs: The new maximum observation bound
)min_obsmax_obsN)r   r   r   r   )r!   r"   r   r   r#   s       r$   r   RescaleObservation.__init__  s)     	!44	 	 	
r&   r?   )r"   r   r   %np.floating | np.integer | np.ndarrayr   r   ro   r5   s   @r$   r   r   k  s.    &

 7
 7	
 
r&   r   c                  0   ^  \ rS rSrSrSU 4S jjrSrU =r$ )DtypeObservationi  a  Observation wrapper for transforming the dtype of an observation.

Example:
    >>> import numpy as np
    >>> import gymnasium as gym
    >>> envs = gym.make_vec("CartPole-v1", num_envs=3, vectorization_mode="sync")
    >>> obs, info = envs.reset(seed=123)
    >>> obs.dtype
    dtype('float32')
    >>> envs = DtypeObservation(envs, dtype=np.float64)
    >>> obs, info = envs.reset(seed=123)
    >>> obs.dtype
    dtype('float64')
    >>> envs.close()
c                @   > [         TU ]  U[        R                  US9  g)zConstructor for Dtype observation wrapper.

Args:
    env: The vector environment to wrap
    dtype: The new dtype of the observation
)dtypeN)r   r   r   r   )r!   r"   r   r#   s      r$   r   DtypeObservation.__init__  s      	3DDERr&   r?   )r"   r   r   r   ro   r5   s   @r$   r   r     s     S Sr&   r   )*r2   
__future__r   collections.abcr   r   copyr   typingr   numpynp	gymnasiumr   gymnasium.corer	   r
   r   gymnasium.loggerr   gymnasium.vectorr   r   gymnasium.vector.utilsr   r   r   r   gymnasium.vector.vector_envr   r   gymnasium.wrappersr   r   r7   rj   rq   ru   r{   r   r   r   r?   r&   r$   <module>r      s    ? " .     0 0 ! @ X X @ 4K'3 K'\z$< zz 
5  
FH6 H2
8 
8T5 T4U6 U4&
6 &
RS4 Sr&   