from __future__ import annotations
import copy
import os
from typing import TypeVar
import gymnasium as gym
import numpy as np
from gymnasium import Wrapper
from gymnasium.utils import RecordConstructorArgs
from gymnasium.wrappers import RecordVideo
from highway_env import utils
from highway_env.envs.common.action import Action, ActionType, action_factory
from highway_env.envs.common.finite_mdp import finite_mdp
from highway_env.envs.common.graphics import EnvViewer
from highway_env.envs.common.observation import ObservationType, observation_factory
from highway_env.vehicle.behavior import IDMVehicle
from highway_env.vehicle.kinematics import Vehicle
Observation = TypeVar("Observation")
class AbstractEnv(gym.Env):
"""
A generic environment for various tasks involving a vehicle driving on a road.
The environment contains a road populated with vehicles, and a controlled ego-vehicle that can change lane and
speed. The action space is fixed, but the observation space and reward function must be defined in the
environment implementations.
"""
observation_type: ObservationType
action_type: ActionType
_record_video_wrapper: RecordVideo | None
metadata = {
"render_modes": ["human", "rgb_array"],
}
PERCEPTION_DISTANCE = 5.0 * Vehicle.MAX_SPEED
"""The maximum distance of any vehicle present in the observation [m]"""
    def __init__(self, config: dict | None = None, render_mode: str | None = None) -> None:
super().__init__()
# Configuration
self.config = self.default_config()
self.configure(config)
# Scene
self.road = None
self.controlled_vehicles = []
# Spaces
self.action_type = None
self.action_space = None
self.observation_type = None
self.observation_space = None
self.define_spaces()
# Running
self.time = 0 # Simulation time
self.steps = 0 # Actions performed
self.done = False
# Rendering
self.viewer = None
self._record_video_wrapper = None
assert render_mode is None or render_mode in self.metadata["render_modes"]
self.render_mode = render_mode
self.enable_auto_render = False
self.reset()
@property
    def vehicle(self) -> Vehicle | None:
"""First (default) controlled vehicle."""
return self.controlled_vehicles[0] if self.controlled_vehicles else None
@vehicle.setter
def vehicle(self, vehicle: Vehicle) -> None:
"""Set a unique controlled vehicle."""
self.controlled_vehicles = [vehicle]
@classmethod
def default_config(cls) -> dict:
"""
Default environment configuration.
Can be overloaded in environment implementations, or by calling configure().
:return: a configuration dict
"""
return {
"observation": {"type": "Kinematics"},
"action": {"type": "DiscreteMetaAction"},
"simulation_frequency": 15, # [Hz]
"policy_frequency": 1, # [Hz]
"other_vehicles_type": "highway_env.vehicle.behavior.IDMVehicle",
"screen_width": 600, # [px]
"screen_height": 150, # [px]
"centering_position": [0.3, 0.5],
"scaling": 5.5,
"show_trajectories": False,
"render_agent": True,
"offscreen_rendering": os.environ.get("OFFSCREEN_RENDERING", "0") == "1",
"manual_control": False,
"real_time_rendering": False,
}
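    # A minimal sketch of how an environment implementation typically overrides
    # these defaults ("duration" is a hypothetical subclass-specific key, shown
    # only for illustration):
    #
    #     @classmethod
    #     def default_config(cls) -> dict:
    #         config = super().default_config()
    #         config.update({"policy_frequency": 2, "duration": 40})
    #         return config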
    def configure(self, config: dict | None) -> None:
        """Update the environment configuration with a (possibly partial) config dict."""
        if config:
            self.config.update(config)
    def update_metadata(self, video_real_time_ratio: float = 2) -> None:
        """Update the render_fps metadata from the simulation or policy frequency."""
frames_freq = (
self.config["simulation_frequency"]
if self._record_video_wrapper
else self.config["policy_frequency"]
)
self.metadata["render_fps"] = video_real_time_ratio * frames_freq
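    # With the default config, recording a video gives
    # render_fps = video_real_time_ratio * simulation_frequency = 2 * 15 = 30,
    # while live rendering gives 2 * policy_frequency = 2 * 1 = 2.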
def define_spaces(self) -> None:
"""
Set the types and spaces of observation and action from config.
"""
self.observation_type = observation_factory(self, self.config["observation"])
self.action_type = action_factory(self, self.config["action"])
self.observation_space = self.observation_type.space()
self.action_space = self.action_type.space()
def _reward(self, action: Action) -> float:
"""
Return the reward associated with performing a given action and ending up in the current state.
:param action: the last action performed
:return: the reward
"""
raise NotImplementedError
def _rewards(self, action: Action) -> dict[str, float]:
"""
Returns a multi-objective vector of rewards.
If implemented, this reward vector should be aggregated into a scalar in _reward().
This vector value should only be returned inside the info dict.
:param action: the last action performed
:return: a dict of {'reward_name': reward_value}
"""
raise NotImplementedError
def _is_terminated(self) -> bool:
"""
        Check whether the current state is a terminal state.
        :return: is the state terminal
"""
raise NotImplementedError
def _is_truncated(self) -> bool:
"""
        Check whether the episode is truncated at the current step.
:return: is the episode truncated
"""
raise NotImplementedError
def _info(self, obs: Observation, action: Action | None = None) -> dict:
"""
Return a dictionary of additional information
:param obs: current observation
:param action: current action
:return: info dict
"""
info = {
"speed": self.vehicle.speed,
"crashed": self.vehicle.crashed,
"action": action,
}
try:
info["rewards"] = self._rewards(action)
except NotImplementedError:
pass
return info
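    # For a default single-agent environment, the returned dict looks like, e.g.,
    #     {"speed": 25.0, "crashed": False, "action": 1, "rewards": {...}}
    # where the "rewards" entry is present only if _rewards() is implemented.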
def reset(
self,
*,
seed: int | None = None,
options: dict | None = None,
) -> tuple[Observation, dict]:
"""
        Reset the environment to its initial configuration.
        :param seed: The seed that is used to initialize the environment's PRNG
        :param options: Allows the environment configuration to be specified through `options["config"]`
:return: the observation of the reset state
"""
super().reset(seed=seed, options=options)
if options and "config" in options:
self.configure(options["config"])
self.update_metadata()
self.define_spaces() # First, to set the controlled vehicle class depending on action space
self.time = self.steps = 0
self.done = False
self._reset()
self.define_spaces() # Second, to link the obs and actions to the vehicles once the scene is created
obs = self.observation_type.observe()
        info = self._info(obs, action=self.action_space.sample())  # dummy action for the initial info dict
if self.render_mode == "human":
self.render()
return obs, info
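    # Example usage, as a sketch ("highway-v0" is one of the environment ids
    # that importing highway_env registers with gymnasium):
    #
    #     import gymnasium as gym
    #     import highway_env  # noqa: F401 -- registers the environments
    #
    #     env = gym.make("highway-v0")
    #     obs, info = env.reset(seed=0, options={"config": {"policy_frequency": 2}})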
def _reset(self) -> None:
"""
Reset the scene: roads and vehicles.
This method must be overloaded by the environments.
"""
raise NotImplementedError()
def step(self, action: Action) -> tuple[Observation, float, bool, bool, dict]:
"""
Perform an action and step the environment dynamics.
        The action is executed by the ego-vehicle, and all other vehicles on the road perform their default behaviour
        for several simulation timesteps until the next decision-making step.
:param action: the action performed by the ego-vehicle
:return: a tuple (observation, reward, terminated, truncated, info)
"""
if self.road is None or self.vehicle is None:
raise NotImplementedError(
"The road and vehicle must be initialized in the environment implementation"
)
self.time += 1 / self.config["policy_frequency"]
self._simulate(action)
obs = self.observation_type.observe()
reward = self._reward(action)
terminated = self._is_terminated()
truncated = self._is_truncated()
info = self._info(obs, action)
if self.render_mode == "human":
self.render()
return obs, reward, terminated, truncated, info
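    # Example rollout, as a sketch; one action is applied per policy step while
    # the scene is simulated at simulation_frequency in between:
    #
    #     terminated = truncated = False
    #     while not (terminated or truncated):
    #         action = env.action_space.sample()
    #         obs, reward, terminated, truncated, info = env.step(action)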
def _simulate(self, action: Action | None = None) -> None:
"""Perform several steps of simulation with constant action."""
frames = int(
self.config["simulation_frequency"] // self.config["policy_frequency"]
)
        for frame in range(frames):
            # Forward the action to the ego-vehicle once per decision period
            if (
                action is not None
                and not self.config["manual_control"]
                and self.steps % frames == 0
            ):
self.action_type.act(action)
self.road.act()
self.road.step(1 / self.config["simulation_frequency"])
self.steps += 1
# Automatically render intermediate simulation steps if a viewer has been launched
# Ignored if the rendering is done offscreen
if (
frame < frames - 1
): # Last frame will be rendered through env.render() as usual
self._automatic_rendering()
self.enable_auto_render = False
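    # With the default config (simulation_frequency=15, policy_frequency=1),
    # each decision is held for frames = 15 // 1 = 15 simulation steps of
    # 1/15 s each, i.e. one second of simulated time per call to step().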
def render(self) -> np.ndarray | None:
"""
Render the environment.
Create a viewer if none exists, and use it to render an image.
"""
if self.render_mode is None:
assert self.spec is not None
gym.logger.warn(
"You are calling render method without specifying any render mode. "
"You can specify the render_mode at initialization, "
f'e.g. gym.make("{self.spec.id}", render_mode="rgb_array")'
)
return
if self.viewer is None:
self.viewer = EnvViewer(self)
self.enable_auto_render = True
self.viewer.display()
if not self.viewer.offscreen:
self.viewer.handle_events()
if self.render_mode == "rgb_array":
image = self.viewer.get_image()
return image
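    # In "rgb_array" mode, the returned frame is sized by the "screen_width" and
    # "screen_height" config keys; in "human" mode, the viewer is displayed and
    # None is returned.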
def close(self) -> None:
"""
Close the environment.
Will close the environment viewer if it exists.
"""
self.done = True
if self.viewer is not None:
self.viewer.close()
self.viewer = None
    def get_available_actions(self) -> list[int]:
        """Get the list of currently available actions."""
        return self.action_type.get_available_actions()
    def set_record_video_wrapper(self, wrapper: RecordVideo) -> None:
        """Register the RecordVideo wrapper, so that intermediate simulation frames can be captured."""
        self._record_video_wrapper = wrapper
        self.update_metadata()
def _automatic_rendering(self) -> None:
"""
        Automatically render the intermediate frames while an action is still ongoing.
        This allows rendering the whole video, rather than only the single frames corresponding to agent decisions.
If a RecordVideo wrapper has been set, use it to capture intermediate frames.
"""
if self.viewer is not None and self.enable_auto_render:
if self._record_video_wrapper and self._record_video_wrapper.video_recorder:
self._record_video_wrapper.video_recorder.capture_frame()
else:
self.render()
def simplify(self) -> AbstractEnv:
"""
Return a simplified copy of the environment where distant vehicles have been removed from the road.
        This is meant to lower the policy computational load while preserving the set of optimal actions.
:return: a simplified environment state
"""
state_copy = copy.deepcopy(self)
state_copy.road.vehicles = [
state_copy.vehicle
] + state_copy.road.close_vehicles_to(
state_copy.vehicle, self.PERCEPTION_DISTANCE
)
return state_copy
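    # Sketch of the intended use, e.g. handing the lighter copy to a planner
    # (my_planner is hypothetical):
    #
    #     simplified = env.simplify()
    #     action = my_planner(simplified)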
def change_vehicles(self, vehicle_class_path: str) -> AbstractEnv:
"""
Change the type of all vehicles on the road
:param vehicle_class_path: The path of the class of behavior for other vehicles
Example: "highway_env.vehicle.behavior.IDMVehicle"
:return: a new environment with modified behavior model for other vehicles
"""
vehicle_class = utils.class_from_path(vehicle_class_path)
env_copy = copy.deepcopy(self)
vehicles = env_copy.road.vehicles
for i, v in enumerate(vehicles):
if v is not env_copy.vehicle:
vehicles[i] = vehicle_class.create_from(v)
return env_copy
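    # For instance, to make every other vehicle follow the IDM behaviour model:
    #
    #     env_idm = env.change_vehicles("highway_env.vehicle.behavior.IDMVehicle")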
    def set_preferred_lane(self, preferred_lane: int | None = None) -> AbstractEnv:
        env_copy = copy.deepcopy(self)
        if preferred_lane is not None:  # explicit check, so that lane 0 is a valid preference
            for v in env_copy.road.vehicles:
                if isinstance(v, IDMVehicle):
                    v.route = [(lane[0], lane[1], preferred_lane) for lane in v.route]
                    # Vehicles with a lane preference are also less cautious
                    v.LANE_CHANGE_MAX_BRAKING_IMPOSED = 1000
return env_copy
def set_route_at_intersection(self, _to: str) -> AbstractEnv:
env_copy = copy.deepcopy(self)
for v in env_copy.road.vehicles:
if isinstance(v, IDMVehicle):
v.set_route_at_intersection(_to)
return env_copy
    def set_vehicle_field(self, args: tuple[str, object]) -> AbstractEnv:
        field, value = args
        env_copy = copy.deepcopy(self)
        for v in env_copy.road.vehicles:
            if v is not env_copy.vehicle:  # compare within the copy: self.vehicle is never in env_copy
                setattr(v, field, value)
        return env_copy
    def call_vehicle_method(self, args: tuple[str, tuple[object, ...]]) -> AbstractEnv:
method, method_args = args
env_copy = copy.deepcopy(self)
for i, v in enumerate(env_copy.road.vehicles):
if hasattr(v, method):
env_copy.road.vehicles[i] = getattr(v, method)(*method_args)
return env_copy
def randomize_behavior(self) -> AbstractEnv:
env_copy = copy.deepcopy(self)
for v in env_copy.road.vehicles:
if isinstance(v, IDMVehicle):
v.randomize_behavior()
return env_copy
def to_finite_mdp(self):
return finite_mdp(self, time_quantization=1 / self.config["policy_frequency"])
def __deepcopy__(self, memo):
"""Perform a deep copy but without copying the environment viewer."""
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
if k not in ["viewer", "_record_video_wrapper"]:
setattr(result, k, copy.deepcopy(v, memo))
else:
setattr(result, k, None)
return result
class MultiAgentWrapper(Wrapper, RecordConstructorArgs):
def __init__(self, env):
Wrapper.__init__(self, env)
RecordConstructorArgs.__init__(self)
    def step(self, action):
        # Replace the scalar reward and terminated flag with the per-agent values
        # that the wrapped multi-agent environment provides through the info dict
        obs, _, _, truncated, info = super().step(action)
reward = info["agents_rewards"]
terminated = info["agents_terminated"]
return obs, reward, terminated, truncated, info
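    # Sketch of the intended use: the wrapped environment is assumed to be a
    # multi-agent configuration that fills info["agents_rewards"] and
    # info["agents_terminated"] at each step.
    #
    #     env = MultiAgentWrapper(inner_env)  # inner_env: a multi-agent AbstractEnv
    #     obs, rewards, terminateds, truncated, info = env.step(actions)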