Source code for highway_env.vehicle.kinematics

from __future__ import annotations

import copy
from collections import deque

import numpy as np

from highway_env.road.road import Road
from highway_env.utils import Vector
from highway_env.vehicle.objects import RoadObject


[docs] class Vehicle(RoadObject): """ A moving vehicle on a road, and its kinematics. The vehicle is represented by a dynamical system: a modified bicycle model. It's state is propagated depending on its steering and acceleration actions. """ LENGTH = 5.0 """ Vehicle length [m] """ WIDTH = 2.0 """ Vehicle width [m] """ DEFAULT_INITIAL_SPEEDS = [23, 25] """ Range for random initial speeds [m/s] """ MAX_SPEED = 40.0 """ Maximum reachable speed [m/s] """ MIN_SPEED = -40.0 """ Minimum reachable speed [m/s] """ HISTORY_SIZE = 30 """ Length of the vehicle state history, for trajectory display""" def __init__( self, road: Road, position: Vector, heading: float = 0, speed: float = 0, predition_type: str = "constant_steering", ): super().__init__(road, position, heading, speed) self.prediction_type = predition_type self.action = {"steering": 0, "acceleration": 0} self.crashed = False self.impact = None self.log = [] self.history = deque(maxlen=self.HISTORY_SIZE)
[docs] @classmethod def create_random( cls, road: Road, speed: float = None, lane_from: str | None = None, lane_to: str | None = None, lane_id: int | None = None, spacing: float = 1, ) -> Vehicle: """ Create a random vehicle on the road. The lane and /or speed are chosen randomly, while longitudinal position is chosen behind the last vehicle in the road with density based on the number of lanes. :param road: the road where the vehicle is driving :param speed: initial speed in [m/s]. If None, will be chosen randomly :param lane_from: start node of the lane to spawn in :param lane_to: end node of the lane to spawn in :param lane_id: id of the lane to spawn in :param spacing: ratio of spacing to the front vehicle, 1 being the default :return: A vehicle with random position and/or speed """ _from = lane_from or road.np_random.choice(list(road.network.graph.keys())) _to = lane_to or road.np_random.choice(list(road.network.graph[_from].keys())) _id = ( lane_id if lane_id is not None else road.np_random.choice(len(road.network.graph[_from][_to])) ) lane = road.network.get_lane((_from, _to, _id)) if speed is None: if lane.speed_limit is not None: speed = road.np_random.uniform( 0.7 * lane.speed_limit, 0.8 * lane.speed_limit ) else: speed = road.np_random.uniform( Vehicle.DEFAULT_INITIAL_SPEEDS[0], Vehicle.DEFAULT_INITIAL_SPEEDS[1] ) default_spacing = 12 + 1.0 * speed offset = ( spacing * default_spacing * np.exp(-5 / 40 * len(road.network.graph[_from][_to])) ) x0 = ( np.max([lane.local_coordinates(v.position)[0] for v in road.vehicles]) if len(road.vehicles) else 3 * offset ) x0 += offset * road.np_random.uniform(0.9, 1.1) v = cls(road, lane.position(x0, 0), lane.heading_at(x0), speed) return v
[docs] @classmethod def create_from(cls, vehicle: Vehicle) -> Vehicle: """ Create a new vehicle from an existing one. Only the vehicle dynamics are copied, other properties are default. :param vehicle: a vehicle :return: a new vehicle at the same dynamical state """ v = cls(vehicle.road, vehicle.position, vehicle.heading, vehicle.speed) if hasattr(vehicle, "color"): v.color = vehicle.color return v
[docs] def act(self, action: dict | str = None) -> None: """ Store an action to be repeated. :param action: the input action """ if action: self.action = action
[docs] def step(self, dt: float) -> None: """ Propagate the vehicle state given its actions. Integrate a modified bicycle model with a 1st-order response on the steering wheel dynamics. If the vehicle is crashed, the actions are overridden with erratic steering and braking until complete stop. The vehicle's current lane is updated. :param dt: timestep of integration of the model [s] """ self.clip_actions() delta_f = self.action["steering"] beta = np.arctan(1 / 2 * np.tan(delta_f)) v = self.speed * np.array( [np.cos(self.heading + beta), np.sin(self.heading + beta)] ) self.position += v * dt if self.impact is not None: self.position += self.impact self.crashed = True self.impact = None self.heading += self.speed * np.sin(beta) / (self.LENGTH / 2) * dt self.speed += self.action["acceleration"] * dt self.on_state_update()
def clip_actions(self) -> None: if self.crashed: self.action["steering"] = 0 self.action["acceleration"] = -1.0 * self.speed self.action["steering"] = float(self.action["steering"]) self.action["acceleration"] = float(self.action["acceleration"]) if self.speed > self.MAX_SPEED: self.action["acceleration"] = min( self.action["acceleration"], 1.0 * (self.MAX_SPEED - self.speed) ) elif self.speed < self.MIN_SPEED: self.action["acceleration"] = max( self.action["acceleration"], 1.0 * (self.MIN_SPEED - self.speed) ) def on_state_update(self) -> None: if self.road: self.lane_index = self.road.network.get_closest_lane_index( self.position, self.heading ) self.lane = self.road.network.get_lane(self.lane_index) if self.road.record_history: self.history.appendleft(self.create_from(self)) def predict_trajectory_constant_speed( self, times: np.ndarray ) -> tuple[list[np.ndarray], list[float]]: if self.prediction_type == "zero_steering": action = {"acceleration": 0.0, "steering": 0.0} elif self.prediction_type == "constant_steering": action = {"acceleration": 0.0, "steering": self.action["steering"]} else: raise ValueError("Unknown predition type") dt = np.diff(np.concatenate(([0.0], times))) positions = [] headings = [] v = copy.deepcopy(self) v.act(action) for t in dt: v.step(t) positions.append(v.position.copy()) headings.append(v.heading) return (positions, headings) @property def velocity(self) -> np.ndarray: return self.speed * self.direction # TODO: slip angle beta should be used here @property def destination(self) -> np.ndarray: if getattr(self, "route", None): last_lane_index = self.route[-1] last_lane_index = ( last_lane_index if last_lane_index[-1] is not None else (*last_lane_index[:-1], 0) ) last_lane = self.road.network.get_lane(last_lane_index) return last_lane.position(last_lane.length, 0) else: return self.position @property def destination_direction(self) -> np.ndarray: if (self.destination != self.position).any(): return (self.destination - self.position) / np.linalg.norm( self.destination - self.position ) else: return np.zeros((2,)) @property def lane_offset(self) -> np.ndarray: if self.lane is not None: long, lat = self.lane.local_coordinates(self.position) ang = self.lane.local_angle(self.heading, long) return np.array([long, lat, ang]) else: return np.zeros((3,)) def to_dict( self, origin_vehicle: Vehicle = None, observe_intentions: bool = True ) -> dict: d = { "presence": 1, "x": self.position[0], "y": self.position[1], "vx": self.velocity[0], "vy": self.velocity[1], "heading": self.heading, "cos_h": self.direction[0], "sin_h": self.direction[1], "cos_d": self.destination_direction[0], "sin_d": self.destination_direction[1], "long_off": self.lane_offset[0], "lat_off": self.lane_offset[1], "ang_off": self.lane_offset[2], } if not observe_intentions: d["cos_d"] = d["sin_d"] = 0 if origin_vehicle: origin_dict = origin_vehicle.to_dict() for key in ["x", "y", "vx", "vy"]: d[key] -= origin_dict[key] return d def __str__(self): return "{} #{}: {}".format( self.__class__.__name__, id(self) % 1000, self.position ) def __repr__(self): return self.__str__()
[docs] def predict_trajectory( self, actions: list, action_duration: float, trajectory_timestep: float, dt: float, ) -> list[Vehicle]: """ Predict the future trajectory of the vehicle given a sequence of actions. :param actions: a sequence of future actions. :param action_duration: the duration of each action. :param trajectory_timestep: the duration between each save of the vehicle state. :param dt: the timestep of the simulation :return: the sequence of future states """ states = [] v = copy.deepcopy(self) t = 0 for action in actions: v.act(action) # Low-level control action for _ in range(int(action_duration / dt)): t += 1 v.step(dt) if (t % int(trajectory_timestep / dt)) == 0: states.append(copy.deepcopy(v)) return states