Source code for highway_env.envs.common.abstract

import copy
import os
from typing import List, Tuple, Optional, Callable, TypeVar, Generic, Union, Dict, Text
import gymnasium as gym
from gymnasium import Wrapper
from gymnasium.wrappers import RecordVideo
from gymnasium.utils import seeding
import numpy as np

from highway_env import utils
from highway_env.envs.common.action import action_factory, Action, DiscreteMetaAction, ActionType
from highway_env.envs.common.observation import observation_factory, ObservationType
from highway_env.envs.common.finite_mdp import finite_mdp
from highway_env.envs.common.graphics import EnvViewer
from highway_env.vehicle.behavior import IDMVehicle, LinearVehicle
from highway_env.vehicle.controller import MDPVehicle
from highway_env.vehicle.kinematics import Vehicle

Observation = TypeVar("Observation")


class AbstractEnv(gym.Env):

    """
    A generic environment for various tasks involving a vehicle driving on a road.

    The environment contains a road populated with vehicles, and a controlled ego-vehicle that can change lane and
    speed. The action space is fixed, but the observation space and reward function must be defined in the
    environment implementations.
    """
    observation_type: ObservationType
    action_type: ActionType
    _record_video_wrapper: Optional[RecordVideo]
    metadata = {
        'render_modes': ['human', 'rgb_array'],
    }

    PERCEPTION_DISTANCE = 5.0 * Vehicle.MAX_SPEED
    """The maximum distance of any vehicle present in the observation [m]"""

    def __init__(self, config: dict = None, render_mode: Optional[str] = None) -> None:
        super().__init__()

        # Configuration
        self.config = self.default_config()
        self.configure(config)

        # Scene
        self.road = None
        self.controlled_vehicles = []

        # Spaces
        self.action_type = None
        self.action_space = None
        self.observation_type = None
        self.observation_space = None
        self.define_spaces()

        # Running
        self.time = 0  # Simulation time
        self.steps = 0  # Actions performed
        self.done = False

        # Rendering
        self.viewer = None
        self._record_video_wrapper = None
        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode
        self.enable_auto_render = False

        self.reset()

    @property
    def vehicle(self) -> Vehicle:
        """First (default) controlled vehicle."""
        return self.controlled_vehicles[0] if self.controlled_vehicles else None

    @vehicle.setter
    def vehicle(self, vehicle: Vehicle) -> None:
        """Set a unique controlled vehicle."""
        self.controlled_vehicles = [vehicle]
    @classmethod
    def default_config(cls) -> dict:
        """
        Default environment configuration.

        Can be overloaded in environment implementations, or by calling configure().

        :return: a configuration dict
        """
        return {
            "observation": {
                "type": "Kinematics"
            },
            "action": {
                "type": "DiscreteMetaAction"
            },
            "simulation_frequency": 15,  # [Hz]
            "policy_frequency": 1,  # [Hz]
            "other_vehicles_type": "highway_env.vehicle.behavior.IDMVehicle",
            "screen_width": 600,  # [px]
            "screen_height": 150,  # [px]
            "centering_position": [0.3, 0.5],
            "scaling": 5.5,
            "show_trajectories": False,
            "render_agent": True,
            "offscreen_rendering": os.environ.get("OFFSCREEN_RENDERING", "0") == "1",
            "manual_control": False,
            "real_time_rendering": False
        }
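    # A minimal usage sketch for overriding these defaults, assuming a concrete
    # registered subclass such as "highway-v0" from this package (AbstractEnv
    # itself is abstract and not registered):
    #
    #     import gymnasium as gym
    #     import highway_env  # assumed to register the bundled environments
    #
    #     env = gym.make("highway-v0")
    #     env.unwrapped.configure({"policy_frequency": 2, "screen_width": 800})
    #     env.reset()  # configuration changes take effect on the next reset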
    def configure(self, config: dict) -> None:
        if config:
            self.config.update(config)

    def update_metadata(self, video_real_time_ratio=2):
        frames_freq = self.config["simulation_frequency"] \
            if self._record_video_wrapper else self.config["policy_frequency"]
        self.metadata['render_fps'] = video_real_time_ratio * frames_freq
    def define_spaces(self) -> None:
        """
        Set the types and spaces of observation and action from config.
        """
        self.observation_type = observation_factory(self, self.config["observation"])
        self.action_type = action_factory(self, self.config["action"])
        self.observation_space = self.observation_type.space()
        self.action_space = self.action_type.space()
    def _reward(self, action: Action) -> float:
        """
        Return the reward associated with performing a given action and ending up in the current state.

        :param action: the last action performed
        :return: the reward
        """
        raise NotImplementedError
    def _rewards(self, action: Action) -> Dict[Text, float]:
        """
        Returns a multi-objective vector of rewards.

        If implemented, this reward vector should be aggregated into a scalar in _reward().
        This vector value should only be returned inside the info dict.

        :param action: the last action performed
        :return: a dict of {'reward_name': reward_value}
        """
        raise NotImplementedError
    def _is_terminated(self) -> bool:
        """
        Check whether the current state is a terminal state.

        :return: is the state terminal
        """
        raise NotImplementedError
    def _is_truncated(self) -> bool:
        """
        Check whether the episode is truncated at the current step.

        :return: is the episode truncated
        """
        raise NotImplementedError
    def _info(self, obs: Observation, action: Optional[Action] = None) -> dict:
        """
        Return a dictionary of additional information

        :param obs: current observation
        :param action: current action
        :return: info dict
        """
        info = {
            "speed": self.vehicle.speed,
            "crashed": self.vehicle.crashed,
            "action": action,
        }
        try:
            info["rewards"] = self._rewards(action)
        except NotImplementedError:
            pass
        return info
    def reset(self,
              *,
              seed: Optional[int] = None,
              options: Optional[dict] = None,
              ) -> Tuple[Observation, dict]:
        """
        Reset the environment to its initial configuration

        :param seed: The seed that is used to initialize the environment's PRNG
        :param options: Allows the environment configuration to be specified through `options["config"]`
        :return: the observation of the reset state
        """
        super().reset(seed=seed, options=options)
        if options and "config" in options:
            self.configure(options["config"])
        self.update_metadata()
        self.define_spaces()  # First, to set the controlled vehicle class depending on action space
        self.time = self.steps = 0
        self.done = False
        self._reset()
        self.define_spaces()  # Second, to link the obs and actions to the vehicles once the scene is created
        obs = self.observation_type.observe()
        info = self._info(obs, action=self.action_space.sample())
        if self.render_mode == 'human':
            self.render()
        return obs, info
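    # Usage sketch: the configuration can also be overridden at reset time through
    # the options dict (assuming a concrete subclass, since _reset() is abstract here):
    #
    #     obs, info = env.reset(seed=42, options={"config": {"policy_frequency": 2}})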
    def _reset(self) -> None:
        """
        Reset the scene: roads and vehicles.

        This method must be overloaded by the environments.
        """
        raise NotImplementedError()
    def step(self, action: Action) -> Tuple[Observation, float, bool, bool, dict]:
        """
        Perform an action and step the environment dynamics.

        The action is executed by the ego-vehicle, and all other vehicles on the road perform their default
        behaviour for several simulation timesteps until the next decision-making step.

        :param action: the action performed by the ego-vehicle
        :return: a tuple (observation, reward, terminated, truncated, info)
        """
        if self.road is None or self.vehicle is None:
            raise NotImplementedError("The road and vehicle must be initialized in the environment implementation")

        self.time += 1 / self.config["policy_frequency"]
        self._simulate(action)

        obs = self.observation_type.observe()
        reward = self._reward(action)
        terminated = self._is_terminated()
        truncated = self._is_truncated()
        info = self._info(obs, action)
        if self.render_mode == 'human':
            self.render()

        return obs, reward, terminated, truncated, info
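    # Usage sketch of the resulting interaction loop (random actions shown only as a
    # placeholder policy; assumes a concrete subclass such as "highway-v0"):
    #
    #     obs, info = env.reset(seed=0)
    #     terminated = truncated = False
    #     while not (terminated or truncated):
    #         action = env.action_space.sample()  # replace with a trained policy
    #         obs, reward, terminated, truncated, info = env.step(action)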
    def _simulate(self, action: Optional[Action] = None) -> None:
        """Perform several steps of simulation with constant action."""
        frames = int(self.config["simulation_frequency"] // self.config["policy_frequency"])
        for frame in range(frames):
            # Forward action to the vehicle
            if action is not None \
                    and not self.config["manual_control"] \
                    and self.steps % int(self.config["simulation_frequency"] // self.config["policy_frequency"]) == 0:
                self.action_type.act(action)

            self.road.act()
            self.road.step(1 / self.config["simulation_frequency"])
            self.steps += 1

            # Automatically render intermediate simulation steps if a viewer has been launched
            # Ignored if the rendering is done offscreen
            if frame < frames - 1:  # Last frame will be rendered through env.render() as usual
                self._automatic_rendering()

        self.enable_auto_render = False
    def render(self) -> Optional[np.ndarray]:
        """
        Render the environment.

        Create a viewer if none exists, and use it to render an image.
        """
        if self.render_mode is None:
            assert self.spec is not None
            gym.logger.warn(
                "You are calling render method without specifying any render mode. "
                "You can specify the render_mode at initialization, "
                f'e.g. gym.make("{self.spec.id}", render_mode="rgb_array")'
            )
            return
        if self.viewer is None:
            self.viewer = EnvViewer(self)

        self.enable_auto_render = True

        self.viewer.display()

        if not self.viewer.offscreen:
            self.viewer.handle_events()
        if self.render_mode == 'rgb_array':
            image = self.viewer.get_image()
            return image
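    # Usage sketch: with render_mode="rgb_array", render() returns the current frame
    # (assuming a concrete registered subclass such as "highway-v0"):
    #
    #     env = gym.make("highway-v0", render_mode="rgb_array")
    #     env.reset()
    #     frame = env.render()  # np.ndarray image of the scene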
    def close(self) -> None:
        """
        Close the environment.

        Will close the environment viewer if it exists.
        """
        self.done = True
        if self.viewer is not None:
            self.viewer.close()
        self.viewer = None
    def get_available_actions(self) -> List[int]:
        return self.action_type.get_available_actions()

    def set_record_video_wrapper(self, wrapper: RecordVideo):
        self._record_video_wrapper = wrapper
        self.update_metadata()
    def _automatic_rendering(self) -> None:
        """
        Automatically render the intermediate frames while an action is still ongoing.

        This allows rendering the whole video and not only single steps corresponding to agent decision-making.
        If a RecordVideo wrapper has been set, use it to capture intermediate frames.
        """
        if self.viewer is not None and self.enable_auto_render:
            if self._record_video_wrapper and self._record_video_wrapper.video_recorder:
                self._record_video_wrapper.video_recorder.capture_frame()
            else:
                self.render()
    def simplify(self) -> 'AbstractEnv':
        """
        Return a simplified copy of the environment where distant vehicles have been removed from the road.

        This is meant to lower the policy computational load while preserving the optimal actions set.

        :return: a simplified environment state
        """
        state_copy = copy.deepcopy(self)
        state_copy.road.vehicles = [state_copy.vehicle] + state_copy.road.close_vehicles_to(
            state_copy.vehicle, self.PERCEPTION_DISTANCE)

        return state_copy
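    # Usage sketch: plan on a lighter copy of the scene while leaving the original
    # environment untouched (assuming a concrete subclass):
    #
    #     simplified = env.unwrapped.simplify()
    #     mdp = simplified.to_finite_mdp()  # e.g. for value iteration on the reduced scene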
    def change_vehicles(self, vehicle_class_path: str) -> 'AbstractEnv':
        """
        Change the type of all vehicles on the road, except for the controlled ego-vehicle.

        :param vehicle_class_path: The path of the class of behavior for other vehicles
                                   Example: "highway_env.vehicle.behavior.IDMVehicle"
        :return: a new environment with modified behavior model for other vehicles
        """
        vehicle_class = utils.class_from_path(vehicle_class_path)

        env_copy = copy.deepcopy(self)
        vehicles = env_copy.road.vehicles
        for i, v in enumerate(vehicles):
            if v is not env_copy.vehicle:
                vehicles[i] = vehicle_class.create_from(v)
        return env_copy
    def set_preferred_lane(self, preferred_lane: int = None) -> 'AbstractEnv':
        env_copy = copy.deepcopy(self)
        if preferred_lane:
            for v in env_copy.road.vehicles:
                if isinstance(v, IDMVehicle):
                    v.route = [(lane[0], lane[1], preferred_lane) for lane in v.route]
                    # Vehicles with a lane preference are also less cautious
                    v.LANE_CHANGE_MAX_BRAKING_IMPOSED = 1000
        return env_copy

    def set_route_at_intersection(self, _to: str) -> 'AbstractEnv':
        env_copy = copy.deepcopy(self)
        for v in env_copy.road.vehicles:
            if isinstance(v, IDMVehicle):
                v.set_route_at_intersection(_to)
        return env_copy

    def set_vehicle_field(self, args: Tuple[str, object]) -> 'AbstractEnv':
        field, value = args
        env_copy = copy.deepcopy(self)
        for v in env_copy.road.vehicles:
            if v is not self.vehicle:
                setattr(v, field, value)
        return env_copy

    def call_vehicle_method(self, args: Tuple[str, Tuple[object]]) -> 'AbstractEnv':
        method, method_args = args
        env_copy = copy.deepcopy(self)
        for i, v in enumerate(env_copy.road.vehicles):
            if hasattr(v, method):
                env_copy.road.vehicles[i] = getattr(v, method)(*method_args)
        return env_copy

    def randomize_behavior(self) -> 'AbstractEnv':
        env_copy = copy.deepcopy(self)
        for v in env_copy.road.vehicles:
            if isinstance(v, IDMVehicle):
                v.randomize_behavior()
        return env_copy

    def to_finite_mdp(self):
        return finite_mdp(self, time_quantization=1/self.config["policy_frequency"])

    def __deepcopy__(self, memo):
        """Perform a deep copy but without copying the environment viewer."""
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            if k not in ['viewer', '_record_video_wrapper']:
                setattr(result, k, copy.deepcopy(v, memo))
            else:
                setattr(result, k, None)
        return result
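# A minimal usage sketch for the copy-and-modify helpers above. They all return
# modified deep copies and leave the original environment unchanged (assuming the
# registered "highway-v0" environment provided by this package):
#
#     env = gym.make("highway-v0").unwrapped
#     env.reset()
#     linear_env = env.change_vehicles("highway_env.vehicle.behavior.LinearVehicle")
#     right_lane_env = env.set_preferred_lane(2)
#     randomized_env = env.randomize_behavior()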
class MultiAgentWrapper(Wrapper):
    def step(self, action):
        obs, reward, terminated, truncated, info = super().step(action)
        reward = info["agents_rewards"]
        terminated = info["agents_terminated"]
        truncated = info["agents_truncated"]
        return obs, reward, terminated, truncated, info
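# A usage sketch for MultiAgentWrapper. It assumes the wrapped environment's _info()
# provides the "agents_rewards", "agents_terminated" and "agents_truncated" keys, and
# that the "intersection-v0" environment and the "MultiAgentAction"/"MultiAgentObservation"
# types from this package are available (both are assumptions, not defined in this module):
#
#     env = gym.make("intersection-v0")
#     env.unwrapped.configure({
#         "controlled_vehicles": 2,
#         "action": {"type": "MultiAgentAction", "action_config": {"type": "DiscreteMetaAction"}},
#         "observation": {"type": "MultiAgentObservation", "observation_config": {"type": "Kinematics"}},
#     })
#     env = MultiAgentWrapper(env)
#     obs, info = env.reset()
#     # step() now returns per-agent reward/terminated/truncated tuples taken from info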