from __future__ import annotations
from abc import ABCMeta, abstractmethod
import numpy as np
from highway_env import utils
from highway_env.road.spline import LinearSpline2D
from highway_env.utils import Vector, class_from_path, get_class_path, wrap_to_pi
[docs]
class AbstractLane:
"""A lane on the road, described by its central curve."""
metaclass__ = ABCMeta
DEFAULT_WIDTH: float = 4
VEHICLE_LENGTH: float = 5
length: float = 0
line_types: list[LineType]
[docs]
@abstractmethod
def position(self, longitudinal: float, lateral: float) -> np.ndarray:
"""
Convert local lane coordinates to a world position.
:param longitudinal: longitudinal lane coordinate [m]
:param lateral: lateral lane coordinate [m]
:return: the corresponding world position [m]
"""
raise NotImplementedError()
[docs]
@abstractmethod
def local_coordinates(self, position: np.ndarray) -> tuple[float, float]:
"""
Convert a world position to local lane coordinates.
:param position: a world position [m]
:return: the (longitudinal, lateral) lane coordinates [m]
"""
raise NotImplementedError()
[docs]
@abstractmethod
def heading_at(self, longitudinal: float) -> float:
"""
Get the lane heading at a given longitudinal lane coordinate.
:param longitudinal: longitudinal lane coordinate [m]
:return: the lane heading [rad]
"""
raise NotImplementedError()
[docs]
@abstractmethod
def width_at(self, longitudinal: float) -> float:
"""
Get the lane width at a given longitudinal lane coordinate.
:param longitudinal: longitudinal lane coordinate [m]
:return: the lane width [m]
"""
raise NotImplementedError()
[docs]
@classmethod
def from_config(cls, config: dict):
"""
Create lane instance from config
:param config: json dict with lane parameters
"""
raise NotImplementedError()
[docs]
@abstractmethod
def to_config(self) -> dict:
"""
Write lane parameters to dict which can be serialized to json
:return: dict of lane parameters
"""
raise NotImplementedError()
[docs]
def on_lane(
self,
position: np.ndarray,
longitudinal: float = None,
lateral: float = None,
margin: float = 0,
) -> bool:
"""
Whether a given world position is on the lane.
:param position: a world position [m]
:param longitudinal: (optional) the corresponding longitudinal lane coordinate, if known [m]
:param lateral: (optional) the corresponding lateral lane coordinate, if known [m]
:param margin: (optional) a supplementary margin around the lane width
:return: is the position on the lane?
"""
if longitudinal is None or lateral is None:
longitudinal, lateral = self.local_coordinates(position)
is_on = (
np.abs(lateral) <= self.width_at(longitudinal) / 2 + margin
and -self.VEHICLE_LENGTH <= longitudinal < self.length + self.VEHICLE_LENGTH
)
return is_on
[docs]
def is_reachable_from(self, position: np.ndarray) -> bool:
"""
Whether the lane is reachable from a given world position
:param position: the world position [m]
:return: is the lane reachable?
"""
if self.forbidden:
return False
longitudinal, lateral = self.local_coordinates(position)
is_close = (
np.abs(lateral) <= 2 * self.width_at(longitudinal)
and 0 <= longitudinal < self.length + self.VEHICLE_LENGTH
)
return is_close
def after_end(
self, position: np.ndarray, longitudinal: float = None, lateral: float = None
) -> bool:
if not longitudinal:
longitudinal, _ = self.local_coordinates(position)
return longitudinal > self.length - self.VEHICLE_LENGTH / 2
[docs]
def distance(self, position: np.ndarray):
"""Compute the L1 distance [m] from a position to the lane."""
s, r = self.local_coordinates(position)
return abs(r) + max(s - self.length, 0) + max(0 - s, 0)
[docs]
def distance_with_heading(
self,
position: np.ndarray,
heading: float | None,
heading_weight: float = 1.0,
):
"""Compute a weighted distance in position and heading to the lane."""
if heading is None:
return self.distance(position)
s, r = self.local_coordinates(position)
angle = np.abs(self.local_angle(heading, s))
return abs(r) + max(s - self.length, 0) + max(0 - s, 0) + heading_weight * angle
[docs]
def local_angle(self, heading: float, long_offset: float):
"""Compute non-normalised angle of heading to the lane."""
return wrap_to_pi(heading - self.heading_at(long_offset))
[docs]
class LineType:
"""A lane side line type."""
NONE = 0
STRIPED = 1
CONTINUOUS = 2
CONTINUOUS_LINE = 3
[docs]
class StraightLane(AbstractLane):
"""A lane going in straight line."""
def __init__(
self,
start: Vector,
end: Vector,
width: float = AbstractLane.DEFAULT_WIDTH,
line_types: tuple[LineType, LineType] = None,
forbidden: bool = False,
speed_limit: float = 20,
priority: int = 0,
) -> None:
"""
New straight lane.
:param start: the lane starting position [m]
:param end: the lane ending position [m]
:param width: the lane width [m]
:param line_types: the type of lines on both sides of the lane
:param forbidden: is changing to this lane forbidden
:param priority: priority level of the lane, for determining who has right of way
"""
self.start = np.array(start)
self.end = np.array(end)
self.width = width
self.heading = np.arctan2(
self.end[1] - self.start[1], self.end[0] - self.start[0]
)
self.length = np.linalg.norm(self.end - self.start)
self.line_types = line_types or [LineType.STRIPED, LineType.STRIPED]
self.direction = (self.end - self.start) / self.length
self.direction_lateral = np.array([-self.direction[1], self.direction[0]])
self.forbidden = forbidden
self.priority = priority
self.speed_limit = speed_limit
[docs]
def position(self, longitudinal: float, lateral: float) -> np.ndarray:
return (
self.start
+ longitudinal * self.direction
+ lateral * self.direction_lateral
)
[docs]
def heading_at(self, longitudinal: float) -> float:
return self.heading
[docs]
def width_at(self, longitudinal: float) -> float:
return self.width
[docs]
def local_coordinates(self, position: np.ndarray) -> tuple[float, float]:
delta = position - self.start
longitudinal = np.dot(delta, self.direction)
lateral = np.dot(delta, self.direction_lateral)
return float(longitudinal), float(lateral)
[docs]
@classmethod
def from_config(cls, config: dict):
config["start"] = np.array(config["start"])
config["end"] = np.array(config["end"])
return cls(**config)
[docs]
def to_config(self) -> dict:
return {
"class_path": get_class_path(self.__class__),
"config": {
"start": _to_serializable(self.start),
"end": _to_serializable(self.end),
"width": self.width,
"line_types": self.line_types,
"forbidden": self.forbidden,
"speed_limit": self.speed_limit,
"priority": self.priority,
},
}
[docs]
class SineLane(StraightLane):
"""A sinusoidal lane."""
def __init__(
self,
start: Vector,
end: Vector,
amplitude: float,
pulsation: float,
phase: float,
width: float = StraightLane.DEFAULT_WIDTH,
line_types: list[LineType] = None,
forbidden: bool = False,
speed_limit: float = 20,
priority: int = 0,
) -> None:
"""
New sinusoidal lane.
:param start: the lane starting position [m]
:param end: the lane ending position [m]
:param amplitude: the lane oscillation amplitude [m]
:param pulsation: the lane pulsation [rad/m]
:param phase: the lane initial phase [rad]
"""
super().__init__(
start, end, width, line_types, forbidden, speed_limit, priority
)
self.amplitude = amplitude
self.pulsation = pulsation
self.phase = phase
[docs]
def position(self, longitudinal: float, lateral: float) -> np.ndarray:
return super().position(
longitudinal,
lateral
+ self.amplitude * np.sin(self.pulsation * longitudinal + self.phase),
)
[docs]
def heading_at(self, longitudinal: float) -> float:
return super().heading_at(longitudinal) + np.arctan(
self.amplitude
* self.pulsation
* np.cos(self.pulsation * longitudinal + self.phase)
)
[docs]
def local_coordinates(self, position: np.ndarray) -> tuple[float, float]:
longitudinal, lateral = super().local_coordinates(position)
return longitudinal, lateral - self.amplitude * np.sin(
self.pulsation * longitudinal + self.phase
)
[docs]
@classmethod
def from_config(cls, config: dict):
config["start"] = np.array(config["start"])
config["end"] = np.array(config["end"])
return cls(**config)
[docs]
def to_config(self) -> dict:
config = super().to_config()
config.update(
{
"class_path": get_class_path(self.__class__),
}
)
config["config"].update(
{
"amplitude": self.amplitude,
"pulsation": self.pulsation,
"phase": self.phase,
}
)
return config
[docs]
class CircularLane(AbstractLane):
"""A lane going in circle arc."""
def __init__(
self,
center: Vector,
radius: float,
start_phase: float,
end_phase: float,
clockwise: bool = True,
width: float = AbstractLane.DEFAULT_WIDTH,
line_types: list[LineType] = None,
forbidden: bool = False,
speed_limit: float = 20,
priority: int = 0,
) -> None:
super().__init__()
self.center = np.array(center)
self.radius = radius
self.start_phase = start_phase
self.end_phase = end_phase
self.clockwise = clockwise
self.direction = 1 if clockwise else -1
self.width = width
self.line_types = line_types or [LineType.STRIPED, LineType.STRIPED]
self.forbidden = forbidden
self.length = radius * (end_phase - start_phase) * self.direction
self.priority = priority
self.speed_limit = speed_limit
[docs]
def position(self, longitudinal: float, lateral: float) -> np.ndarray:
phi = self.direction * longitudinal / self.radius + self.start_phase
return self.center + (self.radius - lateral * self.direction) * np.array(
[np.cos(phi), np.sin(phi)]
)
[docs]
def heading_at(self, longitudinal: float) -> float:
phi = self.direction * longitudinal / self.radius + self.start_phase
psi = phi + np.pi / 2 * self.direction
return psi
[docs]
def width_at(self, longitudinal: float) -> float:
return self.width
[docs]
def local_coordinates(self, position: np.ndarray) -> tuple[float, float]:
delta = position - self.center
phi = np.arctan2(delta[1], delta[0])
phi = self.start_phase + utils.wrap_to_pi(phi - self.start_phase)
r = np.linalg.norm(delta)
longitudinal = self.direction * (phi - self.start_phase) * self.radius
lateral = self.direction * (self.radius - r)
return longitudinal, lateral
[docs]
@classmethod
def from_config(cls, config: dict):
config["center"] = np.array(config["center"])
return cls(**config)
[docs]
def to_config(self) -> dict:
return {
"class_path": get_class_path(self.__class__),
"config": {
"center": _to_serializable(self.center),
"radius": self.radius,
"start_phase": self.start_phase,
"end_phase": self.end_phase,
"clockwise": self.clockwise,
"width": self.width,
"line_types": self.line_types,
"forbidden": self.forbidden,
"speed_limit": self.speed_limit,
"priority": self.priority,
},
}
[docs]
class PolyLaneFixedWidth(AbstractLane):
"""
A fixed-width lane defined by a set of points and approximated with a 2D Hermite polynomial.
"""
def __init__(
self,
lane_points: list[tuple[float, float]],
width: float = AbstractLane.DEFAULT_WIDTH,
line_types: tuple[LineType, LineType] = None,
forbidden: bool = False,
speed_limit: float = 20,
priority: int = 0,
) -> None:
self.curve = LinearSpline2D(lane_points)
self.length = self.curve.length
self.width = width
self.line_types = line_types
self.forbidden = forbidden
self.speed_limit = speed_limit
self.priority = priority
[docs]
def position(self, longitudinal: float, lateral: float) -> np.ndarray:
x, y = self.curve(longitudinal)
yaw = self.heading_at(longitudinal)
return np.array([x - np.sin(yaw) * lateral, y + np.cos(yaw) * lateral])
[docs]
def local_coordinates(self, position: np.ndarray) -> tuple[float, float]:
lon, lat = self.curve.cartesian_to_frenet(position)
return lon, lat
[docs]
def heading_at(self, longitudinal: float) -> float:
dx, dy = self.curve.get_dx_dy(longitudinal)
return np.arctan2(dy, dx)
[docs]
def width_at(self, longitudinal: float) -> float:
return self.width
[docs]
@classmethod
def from_config(cls, config: dict):
return cls(**config)
[docs]
def to_config(self) -> dict:
return {
"class_name": self.__class__.__name__,
"config": {
"lane_points": _to_serializable(
[_to_serializable(p.position) for p in self.curve.poses]
),
"width": self.width,
"line_types": self.line_types,
"forbidden": self.forbidden,
"speed_limit": self.speed_limit,
"priority": self.priority,
},
}
[docs]
class PolyLane(PolyLaneFixedWidth):
"""
A lane defined by a set of points and approximated with a 2D Hermite polynomial.
"""
def __init__(
self,
lane_points: list[tuple[float, float]],
left_boundary_points: list[tuple[float, float]],
right_boundary_points: list[tuple[float, float]],
line_types: tuple[LineType, LineType] = None,
forbidden: bool = False,
speed_limit: float = 20,
priority: int = 0,
):
super().__init__(
lane_points=lane_points,
line_types=line_types,
forbidden=forbidden,
speed_limit=speed_limit,
priority=priority,
)
self.right_boundary = LinearSpline2D(right_boundary_points)
self.left_boundary = LinearSpline2D(left_boundary_points)
self._init_width()
[docs]
def width_at(self, longitudinal: float) -> float:
if longitudinal < 0:
return self.width_samples[0]
elif longitudinal > len(self.width_samples) - 1:
return self.width_samples[-1]
else:
return self.width_samples[int(longitudinal)]
def _width_at_s(self, longitudinal: float) -> float:
"""
Calculate width by taking the minimum distance between centerline and each boundary at a given s-value. This compensates indentations in boundary lines.
"""
center_x, center_y = self.position(longitudinal, 0)
right_x, right_y = self.right_boundary(
self.right_boundary.cartesian_to_frenet([center_x, center_y])[0]
)
left_x, left_y = self.left_boundary(
self.left_boundary.cartesian_to_frenet([center_x, center_y])[0]
)
dist_to_center_right = np.linalg.norm(
np.array([right_x, right_y]) - np.array([center_x, center_y])
)
dist_to_center_left = np.linalg.norm(
np.array([left_x, left_y]) - np.array([center_x, center_y])
)
return max(
min(dist_to_center_right, dist_to_center_left) * 2,
AbstractLane.DEFAULT_WIDTH,
)
def _init_width(self):
"""
Pre-calculate sampled width values in about 1m distance to reduce computation during runtime. It is assumed that the width does not change significantly within 1-2m.
Using numpys linspace ensures that min and max s-values are contained in the samples.
"""
s_samples = np.linspace(
0,
self.curve.length,
num=int(np.ceil(self.curve.length)) + 1,
)
self.width_samples = [self._width_at_s(s) for s in s_samples]
[docs]
def to_config(self) -> dict:
config = super().to_config()
ordered_boundary_points = _to_serializable(
[_to_serializable(p.position) for p in reversed(self.left_boundary.poses)]
)
ordered_boundary_points += _to_serializable(
[_to_serializable(p.position) for p in self.right_boundary.poses]
)
config["class_name"] = self.__class__.__name__
config["config"]["ordered_boundary_points"] = ordered_boundary_points
del config["config"]["width"]
return config
def _to_serializable(arg: np.ndarray | list) -> list:
if isinstance(arg, np.ndarray):
return arg.tolist()
return arg
def lane_from_config(cfg: dict) -> AbstractLane:
return class_from_path(cfg["class_path"])(**cfg["config"])