from __future__ import annotations
import numpy as np
from highway_env import utils
from highway_env.road.road import LaneIndex, Road, Route
from highway_env.utils import Vector
from highway_env.vehicle.controller import ControlledVehicle
from highway_env.vehicle.kinematics import Vehicle
[docs]
class IDMVehicle(ControlledVehicle):
"""
A vehicle using both a longitudinal and a lateral decision policies.
- Longitudinal: the IDM model computes an acceleration given the preceding vehicle's distance and speed.
- Lateral: the MOBIL model decides when to change lane by maximizing the acceleration of nearby vehicles.
"""
# Longitudinal policy parameters
ACC_MAX = 6.0 # [m/s2]
"""Maximum acceleration."""
COMFORT_ACC_MAX = 3.0 # [m/s2]
"""Desired maximum acceleration."""
COMFORT_ACC_MIN = -5.0 # [m/s2]
"""Desired maximum deceleration."""
DISTANCE_WANTED = 5.0 + ControlledVehicle.LENGTH # [m]
"""Desired jam distance to the front vehicle."""
TIME_WANTED = 1.5 # [s]
"""Desired time gap to the front vehicle."""
DELTA = 4.0 # []
"""Exponent of the velocity term."""
DELTA_RANGE = [3.5, 4.5]
"""Range of delta when chosen randomly."""
# Lateral policy parameters
POLITENESS = 0.0 # in [0, 1]
LANE_CHANGE_MIN_ACC_GAIN = 0.2 # [m/s2]
LANE_CHANGE_MAX_BRAKING_IMPOSED = 2.0 # [m/s2]
LANE_CHANGE_DELAY = 1.0 # [s]
def __init__(
self,
road: Road,
position: Vector,
heading: float = 0,
speed: float = 0,
target_lane_index: int = None,
target_speed: float = None,
route: Route = None,
enable_lane_change: bool = True,
timer: float = None,
):
super().__init__(
road, position, heading, speed, target_lane_index, target_speed, route
)
self.enable_lane_change = enable_lane_change
self.timer = timer or (np.sum(self.position) * np.pi) % self.LANE_CHANGE_DELAY
def randomize_behavior(self):
self.DELTA = self.road.np_random.uniform(
low=self.DELTA_RANGE[0], high=self.DELTA_RANGE[1]
)
[docs]
@classmethod
def create_from(cls, vehicle: ControlledVehicle) -> IDMVehicle:
"""
Create a new vehicle from an existing one.
The vehicle dynamics and target dynamics are copied, other properties are default.
:param vehicle: a vehicle
:return: a new vehicle at the same dynamical state
"""
v = cls(
vehicle.road,
vehicle.position,
heading=vehicle.heading,
speed=vehicle.speed,
target_lane_index=vehicle.target_lane_index,
target_speed=vehicle.target_speed,
route=vehicle.route,
timer=getattr(vehicle, "timer", None),
)
return v
[docs]
def act(self, action: dict | str = None):
"""
Execute an action.
For now, no action is supported because the vehicle takes all decisions
of acceleration and lane changes on its own, based on the IDM and MOBIL models.
:param action: the action
"""
if self.crashed:
return
action = {}
# Lateral: MOBIL
self.follow_road()
if self.enable_lane_change:
self.change_lane_policy()
action["steering"] = self.steering_control(self.target_lane_index)
action["steering"] = np.clip(
action["steering"], -self.MAX_STEERING_ANGLE, self.MAX_STEERING_ANGLE
)
# Longitudinal: IDM
front_vehicle, rear_vehicle = self.road.neighbour_vehicles(
self, self.lane_index
)
action["acceleration"] = self.acceleration(
ego_vehicle=self, front_vehicle=front_vehicle, rear_vehicle=rear_vehicle
)
# When changing lane, check both current and target lanes
if self.lane_index != self.target_lane_index:
front_vehicle, rear_vehicle = self.road.neighbour_vehicles(
self, self.target_lane_index
)
target_idm_acceleration = self.acceleration(
ego_vehicle=self, front_vehicle=front_vehicle, rear_vehicle=rear_vehicle
)
action["acceleration"] = min(
action["acceleration"], target_idm_acceleration
)
# action['acceleration'] = self.recover_from_stop(action['acceleration'])
action["acceleration"] = np.clip(
action["acceleration"], -self.ACC_MAX, self.ACC_MAX
)
# Skip ControlledVehicle.act(), or the command will be overridden.
Vehicle.act(self, action)
[docs]
def step(self, dt: float):
"""
Step the simulation.
Increases a timer used for decision policies, and step the vehicle dynamics.
:param dt: timestep
"""
self.timer += dt
super().step(dt)
[docs]
def acceleration(
self,
ego_vehicle: ControlledVehicle,
front_vehicle: Vehicle = None,
rear_vehicle: Vehicle = None,
) -> float:
"""
Compute an acceleration command with the Intelligent Driver Model.
The acceleration is chosen so as to:
- reach a target speed;
- maintain a minimum safety distance (and safety time) w.r.t the front vehicle.
:param ego_vehicle: the vehicle whose desired acceleration is to be computed. It does not have to be an
IDM vehicle, which is why this method is a class method. This allows an IDM vehicle to
reason about other vehicles behaviors even though they may not IDMs.
:param front_vehicle: the vehicle preceding the ego-vehicle
:param rear_vehicle: the vehicle following the ego-vehicle
:return: the acceleration command for the ego-vehicle [m/s2]
"""
if not ego_vehicle or not isinstance(ego_vehicle, Vehicle):
return 0
ego_target_speed = getattr(ego_vehicle, "target_speed", 0)
if ego_vehicle.lane and ego_vehicle.lane.speed_limit is not None:
ego_target_speed = np.clip(
ego_target_speed, 0, ego_vehicle.lane.speed_limit
)
acceleration = self.COMFORT_ACC_MAX * (
1
- np.power(
max(ego_vehicle.speed, 0) / abs(utils.not_zero(ego_target_speed)),
self.DELTA,
)
)
if front_vehicle:
d = ego_vehicle.lane_distance_to(front_vehicle)
acceleration -= self.COMFORT_ACC_MAX * np.power(
self.desired_gap(ego_vehicle, front_vehicle) / utils.not_zero(d), 2
)
return acceleration
[docs]
def desired_gap(
self,
ego_vehicle: Vehicle,
front_vehicle: Vehicle = None,
projected: bool = True,
) -> float:
"""
Compute the desired distance between a vehicle and its leading vehicle.
:param ego_vehicle: the vehicle being controlled
:param front_vehicle: its leading vehicle
:param projected: project 2D velocities in 1D space
:return: the desired distance between the two [m]
"""
d0 = self.DISTANCE_WANTED
tau = self.TIME_WANTED
ab = -self.COMFORT_ACC_MAX * self.COMFORT_ACC_MIN
dv = (
np.dot(ego_vehicle.velocity - front_vehicle.velocity, ego_vehicle.direction)
if projected
else ego_vehicle.speed - front_vehicle.speed
)
d_star = (
d0 + ego_vehicle.speed * tau + ego_vehicle.speed * dv / (2 * np.sqrt(ab))
)
return d_star
[docs]
def change_lane_policy(self) -> None:
"""
Decide when to change lane.
Based on:
- frequency;
- closeness of the target lane;
- MOBIL model.
"""
# If a lane change is already ongoing
if self.lane_index != self.target_lane_index:
# If we are on correct route but bad lane: abort it if someone else is already changing into the same lane
if self.lane_index[:2] == self.target_lane_index[:2]:
for v in self.road.vehicles:
if (
v is not self
and v.lane_index != self.target_lane_index
and isinstance(v, ControlledVehicle)
and v.target_lane_index == self.target_lane_index
):
d = self.lane_distance_to(v)
d_star = self.desired_gap(self, v)
if 0 < d < d_star:
self.target_lane_index = self.lane_index
break
return
# else, at a given frequency,
if not utils.do_every(self.LANE_CHANGE_DELAY, self.timer):
return
self.timer = 0
# decide to make a lane change
for lane_index in self.road.network.side_lanes(self.lane_index):
# Is the candidate lane close enough?
if not self.road.network.get_lane(lane_index).is_reachable_from(
self.position
):
continue
# Only change lane when the vehicle is moving
if np.abs(self.speed) < 1:
continue
# Does the MOBIL model recommend a lane change?
if self.mobil(lane_index):
self.target_lane_index = lane_index
[docs]
def mobil(self, lane_index: LaneIndex) -> bool:
"""
MOBIL lane change model: Minimizing Overall Braking Induced by a Lane change
The vehicle should change lane only if:
- after changing it (and/or following vehicles) can accelerate more;
- it doesn't impose an unsafe braking on its new following vehicle.
:param lane_index: the candidate lane for the change
:return: whether the lane change should be performed
"""
# Is the maneuver unsafe for the new following vehicle?
new_preceding, new_following = self.road.neighbour_vehicles(self, lane_index)
new_following_a = self.acceleration(
ego_vehicle=new_following, front_vehicle=new_preceding
)
new_following_pred_a = self.acceleration(
ego_vehicle=new_following, front_vehicle=self
)
if new_following_pred_a < -self.LANE_CHANGE_MAX_BRAKING_IMPOSED:
return False
# Do I have a planned route for a specific lane which is safe for me to access?
old_preceding, old_following = self.road.neighbour_vehicles(self)
self_pred_a = self.acceleration(ego_vehicle=self, front_vehicle=new_preceding)
if self.route and self.route[0][2] is not None:
# Wrong direction
if np.sign(lane_index[2] - self.target_lane_index[2]) != np.sign(
self.route[0][2] - self.target_lane_index[2]
):
return False
# Unsafe braking required
elif self_pred_a < -self.LANE_CHANGE_MAX_BRAKING_IMPOSED:
return False
# Is there an acceleration advantage for me and/or my followers to change lane?
else:
self_a = self.acceleration(ego_vehicle=self, front_vehicle=old_preceding)
old_following_a = self.acceleration(
ego_vehicle=old_following, front_vehicle=self
)
old_following_pred_a = self.acceleration(
ego_vehicle=old_following, front_vehicle=old_preceding
)
jerk = (
self_pred_a
- self_a
+ self.POLITENESS
* (
new_following_pred_a
- new_following_a
+ old_following_pred_a
- old_following_a
)
)
if jerk < self.LANE_CHANGE_MIN_ACC_GAIN:
return False
# All clear, let's go!
return True
[docs]
def recover_from_stop(self, acceleration: float) -> float:
"""
If stopped on the wrong lane, try a reversing maneuver.
:param acceleration: desired acceleration from IDM
:return: suggested acceleration to recover from being stuck
"""
stopped_speed = 5
safe_distance = 200
# Is the vehicle stopped on the wrong lane?
if self.target_lane_index != self.lane_index and self.speed < stopped_speed:
_, rear = self.road.neighbour_vehicles(self)
_, new_rear = self.road.neighbour_vehicles(
self, self.road.network.get_lane(self.target_lane_index)
)
# Check for free room behind on both lanes
if (not rear or rear.lane_distance_to(self) > safe_distance) and (
not new_rear or new_rear.lane_distance_to(self) > safe_distance
):
# Reverse
return -self.COMFORT_ACC_MAX / 2
return acceleration
[docs]
class LinearVehicle(IDMVehicle):
"""A Vehicle whose longitudinal and lateral controllers are linear with respect to parameters."""
ACCELERATION_PARAMETERS = [0.3, 0.3, 2.0]
STEERING_PARAMETERS = [
ControlledVehicle.KP_HEADING,
ControlledVehicle.KP_HEADING * ControlledVehicle.KP_LATERAL,
]
ACCELERATION_RANGE = np.array(
[
0.5 * np.array(ACCELERATION_PARAMETERS),
1.5 * np.array(ACCELERATION_PARAMETERS),
]
)
STEERING_RANGE = np.array(
[
np.array(STEERING_PARAMETERS) - np.array([0.07, 1.5]),
np.array(STEERING_PARAMETERS) + np.array([0.07, 1.5]),
]
)
TIME_WANTED = 2.5
def __init__(
self,
road: Road,
position: Vector,
heading: float = 0,
speed: float = 0,
target_lane_index: int = None,
target_speed: float = None,
route: Route = None,
enable_lane_change: bool = True,
timer: float = None,
data: dict = None,
):
super().__init__(
road,
position,
heading,
speed,
target_lane_index,
target_speed,
route,
enable_lane_change,
timer,
)
self.data = data if data is not None else {}
self.collecting_data = True
[docs]
def act(self, action: dict | str = None):
if self.collecting_data:
self.collect_data()
super().act(action)
def randomize_behavior(self):
ua = self.road.np_random.uniform(size=np.shape(self.ACCELERATION_PARAMETERS))
self.ACCELERATION_PARAMETERS = self.ACCELERATION_RANGE[0] + ua * (
self.ACCELERATION_RANGE[1] - self.ACCELERATION_RANGE[0]
)
ub = self.road.np_random.uniform(size=np.shape(self.STEERING_PARAMETERS))
self.STEERING_PARAMETERS = self.STEERING_RANGE[0] + ub * (
self.STEERING_RANGE[1] - self.STEERING_RANGE[0]
)
[docs]
def acceleration(
self,
ego_vehicle: ControlledVehicle,
front_vehicle: Vehicle = None,
rear_vehicle: Vehicle = None,
) -> float:
"""
Compute an acceleration command with a Linear Model.
The acceleration is chosen so as to:
- reach a target speed;
- reach the speed of the leading (resp following) vehicle, if it is lower (resp higher) than ego's;
- maintain a minimum safety distance w.r.t the leading vehicle.
:param ego_vehicle: the vehicle whose desired acceleration is to be computed. It does not have to be an
Linear vehicle, which is why this method is a class method. This allows a Linear vehicle to
reason about other vehicles behaviors even though they may not Linear.
:param front_vehicle: the vehicle preceding the ego-vehicle
:param rear_vehicle: the vehicle following the ego-vehicle
:return: the acceleration command for the ego-vehicle [m/s2]
"""
return float(
np.dot(
self.ACCELERATION_PARAMETERS,
self.acceleration_features(ego_vehicle, front_vehicle, rear_vehicle),
)
)
def acceleration_features(
self,
ego_vehicle: ControlledVehicle,
front_vehicle: Vehicle = None,
rear_vehicle: Vehicle = None,
) -> np.ndarray:
vt, dv, dp = 0, 0, 0
if ego_vehicle:
vt = (
getattr(ego_vehicle, "target_speed", ego_vehicle.speed)
- ego_vehicle.speed
)
d_safe = (
self.DISTANCE_WANTED
+ np.maximum(ego_vehicle.speed, 0) * self.TIME_WANTED
)
if front_vehicle:
d = ego_vehicle.lane_distance_to(front_vehicle)
dv = min(front_vehicle.speed - ego_vehicle.speed, 0)
dp = min(d - d_safe, 0)
return np.array([vt, dv, dp])
[docs]
def steering_control(self, target_lane_index: LaneIndex) -> float:
"""
Linear controller with respect to parameters.
Overrides the non-linear controller ControlledVehicle.steering_control()
:param target_lane_index: index of the lane to follow
:return: a steering wheel angle command [rad]
"""
return float(
np.dot(
np.array(self.STEERING_PARAMETERS),
self.steering_features(target_lane_index),
)
)
[docs]
def steering_features(self, target_lane_index: LaneIndex) -> np.ndarray:
"""
A collection of features used to follow a lane
:param target_lane_index: index of the lane to follow
:return: a array of features
"""
lane = self.road.network.get_lane(target_lane_index)
lane_coords = lane.local_coordinates(self.position)
lane_next_coords = lane_coords[0] + self.speed * self.TAU_PURSUIT
lane_future_heading = lane.heading_at(lane_next_coords)
features = np.array(
[
utils.wrap_to_pi(lane_future_heading - self.heading)
* self.LENGTH
/ utils.not_zero(self.speed),
-lane_coords[1] * self.LENGTH / (utils.not_zero(self.speed) ** 2),
]
)
return features
def longitudinal_structure(self):
# Nominal dynamics: integrate speed
A = np.array([[0, 0, 1, 0], [0, 0, 0, 1], [0, 0, 0, 0], [0, 0, 0, 0]])
# Target speed dynamics
phi0 = np.array([[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, -1, 0], [0, 0, 0, -1]])
# Front speed control
phi1 = np.array([[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, -1, 1], [0, 0, 0, 0]])
# Front position control
phi2 = np.array(
[[0, 0, 0, 0], [0, 0, 0, 0], [-1, 1, -self.TIME_WANTED, 0], [0, 0, 0, 0]]
)
# Disable speed control
front_vehicle, _ = self.road.neighbour_vehicles(self)
if not front_vehicle or self.speed < front_vehicle.speed:
phi1 *= 0
# Disable front position control
if front_vehicle:
d = self.lane_distance_to(front_vehicle)
if d != self.DISTANCE_WANTED + self.TIME_WANTED * self.speed:
phi2 *= 0
else:
phi2 *= 0
phi = np.array([phi0, phi1, phi2])
return A, phi
def lateral_structure(self):
A = np.array([[0, 1], [0, 0]])
phi0 = np.array([[0, 0], [0, -1]])
phi1 = np.array([[0, 0], [-1, 0]])
phi = np.array([phi0, phi1])
return A, phi
[docs]
def collect_data(self):
"""Store features and outputs for parameter regression."""
self.add_features(self.data, self.target_lane_index)
def add_features(self, data, lane_index, output_lane=None):
front_vehicle, rear_vehicle = self.road.neighbour_vehicles(self)
features = self.acceleration_features(self, front_vehicle, rear_vehicle)
output = np.dot(self.ACCELERATION_PARAMETERS, features)
if "longitudinal" not in data:
data["longitudinal"] = {"features": [], "outputs": []}
data["longitudinal"]["features"].append(features)
data["longitudinal"]["outputs"].append(output)
if output_lane is None:
output_lane = lane_index
features = self.steering_features(lane_index)
out_features = self.steering_features(output_lane)
output = np.dot(self.STEERING_PARAMETERS, out_features)
if "lateral" not in data:
data["lateral"] = {"features": [], "outputs": []}
data["lateral"]["features"].append(features)
data["lateral"]["outputs"].append(output)
[docs]
class AggressiveVehicle(LinearVehicle):
LANE_CHANGE_MIN_ACC_GAIN = 1.0 # [m/s2]
MERGE_ACC_GAIN = 0.8
MERGE_VEL_RATIO = 0.75
MERGE_TARGET_VEL = 30
ACCELERATION_PARAMETERS = [
MERGE_ACC_GAIN / ((1 - MERGE_VEL_RATIO) * MERGE_TARGET_VEL),
MERGE_ACC_GAIN / (MERGE_VEL_RATIO * MERGE_TARGET_VEL),
0.5,
]
[docs]
class DefensiveVehicle(LinearVehicle):
LANE_CHANGE_MIN_ACC_GAIN = 1.0 # [m/s2]
MERGE_ACC_GAIN = 1.2
MERGE_VEL_RATIO = 0.75
MERGE_TARGET_VEL = 30
ACCELERATION_PARAMETERS = [
MERGE_ACC_GAIN / ((1 - MERGE_VEL_RATIO) * MERGE_TARGET_VEL),
MERGE_ACC_GAIN / (MERGE_VEL_RATIO * MERGE_TARGET_VEL),
2.0,
]