Skip to content

Reference Trajectory

commonroad_control.control.reference_trajectory_factory

ReferenceTrajectoryFactory

Stores the reference trajectory (provided by a given planner) and provides the required snippet to the controller. For most controllers, this snippet consists of a single pair of state and control input at a given point in time. For model predictive control (MPC), the snippet spans the full MPC prediction horizon. If desired, a look-ahead horizon can be included; it will be handled automatically when querying the reference trajectory.

Source code in commonroad_control/control/reference_trajectory_factory.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class ReferenceTrajectoryFactory:
    """
    Stores the reference trajectory (provided by a given planner) and provides the required snippet to the controller.
    For most controllers, this snippet consists of a single pair of state and control input at a given point in time. For model predictive control (MPC), the snippet spans the full MPC prediction horizon. If desired, a look-ahead horizon can be included; it will be handled automatically when querying the reference trajectory.
    A reference trajectory must be provided via set_reference_trajectory before a snippet can be queried.
    """

    def __init__(
        self,
        delta_t_controller: float,
        sidt_factory: StateInputDisturbanceTrajectoryFactoryInterface,
        mpc_horizon: int = 0,
        t_look_ahead: float = 0.0,
    ):
        """
        Initialize class.
        :param delta_t_controller: sampling time of the controller - float
        :param sidt_factory: factory for creating Trajectory objects
        :param mpc_horizon: prediction horizon of a model predictive controller (number of time steps) - int
        :param t_look_ahead: look-ahead for non-MPCs (in seconds) - float
        """

        self._horizon: int = mpc_horizon
        self._dt_controller: float = delta_t_controller
        self._t_look_ahead: float = t_look_ahead
        self._sidt_factory: StateInputDisturbanceTrajectoryFactoryInterface = sidt_factory

        # no reference available until set_reference_trajectory has been called
        self._x_ref: Union[TrajectoryInterface, None] = None
        self._u_ref: Union[TrajectoryInterface, None] = None
        if mpc_horizon > 0:
            # MPC: horizon + 1 state samples (steps 0..horizon) and horizon input samples (steps 0..horizon-1)
            self._x_ref_steps: List[int] = list(range(self._horizon + 1))
            self._u_ref_steps: List[int] = list(range(self._horizon))
        else:
            # any other controller: a single state/input pair
            self._x_ref_steps = [0]
            self._u_ref_steps = [0]

    @property
    def state_trajectory(self) -> Union[TrajectoryInterface, None]:
        """
        :return: state trajectory (None as long as no reference trajectory has been set)
        """
        return self._x_ref

    def set_reference_trajectory(
        self, state_ref: TrajectoryInterface, input_ref: TrajectoryInterface, t_0: float
    ) -> None:
        """
        Function for updating the reference trajectory (e.g. given a new planned trajectory).
        The stored reference is only replaced if all consistency checks pass.
        :param state_ref: state reference trajectory - TrajectoryInterface
        :param input_ref: input reference trajectory - TrajectoryInterface
        :param t_0: initial time of the reference trajectories - float
        :raises TypeError: if the mode of state_ref or input_ref does not match its role
        :raises ValueError: if the initial times are inconsistent or the prediction/look-ahead horizon starting at
            t_0 exceeds the final time of a reference trajectory
        """

        # consistency checks
        if state_ref.mode is not TrajectoryMode.State:
            logger.error(f"Invalid mode of state reference trajectory: expected {TrajectoryMode.State}")
            raise TypeError(f"Invalid mode of state reference trajectory: expected {TrajectoryMode.State}")
        if input_ref.mode is not TrajectoryMode.Input:
            logger.error(f"Invalid mode of input reference trajectory: expected {TrajectoryMode.Input}")
            raise TypeError(f"Invalid mode of input reference trajectory: expected {TrajectoryMode.Input}")
        if abs(state_ref.t_0 - t_0) > 1e-12 or abs(input_ref.t_0 - t_0) > 1e-12:
            logger.error("Inconsistent initial time for state and/or reference input trajectory")
            raise ValueError("Inconsistent initial time for state and/or reference input trajectory")
        # the last queried sample (look-ahead plus largest step index) must not lie beyond the reference
        if (
            t_0 + self._t_look_ahead + max(self._x_ref_steps) * self._dt_controller > state_ref.t_final
            or t_0 + self._t_look_ahead + max(self._u_ref_steps) * self._dt_controller > input_ref.t_final
        ):
            logger.error("Prediction/look-ahead horizon exceeds the final time of the given reference trajectories")
            raise ValueError("Prediction/look-ahead horizon exceeds the final time of the given reference trajectories")

        self._x_ref = state_ref
        self._u_ref = input_ref

    def get_reference_trajectory_at_time(self, t: float) -> Tuple[TrajectoryInterface, TrajectoryInterface]:
        """
        Returns a snippet from the reference trajectory (single or multiple points, depending on the controller), which
        is passed to the controller for computing the control inputs.
        The snippet starts at t plus the look-ahead time and is sampled with the controller sampling time.
        :param t: current time - float
        :return: state and input reference trajectory - TrajectoryInterface
        :raises RuntimeError: if no reference trajectory has been set yet
        """

        # fail early with a clear message instead of an AttributeError on None
        if self._x_ref is None or self._u_ref is None:
            raise RuntimeError("No reference trajectory available: call set_reference_trajectory() first")

        # ... extract state reference trajectory
        t_0 = t + self._t_look_ahead
        tmp_x_ref_points = [
            self._x_ref.get_point_at_time(time=t_0 + kk * self._dt_controller, factory=self._sidt_factory)
            for kk in self._x_ref_steps
        ]
        x_ref = self._sidt_factory.trajectory_from_points(
            trajectory_dict=dict(zip(self._x_ref_steps, tmp_x_ref_points)),
            mode=TrajectoryMode.State,
            t_0=t_0,
            delta_t=self._dt_controller,
        )
        # ... extract input reference trajectory
        tmp_u_ref_points = [
            self._u_ref.get_point_at_time(time=t_0 + kk * self._dt_controller, factory=self._sidt_factory)
            for kk in self._u_ref_steps
        ]
        u_ref = self._sidt_factory.trajectory_from_points(
            trajectory_dict=dict(zip(self._u_ref_steps, tmp_u_ref_points)),
            mode=TrajectoryMode.Input,
            t_0=t_0,
            delta_t=self._dt_controller,
        )

        return x_ref, u_ref
state_trajectory property

Returns:

Type Description
Union[TrajectoryInterface, None]

state trajectory

__init__(delta_t_controller, sidt_factory, mpc_horizon=0, t_look_ahead=0.0)

Initialize class.

Parameters:

Name Type Description Default
delta_t_controller float

sampling time of the controller - float

required
sidt_factory StateInputDisturbanceTrajectoryFactoryInterface

factory for creating Trajectory objects

required
mpc_horizon int

prediction horizon of a model predictive controller (number of time steps) - int

0
t_look_ahead float

look-ahead for non-MPCs (in seconds) - float

0.0
Source code in commonroad_control/control/reference_trajectory_factory.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(
    self,
    delta_t_controller: float,
    sidt_factory: StateInputDisturbanceTrajectoryFactoryInterface,
    mpc_horizon: int = 0,
    t_look_ahead: float = 0.0,
):
    """
    Set up the controller timing parameters and the step indices of the reference snippet.
    :param delta_t_controller: sampling time of the controller - float
    :param sidt_factory: factory for creating Trajectory objects
    :param mpc_horizon: prediction horizon of a model predictive controller (number of time steps) - int
    :param t_look_ahead: look-ahead for non-MPCs (in seconds) - float
    """

    # reference trajectories stay unset until set_reference_trajectory stores them
    self._x_ref: Union[TrajectoryInterface, None] = None
    self._u_ref: Union[TrajectoryInterface, None] = None

    self._sidt_factory: StateInputDisturbanceTrajectoryFactoryInterface = sidt_factory
    self._dt_controller: float = delta_t_controller
    self._t_look_ahead: float = t_look_ahead
    self._horizon: int = mpc_horizon

    # MPC: steps 0..horizon for the states and 0..horizon-1 for the inputs;
    # otherwise a single state/input pair at step 0
    is_mpc = mpc_horizon > 0
    self._x_ref_steps: List[int] = list(range(mpc_horizon + 1)) if is_mpc else [0]
    self._u_ref_steps: List[int] = list(range(mpc_horizon)) if is_mpc else [0]
get_reference_trajectory_at_time(t)

Returns a snippet from the reference trajectory (single or multiple points, depending on the controller), which is passed to the controller for computing the control inputs.

Parameters:

Name Type Description Default
t float

current time - float

required

Returns:

Type Description
Tuple[TrajectoryInterface, TrajectoryInterface]

state and input reference trajectory - TrajectoryInterface

Source code in commonroad_control/control/reference_trajectory_factory.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def get_reference_trajectory_at_time(self, t: float) -> Tuple[TrajectoryInterface, TrajectoryInterface]:
    """
    Returns a snippet from the reference trajectory (single or multiple points, depending on the controller), which
    is passed to the controller for computing the control inputs.
    The snippet starts at t plus the look-ahead time and is sampled with the controller sampling time.
    :param t: current time - float
    :return: state and input reference trajectory - TrajectoryInterface
    :raises RuntimeError: if no reference trajectory has been set yet
    """

    # fail early with a clear message instead of an AttributeError on None
    if self._x_ref is None or self._u_ref is None:
        raise RuntimeError("No reference trajectory available: call set_reference_trajectory() first")

    # ... extract state reference trajectory
    t_0 = t + self._t_look_ahead
    tmp_x_ref_points = [
        self._x_ref.get_point_at_time(time=t_0 + kk * self._dt_controller, factory=self._sidt_factory)
        for kk in self._x_ref_steps
    ]
    x_ref = self._sidt_factory.trajectory_from_points(
        trajectory_dict=dict(zip(self._x_ref_steps, tmp_x_ref_points)),
        mode=TrajectoryMode.State,
        t_0=t_0,
        delta_t=self._dt_controller,
    )
    # ... extract input reference trajectory
    tmp_u_ref_points = [
        self._u_ref.get_point_at_time(time=t_0 + kk * self._dt_controller, factory=self._sidt_factory)
        for kk in self._u_ref_steps
    ]
    u_ref = self._sidt_factory.trajectory_from_points(
        trajectory_dict=dict(zip(self._u_ref_steps, tmp_u_ref_points)),
        mode=TrajectoryMode.Input,
        t_0=t_0,
        delta_t=self._dt_controller,
    )

    return x_ref, u_ref
set_reference_trajectory(state_ref, input_ref, t_0)

Function for updating the reference trajectory (e.g. given a new planned trajectory).

Parameters:

Name Type Description Default
state_ref TrajectoryInterface

state reference trajectory - TrajectoryInterface

required
input_ref TrajectoryInterface

input reference trajectory - TrajectoryInterface

required
t_0 float

initial time of the reference trajectories - float

required
Source code in commonroad_control/control/reference_trajectory_factory.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def set_reference_trajectory(
    self, state_ref: TrajectoryInterface, input_ref: TrajectoryInterface, t_0: float
) -> None:
    """
    Replace the stored reference trajectories, e.g. after a new trajectory has been planned.
    The new references are only stored if every consistency check passes.
    :param state_ref: state reference trajectory - TrajectoryInterface
    :param input_ref: input reference trajectory - TrajectoryInterface
    :param t_0: initial time of the reference trajectories - float
    :raises TypeError: if the mode of state_ref or input_ref does not match its role
    :raises ValueError: if the initial times are inconsistent or the prediction/look-ahead horizon starting at
        t_0 exceeds the final time of a reference trajectory
    """

    # each trajectory must carry the mode matching its role
    if state_ref.mode is not TrajectoryMode.State:
        msg = f"Invalid mode of state reference trajectory: expected {TrajectoryMode.State}"
        logger.error(msg)
        raise TypeError(msg)
    if input_ref.mode is not TrajectoryMode.Input:
        msg = f"Invalid mode of input reference trajectory: expected {TrajectoryMode.Input}"
        logger.error(msg)
        raise TypeError(msg)

    # both trajectories must start at t_0 (up to a small numerical tolerance)
    if any(abs(ref.t_0 - t_0) > 1e-12 for ref in (state_ref, input_ref)):
        msg = "Inconsistent initial time for state and/or reference input trajectory"
        logger.error(msg)
        raise ValueError(msg)

    # the last sample queried at t_0 (look-ahead plus largest step index) must lie within the references
    t_start = t_0 + self._t_look_ahead
    t_last_state = t_start + max(self._x_ref_steps) * self._dt_controller
    t_last_input = t_start + max(self._u_ref_steps) * self._dt_controller
    if t_last_state > state_ref.t_final or t_last_input > input_ref.t_final:
        msg = "Prediction/look-ahead horizon exceeds the final time of the given reference trajectories"
        logger.error(msg)
        raise ValueError(msg)

    self._x_ref = state_ref
    self._u_ref = input_ref