from __future__ import annotations
import functools
import itertools
from typing import TYPE_CHECKING, Callable, Union
import numpy as np
from gymnasium import spaces
from highway_env import utils
from highway_env.utils import Vector
from highway_env.vehicle.controller import MDPVehicle
from highway_env.vehicle.dynamics import BicycleVehicle
from highway_env.vehicle.kinematics import Vehicle
if TYPE_CHECKING:
from highway_env.envs.common.abstract import AbstractEnv
Action = Union[int, np.ndarray]
[docs]
class ActionType:
"""A type of action specifies its definition space, and how actions are executed in the environment"""
def __init__(self, env: AbstractEnv, **kwargs) -> None:
self.env = env
self.__controlled_vehicle = None
[docs]
def space(self) -> spaces.Space:
"""The action space."""
raise NotImplementedError
@property
def vehicle_class(self) -> Callable:
"""
The class of a vehicle able to execute the action.
Must return a subclass of :py:class:`highway_env.vehicle.kinematics.Vehicle`.
"""
raise NotImplementedError
[docs]
def act(self, action: Action) -> None:
"""
Execute the action on the ego-vehicle.
Most of the action mechanics are actually implemented in vehicle.act(action), where
vehicle is an instance of the specified :py:class:`highway_env.envs.common.action.ActionType.vehicle_class`.
Must some pre-processing can be applied to the action based on the ActionType configurations.
:param action: the action to execute
"""
raise NotImplementedError
[docs]
def get_available_actions(self):
"""
For discrete action space, return the list of available actions.
"""
raise NotImplementedError
@property
def controlled_vehicle(self):
"""The vehicle acted upon.
If not set, the first controlled vehicle is used by default."""
return self.__controlled_vehicle or self.env.vehicle
@controlled_vehicle.setter
def controlled_vehicle(self, vehicle):
self.__controlled_vehicle = vehicle
[docs]
class ContinuousAction(ActionType):
"""
An continuous action space for throttle and/or steering angle.
If both throttle and steering are enabled, they are set in this order: [throttle, steering]
The space intervals are always [-1, 1], but are mapped to throttle/steering intervals through configurations.
"""
ACCELERATION_RANGE = (-5, 5.0)
"""Acceleration range: [-x, x], in m/s²."""
STEERING_RANGE = (-np.pi / 4, np.pi / 4)
"""Steering angle range: [-x, x], in rad."""
def __init__(
self,
env: AbstractEnv,
acceleration_range: tuple[float, float] | None = None,
steering_range: tuple[float, float] | None = None,
speed_range: tuple[float, float] | None = None,
longitudinal: bool = True,
lateral: bool = True,
dynamical: bool = False,
clip: bool = True,
**kwargs,
) -> None:
"""
Create a continuous action space.
:param env: the environment
:param acceleration_range: the range of acceleration values [m/s²]
:param steering_range: the range of steering values [rad]
:param speed_range: the range of reachable speeds [m/s]
:param longitudinal: enable throttle control
:param lateral: enable steering control
:param dynamical: whether to simulate dynamics (i.e. friction) rather than kinematics
:param clip: clip action to the defined range
"""
super().__init__(env)
self.acceleration_range = (
acceleration_range if acceleration_range else self.ACCELERATION_RANGE
)
self.steering_range = steering_range if steering_range else self.STEERING_RANGE
self.speed_range = speed_range
self.lateral = lateral
self.longitudinal = longitudinal
if not self.lateral and not self.longitudinal:
raise ValueError(
"Either longitudinal and/or lateral control must be enabled"
)
self.dynamical = dynamical
self.clip = clip
self.size = 2 if self.lateral and self.longitudinal else 1
self.last_action = np.zeros(self.size)
[docs]
def space(self) -> spaces.Box:
return spaces.Box(-1.0, 1.0, shape=(self.size,), dtype=np.float32)
@property
def vehicle_class(self) -> Callable:
return Vehicle if not self.dynamical else BicycleVehicle
def get_action(self, action: np.ndarray):
if self.clip:
action = np.clip(action, -1, 1)
if self.speed_range:
(
self.controlled_vehicle.MIN_SPEED,
self.controlled_vehicle.MAX_SPEED,
) = self.speed_range
if self.longitudinal and self.lateral:
return {
"acceleration": utils.lmap(action[0], [-1, 1], self.acceleration_range),
"steering": utils.lmap(action[1], [-1, 1], self.steering_range),
}
elif self.longitudinal:
return {
"acceleration": utils.lmap(action[0], [-1, 1], self.acceleration_range),
"steering": 0,
}
elif self.lateral:
return {
"acceleration": 0,
"steering": utils.lmap(action[0], [-1, 1], self.steering_range),
}
[docs]
def act(self, action: np.ndarray) -> None:
self.controlled_vehicle.act(self.get_action(action))
self.last_action = action
[docs]
class DiscreteAction(ContinuousAction):
def __init__(
self,
env: AbstractEnv,
acceleration_range: tuple[float, float] | None = None,
steering_range: tuple[float, float] | None = None,
longitudinal: bool = True,
lateral: bool = True,
dynamical: bool = False,
clip: bool = True,
actions_per_axis: int = 3,
**kwargs,
) -> None:
super().__init__(
env,
acceleration_range=acceleration_range,
steering_range=steering_range,
longitudinal=longitudinal,
lateral=lateral,
dynamical=dynamical,
clip=clip,
)
self.actions_per_axis = actions_per_axis
[docs]
def space(self) -> spaces.Discrete:
return spaces.Discrete(self.actions_per_axis**self.size)
[docs]
def act(self, action: int) -> None:
cont_space = super().space()
axes = np.linspace(cont_space.low, cont_space.high, self.actions_per_axis).T
all_actions = list(itertools.product(*axes))
super().act(all_actions[action])
[docs]
class MultiAgentAction(ActionType):
def __init__(self, env: AbstractEnv, action_config: dict, **kwargs) -> None:
super().__init__(env)
self.action_config = action_config
self.agents_action_types = []
for vehicle in self.env.controlled_vehicles:
action_type = action_factory(self.env, self.action_config)
action_type.controlled_vehicle = vehicle
self.agents_action_types.append(action_type)
[docs]
def space(self) -> spaces.Space:
return spaces.Tuple(
[action_type.space() for action_type in self.agents_action_types]
)
@property
def vehicle_class(self) -> Callable:
return action_factory(self.env, self.action_config).vehicle_class
[docs]
def act(self, action: Action) -> None:
assert isinstance(action, tuple)
for agent_action, action_type in zip(action, self.agents_action_types):
action_type.act(agent_action)
[docs]
def get_available_actions(self):
return itertools.product(
*[
action_type.get_available_actions()
for action_type in self.agents_action_types
]
)
def action_factory(env: AbstractEnv, config: dict) -> ActionType:
if config["type"] == "ContinuousAction":
return ContinuousAction(env, **config)
if config["type"] == "DiscreteAction":
return DiscreteAction(env, **config)
elif config["type"] == "DiscreteMetaAction":
return DiscreteMetaAction(env, **config)
elif config["type"] == "MultiAgentAction":
return MultiAgentAction(env, **config)
else:
raise ValueError("Unknown action type")