from __future__ import annotations
import numpy as np
from highway_env import utils
from highway_env.envs.common.abstract import AbstractEnv
from highway_env.envs.common.action import Action
from highway_env.road.road import Road, RoadNetwork
from highway_env.utils import near_split
from highway_env.vehicle.controller import ControlledVehicle
from highway_env.vehicle.kinematics import Vehicle
Observation = np.ndarray
[docs]
class HighwayEnv(AbstractEnv):
"""
A highway driving environment.
The vehicle is driving on a straight highway with several lanes, and is rewarded for reaching a high speed,
staying on the rightmost lanes and avoiding collisions.
"""
[docs]
@classmethod
def default_config(cls) -> dict:
config = super().default_config()
config.update(
{
"observation": {"type": "Kinematics"},
"action": {
"type": "DiscreteMetaAction",
},
"lanes_count": 4,
"vehicles_count": 50,
"controlled_vehicles": 1,
"initial_lane_id": None,
"duration": 40, # [s]
"ego_spacing": 2,
"vehicles_density": 1,
"collision_reward": -1, # The reward received when colliding with a vehicle.
"right_lane_reward": 0.1, # The reward received when driving on the right-most lanes, linearly mapped to
# zero for other lanes.
"high_speed_reward": 0.4, # The reward received when driving at full speed, linearly mapped to zero for
# lower speeds according to config["reward_speed_range"].
"lane_change_reward": 0, # The reward received at each lane change action.
"reward_speed_range": [20, 30],
"normalize_reward": True,
"offroad_terminal": False,
}
)
return config
def _reset(self) -> None:
self._create_road()
self._create_vehicles()
def _create_road(self) -> None:
"""Create a road composed of straight adjacent lanes."""
self.road = Road(
network=RoadNetwork.straight_road_network(
self.config["lanes_count"], speed_limit=30
),
np_random=self.np_random,
record_history=self.config["show_trajectories"],
)
def _create_vehicles(self) -> None:
"""Create some new random vehicles of a given type, and add them on the road."""
other_vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])
other_per_controlled = near_split(
self.config["vehicles_count"], num_bins=self.config["controlled_vehicles"]
)
self.controlled_vehicles = []
for others in other_per_controlled:
vehicle = Vehicle.create_random(
self.road,
speed=25,
lane_id=self.config["initial_lane_id"],
spacing=self.config["ego_spacing"],
)
vehicle = self.action_type.vehicle_class(
self.road, vehicle.position, vehicle.heading, vehicle.speed
)
self.controlled_vehicles.append(vehicle)
self.road.vehicles.append(vehicle)
for _ in range(others):
vehicle = other_vehicles_type.create_random(
self.road, spacing=1 / self.config["vehicles_density"]
)
vehicle.randomize_behavior()
self.road.vehicles.append(vehicle)
def _reward(self, action: Action) -> float:
"""
The reward is defined to foster driving at high speed, on the rightmost lanes, and to avoid collisions.
:param action: the last action performed
:return: the corresponding reward
"""
rewards = self._rewards(action)
reward = sum(
self.config.get(name, 0) * reward for name, reward in rewards.items()
)
if self.config["normalize_reward"]:
reward = utils.lmap(
reward,
[
self.config["collision_reward"],
self.config["high_speed_reward"] + self.config["right_lane_reward"],
],
[0, 1],
)
reward *= rewards["on_road_reward"]
return reward
def _rewards(self, action: Action) -> dict[str, float]:
neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index)
lane = (
self.vehicle.target_lane_index[2]
if isinstance(self.vehicle, ControlledVehicle)
else self.vehicle.lane_index[2]
)
# Use forward speed rather than speed, see https://github.com/eleurent/highway-env/issues/268
forward_speed = self.vehicle.speed * np.cos(self.vehicle.heading)
scaled_speed = utils.lmap(
forward_speed, self.config["reward_speed_range"], [0, 1]
)
return {
"collision_reward": float(self.vehicle.crashed),
"right_lane_reward": lane / max(len(neighbours) - 1, 1),
"high_speed_reward": np.clip(scaled_speed, 0, 1),
"on_road_reward": float(self.vehicle.on_road),
}
def _is_terminated(self) -> bool:
"""The episode is over if the ego vehicle crashed."""
return (
self.vehicle.crashed
or self.config["offroad_terminal"]
and not self.vehicle.on_road
)
def _is_truncated(self) -> bool:
"""The episode is truncated if the time limit is reached."""
return self.time >= self.config["duration"]
class HighwayEnvFast(HighwayEnv):
"""
A variant of highway-v0 with faster execution:
- lower simulation frequency
- fewer vehicles in the scene (and fewer lanes, shorter episode duration)
- only check collision of controlled vehicles with others
"""
@classmethod
def default_config(cls) -> dict:
cfg = super().default_config()
cfg.update(
{
"simulation_frequency": 5,
"lanes_count": 3,
"vehicles_count": 20,
"duration": 30, # [s]
"ego_spacing": 1.5,
}
)
return cfg
def _create_vehicles(self) -> None:
super()._create_vehicles()
# Disable collision check for uncontrolled vehicles
for vehicle in self.road.vehicles:
if vehicle not in self.controlled_vehicles:
vehicle.check_collisions = False