from __future__ import annotations
import numpy as np
from highway_env import utils
from highway_env.envs.common.abstract import AbstractEnv
from highway_env.road.lane import CircularLane, LineType, SineLane, StraightLane
from highway_env.road.road import Road, RoadNetwork
from highway_env.vehicle.controller import MDPVehicle
[docs]
class RoundaboutEnv(AbstractEnv):
[docs]
@classmethod
def default_config(cls) -> dict:
config = super().default_config()
config.update(
{
"observation": {
"type": "Kinematics",
"absolute": True,
"features_range": {
"x": [-100, 100],
"y": [-100, 100],
"vx": [-15, 15],
"vy": [-15, 15],
},
},
"action": {"type": "DiscreteMetaAction", "target_speeds": [0, 8, 16]},
"incoming_vehicle_destination": None,
"collision_reward": -1,
"high_speed_reward": 0.2,
"right_lane_reward": 0,
"lane_change_reward": -0.05,
"screen_width": 600,
"screen_height": 600,
"centering_position": [0.5, 0.6],
"duration": 11,
"normalize_reward": True,
}
)
return config
def _reward(self, action: int) -> float:
rewards = self._rewards(action)
reward = sum(
self.config.get(name, 0) * reward for name, reward in rewards.items()
)
if self.config["normalize_reward"]:
reward = utils.lmap(
reward,
[self.config["collision_reward"], self.config["high_speed_reward"]],
[0, 1],
)
reward *= rewards["on_road_reward"]
return reward
def _rewards(self, action: int) -> dict[str, float]:
return {
"collision_reward": self.vehicle.crashed,
"high_speed_reward": MDPVehicle.get_speed_index(self.vehicle)
/ (MDPVehicle.DEFAULT_TARGET_SPEEDS.size - 1),
"lane_change_reward": action in [0, 2],
"on_road_reward": self.vehicle.on_road,
}
def _is_terminated(self) -> bool:
return self.vehicle.crashed
def _is_truncated(self) -> bool:
return self.time >= self.config["duration"]
def _reset(self) -> None:
self._make_road()
self._make_vehicles()
def _make_road(self) -> None:
# Circle lanes: (s)outh/(e)ast/(n)orth/(w)est (e)ntry/e(x)it.
center = [0, 0] # [m]
radius = 20 # [m]
alpha = 24 # [deg]
net = RoadNetwork()
radii = [radius, radius + 4]
n, c, s = LineType.NONE, LineType.CONTINUOUS, LineType.STRIPED
line = [[c, s], [n, c]]
for lane in [0, 1]:
net.add_lane(
"se",
"ex",
CircularLane(
center,
radii[lane],
np.deg2rad(90 - alpha),
np.deg2rad(alpha),
clockwise=False,
line_types=line[lane],
),
)
net.add_lane(
"ex",
"ee",
CircularLane(
center,
radii[lane],
np.deg2rad(alpha),
np.deg2rad(-alpha),
clockwise=False,
line_types=line[lane],
),
)
net.add_lane(
"ee",
"nx",
CircularLane(
center,
radii[lane],
np.deg2rad(-alpha),
np.deg2rad(-90 + alpha),
clockwise=False,
line_types=line[lane],
),
)
net.add_lane(
"nx",
"ne",
CircularLane(
center,
radii[lane],
np.deg2rad(-90 + alpha),
np.deg2rad(-90 - alpha),
clockwise=False,
line_types=line[lane],
),
)
net.add_lane(
"ne",
"wx",
CircularLane(
center,
radii[lane],
np.deg2rad(-90 - alpha),
np.deg2rad(-180 + alpha),
clockwise=False,
line_types=line[lane],
),
)
net.add_lane(
"wx",
"we",
CircularLane(
center,
radii[lane],
np.deg2rad(-180 + alpha),
np.deg2rad(-180 - alpha),
clockwise=False,
line_types=line[lane],
),
)
net.add_lane(
"we",
"sx",
CircularLane(
center,
radii[lane],
np.deg2rad(180 - alpha),
np.deg2rad(90 + alpha),
clockwise=False,
line_types=line[lane],
),
)
net.add_lane(
"sx",
"se",
CircularLane(
center,
radii[lane],
np.deg2rad(90 + alpha),
np.deg2rad(90 - alpha),
clockwise=False,
line_types=line[lane],
),
)
# Access lanes: (r)oad/(s)ine
access = 170 # [m]
dev = 85 # [m]
a = 5 # [m]
delta_st = 0.2 * dev # [m]
delta_en = dev - delta_st
w = 2 * np.pi / dev
net.add_lane(
"ser", "ses", StraightLane([2, access], [2, dev / 2], line_types=(s, c))
)
net.add_lane(
"ses",
"se",
SineLane(
[2 + a, dev / 2],
[2 + a, dev / 2 - delta_st],
a,
w,
-np.pi / 2,
line_types=(c, c),
),
)
net.add_lane(
"sx",
"sxs",
SineLane(
[-2 - a, -dev / 2 + delta_en],
[-2 - a, dev / 2],
a,
w,
-np.pi / 2 + w * delta_en,
line_types=(c, c),
),
)
net.add_lane(
"sxs", "sxr", StraightLane([-2, dev / 2], [-2, access], line_types=(n, c))
)
net.add_lane(
"eer", "ees", StraightLane([access, -2], [dev / 2, -2], line_types=(s, c))
)
net.add_lane(
"ees",
"ee",
SineLane(
[dev / 2, -2 - a],
[dev / 2 - delta_st, -2 - a],
a,
w,
-np.pi / 2,
line_types=(c, c),
),
)
net.add_lane(
"ex",
"exs",
SineLane(
[-dev / 2 + delta_en, 2 + a],
[dev / 2, 2 + a],
a,
w,
-np.pi / 2 + w * delta_en,
line_types=(c, c),
),
)
net.add_lane(
"exs", "exr", StraightLane([dev / 2, 2], [access, 2], line_types=(n, c))
)
net.add_lane(
"ner", "nes", StraightLane([-2, -access], [-2, -dev / 2], line_types=(s, c))
)
net.add_lane(
"nes",
"ne",
SineLane(
[-2 - a, -dev / 2],
[-2 - a, -dev / 2 + delta_st],
a,
w,
-np.pi / 2,
line_types=(c, c),
),
)
net.add_lane(
"nx",
"nxs",
SineLane(
[2 + a, dev / 2 - delta_en],
[2 + a, -dev / 2],
a,
w,
-np.pi / 2 + w * delta_en,
line_types=(c, c),
),
)
net.add_lane(
"nxs", "nxr", StraightLane([2, -dev / 2], [2, -access], line_types=(n, c))
)
net.add_lane(
"wer", "wes", StraightLane([-access, 2], [-dev / 2, 2], line_types=(s, c))
)
net.add_lane(
"wes",
"we",
SineLane(
[-dev / 2, 2 + a],
[-dev / 2 + delta_st, 2 + a],
a,
w,
-np.pi / 2,
line_types=(c, c),
),
)
net.add_lane(
"wx",
"wxs",
SineLane(
[dev / 2 - delta_en, -2 - a],
[-dev / 2, -2 - a],
a,
w,
-np.pi / 2 + w * delta_en,
line_types=(c, c),
),
)
net.add_lane(
"wxs", "wxr", StraightLane([-dev / 2, -2], [-access, -2], line_types=(n, c))
)
road = Road(
network=net,
np_random=self.np_random,
record_history=self.config["show_trajectories"],
)
self.road = road
def _make_vehicles(self) -> None:
"""
Populate a road with several vehicles on the highway and on the merging lane, as well as an ego-vehicle.
:return: the ego-vehicle
"""
position_deviation = 2.0
speed_deviation = 2.0
# Ego-vehicle
ego_lane = self.road.network.get_lane(("ser", "ses", 0))
ego_vehicle = self.action_type.vehicle_class(
self.road,
ego_lane.position(125.0, 0.0),
speed=8.0,
heading=ego_lane.heading_at(140.0),
)
try:
ego_vehicle.plan_route_to("nxs")
except AttributeError:
pass
self.road.vehicles.append(ego_vehicle)
self.vehicle = ego_vehicle
# Incoming vehicle
destinations = ["exr", "sxr", "nxr"]
other_vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])
vehicle = other_vehicles_type.make_on_lane(
self.road,
("we", "sx", 1),
longitudinal=5.0 + self.np_random.normal() * position_deviation,
speed=16 + self.np_random.normal() * speed_deviation,
)
if self.config["incoming_vehicle_destination"] is not None:
destination = destinations[self.config["incoming_vehicle_destination"]]
else:
destination = self.np_random.choice(destinations)
vehicle.plan_route_to(destination)
vehicle.randomize_behavior()
self.road.vehicles.append(vehicle)
# Other vehicles
for i in list(range(1, 2)) + list(range(-1, 0)):
vehicle = other_vehicles_type.make_on_lane(
self.road,
("we", "sx", 0),
longitudinal=20.0 * float(i)
+ self.np_random.normal() * position_deviation,
speed=16.0 + self.np_random.normal() * speed_deviation,
)
vehicle.plan_route_to(self.np_random.choice(destinations))
vehicle.randomize_behavior()
self.road.vehicles.append(vehicle)
# Entering vehicle
vehicle = other_vehicles_type.make_on_lane(
self.road,
("eer", "ees", 0),
longitudinal=50.0 + self.np_random.normal() * position_deviation,
speed=16.0 + self.np_random.normal() * speed_deviation,
)
vehicle.plan_route_to(self.np_random.choice(destinations))
vehicle.randomize_behavior()
self.road.vehicles.append(vehicle)
class RoundaboutGenericEnv(RoundaboutEnv):
"""
A generic version of the roundabout environment.
Additionally supports changing:
- the number of lanes in the roundabout
- the roundabout radius
- the number of spawned other vehicles
"""
@classmethod
def default_config(cls) -> dict:
config = super().default_config()
config.update(
{
"roundabout_radius": 20,
"roundabout_lanes": 2,
"vehicles_count": 5,
"duration": 17,
}
)
return config
def _make_road(self) -> None:
center = [0, 0] # [m]
radius = self.config["roundabout_radius"]
num_lanes = self.config["roundabout_lanes"]
alpha = 24 # [deg]
net = RoadNetwork()
radii = [radius + (4 * i) for i in range(num_lanes)]
n, c, s = LineType.NONE, LineType.CONTINUOUS, LineType.STRIPED
nodes = ["se", "ex", "ee", "nx", "ne", "wx", "we", "sx", "se"]
angles = [
(90 - alpha, alpha),
(alpha, -alpha),
(-alpha, -90 + alpha),
(-90 + alpha, -90 - alpha),
(-90 - alpha, -180 + alpha),
(-180 + alpha, -180 - alpha),
(180 - alpha, 90 + alpha),
(90 + alpha, 90 - alpha),
]
for lane in range(num_lanes):
if num_lanes == 1:
line_types = [c, c]
elif lane == 0:
line_types = [c, s]
elif lane == num_lanes - 1:
line_types = [n, c]
else:
line_types = [n, s]
for i in range(8):
net.add_lane(
nodes[i],
nodes[i + 1],
CircularLane(
center,
radii[lane],
np.deg2rad(angles[i][0]),
np.deg2rad(angles[i][1]),
clockwise=False,
line_types=line_types,
),
)
# Dynamically calculate exact coordinates on the outermost circle
outer_radius = radii[-1]
def pt(angle_deg: float) -> list[float]:
rad = np.deg2rad(angle_deg)
return [outer_radius * np.cos(rad), outer_radius * np.sin(rad)]
p_se = pt(90 - alpha)
p_ex = pt(alpha)
p_ee = pt(-alpha)
p_nx = pt(-90 + alpha)
p_ne = pt(-90 - alpha)
p_wx = pt(-180 + alpha)
p_we = pt(180 - alpha)
p_sx = pt(90 + alpha)
# In case radius is very large
dev = max(100.0, 2 * outer_radius + 40.0)
access = dev + 40.0
# South Entry (ses -> se)
dy = dev / 2 - p_se[1]
a = (p_se[0] - 2) / 2
w = np.pi / dy
net.add_lane(
"ser",
"ses",
StraightLane([2, access], [2, dev / 2], line_types=(s, c)),
)
net.add_lane(
"ses",
"se",
SineLane(
[2 + a, dev / 2], [2 + a, p_se[1]], a, w, -np.pi / 2, line_types=(c, c)
),
)
# South Exit (sx -> sxs)
dy = dev / 2 - p_sx[1]
a = (p_sx[0] + 2) / 2
w = np.pi / dy
net.add_lane(
"sx",
"sxs",
SineLane(
[p_sx[0] - a, p_sx[1]],
[p_sx[0] - a, dev / 2],
a,
w,
-np.pi / 2,
line_types=(c, c),
),
)
net.add_lane(
"sxs",
"sxr",
StraightLane([-2, dev / 2], [-2, access], line_types=(n, c)),
)
# East Entry (ees -> ee)
dx = dev / 2 - p_ee[0]
a = (-2 - p_ee[1]) / 2
w = np.pi / dx
net.add_lane(
"eer",
"ees",
StraightLane([access, -2], [dev / 2, -2], line_types=(s, c)),
)
net.add_lane(
"ees",
"ee",
SineLane(
[dev / 2, -2 - a],
[p_ee[0], -2 - a],
a,
w,
-np.pi / 2,
line_types=(c, c),
),
)
# East Exit (ex -> exs)
dx = dev / 2 - p_ex[0]
a = (2 - p_ex[1]) / 2
w = np.pi / dx
net.add_lane(
"ex",
"exs",
SineLane(
[p_ex[0], p_ex[1] + a],
[dev / 2, p_ex[1] + a],
a,
w,
-np.pi / 2,
line_types=(c, c),
),
)
net.add_lane(
"exs",
"exr",
StraightLane([dev / 2, 2], [access, 2], line_types=(n, c)),
)
# North Entry (nes -> ne)
dy = p_ne[1] - (-dev / 2)
a = (-2 - p_ne[0]) / 2
w = np.pi / dy
net.add_lane(
"ner",
"nes",
StraightLane([-2, -access], [-2, -dev / 2], line_types=(s, c)),
)
net.add_lane(
"nes",
"ne",
SineLane(
[-2 - a, -dev / 2],
[-2 - a, p_ne[1]],
a,
w,
-np.pi / 2,
line_types=(c, c),
),
)
# North Exit (nx -> nxs)
dy = p_nx[1] - (-dev / 2)
a = (2 - p_nx[0]) / 2
w = np.pi / dy
net.add_lane(
"nx",
"nxs",
SineLane(
[p_nx[0] + a, p_nx[1]],
[p_nx[0] + a, -dev / 2],
a,
w,
-np.pi / 2,
line_types=(c, c),
),
)
net.add_lane(
"nxs",
"nxr",
StraightLane([2, -dev / 2], [2, -access], line_types=(n, c)),
)
# West Entry (wes -> we)
dx = p_we[0] - (-dev / 2)
a = (p_we[1] - 2) / 2
w = np.pi / dx
net.add_lane(
"wer",
"wes",
StraightLane([-access, 2], [-dev / 2, 2], line_types=(s, c)),
)
net.add_lane(
"wes",
"we",
SineLane(
[-dev / 2, 2 + a], [p_we[0], 2 + a], a, w, -np.pi / 2, line_types=(c, c)
),
)
# West Exit (wx -> wxs)
dx = p_wx[0] - (-dev / 2)
a = (p_wx[1] + 2) / 2
w = np.pi / dx
net.add_lane(
"wx",
"wxs",
SineLane(
[p_wx[0], p_wx[1] - a],
[-dev / 2, p_wx[1] - a],
a,
w,
-np.pi / 2,
line_types=(c, c),
),
)
net.add_lane(
"wxs",
"wxr",
StraightLane([-dev / 2, -2], [-access, -2], line_types=(n, c)),
)
road = Road(
network=net,
np_random=self.np_random,
record_history=self.config["show_trajectories"],
)
self.road = road
def _make_vehicles(self) -> None:
speed_deviation = 2.0
vehicle_count = self.config["vehicles_count"]
other_vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])
destinations = ["exr", "sxr", "nxr", "wxr"]
ego_lane_id = ("ser", "ses", 0)
ego_lane = self.road.network.get_lane(ego_lane_id)
ego_longitudinal = ego_lane.length - 2.5 # Placed at end of straight lane
ego_vehicle = self.action_type.vehicle_class(
self.road,
ego_lane.position(ego_longitudinal, 0.0),
speed=8.0,
heading=ego_lane.heading_at(ego_longitudinal),
)
try:
ego_vehicle.plan_route_to("nxs")
except AttributeError:
pass
self.road.vehicles.append(ego_vehicle)
self.vehicle = ego_vehicle
spawned_positions = {}
spawned_positions[ego_lane_id] = [ego_longitudinal]
spawn_lanes = [
("we", "sx"),
("sx", "se"),
("ee", "nx"),
("nx", "ne"),
("eer", "ees"),
("ner", "nes"),
("wer", "wes"),
]
spawned_points = []
spawned_points.append(ego_lane.position(ego_longitudinal, 0.0))
safe_distance = 7.0 # safe distance to spawn vehicles from each other
tries = 10 # number of times it tries to spawn a vehicle
for _ in range(vehicle_count):
for _ in range(tries):
lane_tuple = spawn_lanes[self.np_random.integers(0, len(spawn_lanes))]
available_lanes = len(
self.road.network.graph[lane_tuple[0]][lane_tuple[1]]
)
lane_index = self.np_random.integers(0, available_lanes)
lane_id = (lane_tuple[0], lane_tuple[1], lane_index)
lane = self.road.network.get_lane(lane_id)
longitudinal = self.np_random.uniform(
5.0,
max(5.0, lane.length - 5.0),
)
candidate_position = lane.position(longitudinal, 0.0)
is_safe = True
for point in spawned_points:
if np.linalg.norm(candidate_position - point) < safe_distance:
is_safe = False
break
if is_safe:
vehicle = other_vehicles_type.make_on_lane(
self.road,
lane_id,
longitudinal=longitudinal,
speed=14.0 + self.np_random.normal() * speed_deviation,
)
if self.config.get("incoming_vehicle_destination") is not None:
dest_idx = min(
self.config["incoming_vehicle_destination"],
len(destinations) - 1,
)
destination = destinations[dest_idx]
else:
destination = destinations[
self.np_random.integers(0, len(destinations))
]
vehicle.plan_route_to(destination)
vehicle.randomize_behavior()
self.road.vehicles.append(vehicle)
spawned_points.append(candidate_position)
break