
Schema#

AREEvaluatedTrajectory #

Bases: EvaluatedTrajectory

Trajectory with ARE (Agents Research Environment) oracle matching evaluation.

This subclass extends EvaluatedTrajectory to include ARE-specific evaluation results from the oracle matching system. It follows the same pattern as LLMEvaluatedTrajectory, where environment-specific evaluation is stored in a dedicated subclass.

The AREEvaluatedTrajectory is an intermediate type that gets converted to LLMEvaluatedTrajectory by the ARERewardShaper transform.

Attributes:

Name Type Description
are_evaluation dict[str, Any]

Dictionary containing ARE's validation results:

- success (bool): Whether all oracle events were successfully matched
- rationale (str): Detailed explanation of the validation result
- judge_model (str): LLM model used for oracle matching
- oracle_matching_failures (list[dict]): Detailed failure information
- duration (float): Time taken for validation in seconds

Examples:

>>> are_traj = AREEvaluatedTrajectory(
...     timesteps=[ts1, ts2, ts3],
...     reset_kwargs={"task": task_dict},
...     are_evaluation={
...         "success": True,
...         "rationale": "All oracle events matched successfully",
...         "judge_model": "gpt-5-mini",
...         "oracle_matching_failures": [],
...         "duration": 45.2,
...     },
... )
Source code in src/agoge/schema/trajectories.py
class AREEvaluatedTrajectory(EvaluatedTrajectory):
    """Trajectory with ARE (Agents Research Environment) oracle matching evaluation.

    This subclass extends EvaluatedTrajectory to include ARE-specific evaluation
    results from the oracle matching system. It follows the same pattern as
    LLMEvaluatedTrajectory, where environment-specific evaluation is stored in a
    dedicated subclass.

    The AREEvaluatedTrajectory is an intermediate type that gets converted to
    LLMEvaluatedTrajectory by the ARERewardShaper transform.

    Attributes:
        are_evaluation: Dictionary containing ARE's validation results:
            - success (bool): Whether all oracle events were successfully matched
            - rationale (str): Detailed explanation of the validation result
            - judge_model (str): LLM model used for oracle matching
            - oracle_matching_failures (list[dict]): Detailed failure information
            - duration (float): Time taken for validation in seconds

    Examples:
        >>> are_traj = AREEvaluatedTrajectory(
        ...     timesteps=[ts1, ts2, ts3],
        ...     reset_kwargs={"task": task_dict},
        ...     are_evaluation={
        ...         "success": True,
        ...         "rationale": "All oracle events matched successfully",
        ...         "judge_model": "gpt-5-mini",
        ...         "oracle_matching_failures": [],
        ...         "duration": 45.2,
        ...     },
        ... )
    """

    are_evaluation: dict[str, Any] = Field(..., description="ARE oracle matching evaluation results")

    @property
    def evaluation(self) -> dict[str, Any]:
        """Return ARE evaluation data via the unified interface.

        Returns:
            The are_evaluation dictionary containing ARE's validation results.
        """
        return self.are_evaluation

evaluation property #

Return ARE evaluation data via the unified interface.

Returns:

Type Description
dict[str, Any]

The are_evaluation dictionary containing ARE's validation results.
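As a rough illustration of consuming the dictionary returned by `evaluation`, the sketch below condenses it into a one-line summary. It works on a plain dict with the keys documented for `are_evaluation`; the `summarize_are_evaluation` helper is hypothetical and not part of agoge.

```python
from typing import Any


def summarize_are_evaluation(evaluation: dict[str, Any]) -> str:
    """Build a one-line summary from an ARE evaluation dict (hypothetical helper)."""
    status = "PASS" if evaluation.get("success") else "FAIL"
    failures = evaluation.get("oracle_matching_failures") or []
    duration = evaluation.get("duration", 0.0)
    judge = evaluation.get("judge_model", "unknown")
    return f"{status} judge={judge} failures={len(failures)} duration={duration:.1f}s"


# Same shape as the are_evaluation example in the class docstring.
example_evaluation = {
    "success": True,
    "rationale": "All oracle events matched successfully",
    "judge_model": "gpt-5-mini",
    "oracle_matching_failures": [],
    "duration": 45.2,
}
summary = summarize_are_evaluation(example_evaluation)
print(summary)
```

Using `.get` with defaults keeps the helper tolerant of dictionaries that omit optional keys.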

Chat #

Bases: BaseModel

A contiguous LLM call composed of one or more messages.

A Chat represents a complete conversation exchange with an LLM, containing a sequence of messages (system, user, assistant, tool) that form a logical unit of interaction. This is the fundamental building block for tracking LLM conversations in the RL pipeline.

The Chat is immutable (frozen=True) to ensure data integrity during training and inference. When new messages are added, a new Chat instance is created.

Attributes:

Name Type Description
messages list[ChatMessage]

List of ChatMessage objects representing the conversation. Each message can be a SystemMessage, UserMessage, AssistantMessage, or ToolMessage with various content types (text, images, etc.).

logprobs list[float] | None

Optional list of log probabilities from the LLM response. Set to None when messages are modified to ensure consistency between messages and their associated probabilities.

Logprobs Size Requirement

If not None, logprobs should have the same length as the output of tokenizer.apply_chat_template(chat).
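A length check along these lines could be run before a Chat is used for training. This is only a sketch: `check_logprobs_length` is a hypothetical helper, and `token_ids` stands in for the token sequence obtained from `tokenizer.apply_chat_template(...)` for the same chat.

```python
from __future__ import annotations


def check_logprobs_length(logprobs: list[float] | None, token_ids: list[int]) -> bool:
    """Return True when logprobs is absent or has exactly one entry per token."""
    if logprobs is None:
        # Nothing to validate: logprobs were never set or were invalidated.
        return True
    return len(logprobs) == len(token_ids)


# Illustrative values only; real token ids come from the tokenizer.
token_ids = [101, 2023, 2003, 102]
aligned = check_logprobs_length([-0.1, -0.5, -0.2, -0.05], token_ids)
misaligned = check_logprobs_length([-0.1, -0.5], token_ids)
```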

Examples:

>>> from agoge.schema import Chat
>>> chat = Chat.model_validate({
...     "messages": [
...         {"role": "system", "content": "You are a helpful assistant."},
...         {
...             "role": "user",
...             "content": [
...                 {"type": "text", "text": "What's the weather like?"}
...             ],
...         },
...         {
...             "role": "assistant",
...             "content": [
...                 {"type": "text", "text": "It's sunny and 25°C."}
...             ],
...         },
...     ]
... })

>>> # Adding messages creates a new Chat instance
>>> new_chat = chat + Chat.model_validate({
...     "messages": [
...         {"role": "user", "content": [{"type": "text", "text": "Thanks!"}]},
...     ]
... })
>>> new_chat is chat  # False - new instance created
Source code in src/agoge/schema/trajectories.py
class Chat(BaseModel, frozen=True):
    """
    A contiguous LLM call composed of one or more messages.

    A Chat represents a complete conversation exchange with an LLM, containing
    a sequence of messages (system, user, assistant, tool) that form a logical
    unit of interaction. This is the fundamental building block for tracking
    LLM conversations in the RL pipeline.

    The Chat is immutable (frozen=True) to ensure data integrity during training
    and inference. When new messages are added, a new Chat instance is created.

    Attributes:
        messages: List of ChatMessage objects representing the conversation.
                 Each message can be a SystemMessage, UserMessage, AssistantMessage,
                 or ToolMessage with various content types (text, images, etc.).
        logprobs: Optional list of log probabilities from the LLM response.
                 Set to None when messages are modified to ensure
                 consistency between messages and their associated probabilities.

    !!! warning "Logprobs Size Requirement"
        If not None, `logprobs` should have the same length as
        the output of `tokenizer.apply_chat_template(chat)`.

    Examples:
        >>> from agoge.schema import Chat
        >>> chat = Chat.model_validate({
        ...     "messages": [
        ...         {"role": "system", "content": "You are a helpful assistant."},
        ...         {
        ...             "role": "user",
        ...             "content": [
        ...                 {"type": "text", "text": "What's the weather like?"}
        ...             ],
        ...         },
        ...         {
        ...             "role": "assistant",
        ...             "content": [
        ...                 {"type": "text", "text": "It's sunny and 25°C."}
        ...             ],
        ...         },
        ...     ]
        ... })

        >>> # Adding messages creates a new Chat instance
        >>> new_chat = chat + Chat.model_validate({
        ...     "messages": [
        ...         {"role": "user", "content": [{"type": "text", "text": "Thanks!"}]},
        ...     ]
        ... })
        >>> new_chat is chat  # False - new instance created
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    messages: list[ChatMessage]
    logprobs: list[float] | None = None
    top_tokens: list[list[str]] | None = None
    token_top_logprobs: list[list[float]] | None = None

    def to_dict(self, exclude: set[str] | None = None) -> list[dict]:
        exclude = exclude or set()
        return [message.model_dump(exclude=exclude) for message in self.messages]

    def apply_chat_template(self, tokenizer: AutoTokenizer, exclude: set[str] | None, **kwargs):
        """
        Apply the chat template to the messages.

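        Args:
            tokenizer: The tokenizer whose chat template is applied to the messages.
            exclude: Field names passed to model_dump(exclude=...) on the Chat before
                templating. None excludes nothing.
            **kwargs: Additional keyword arguments to pass to the tokenizer.apply_chat_template method.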
        """
        exclude = exclude or set()
        return tokenizer.apply_chat_template(
            self.model_dump(exclude=exclude)["messages"],
            **kwargs,
        )

    def extract_images(self, as_bytes=False):
        """Extract all images from the messages and return them as PIL images or bytes.

        Args:
            as_bytes: If True, return raw bytes instead of PIL images
        """

        def load_image_from_source(source):
            """Load PIL image or bytes from various source types."""
            try:
                if source.startswith("data:image"):
                    base64_data = source.split(",", 1)[1]
                    decoded_bytes = base64.b64decode(base64_data)
                    return decoded_bytes if as_bytes else Image.open(BytesIO(decoded_bytes))
                elif source.startswith(("http://", "https://")):
                    response = requests.get(source)
                    response.raise_for_status()
                    return response.content if as_bytes else Image.open(BytesIO(response.content))
                elif source.startswith("file://"):
                    with open(source[7:], "rb") as f:  # noqa: PTH123, doing string manipulation for volatile input
                        file_bytes = f.read()
                    return file_bytes if as_bytes else Image.open(BytesIO(file_bytes))
                else:
                    with open(source, "rb") as f:  # noqa: PTH123
                        file_bytes = f.read()
                    return file_bytes if as_bytes else Image.open(BytesIO(file_bytes))
            except Exception:
                logger.warning(f"Failed to load image from {source}", exc_info=True)
                return None

        images = []
        for message in self.messages:
            if not isinstance(message.content, list):
                continue

            for part in message.content:
                if part.type == "image":
                    if img := load_image_from_source(part.image):
                        images.append(img)
                elif part.type == "image_url":
                    if img := load_image_from_source(part.image_url["url"]):
                        images.append(img)
                elif part.type == "image_file":
                    logger.warning(f"ImageFilePart with file_id {part.image_file['file_id']} not yet supported")

        return images

    def history_view(
        self,
        limits: dict | None = None,
    ) -> Chat:
        """Context window management.

        Filters messages in reverse order, retaining a limited number of recent messages.
        """
        if limits is None:
            limits = {}
        label_counts = defaultdict(int)
        filtered_messages = []
        for message in reversed(self.messages):
            if message.label and message.label.value in limits:
                maximum_count = limits[message.label.value]
                if label_counts[message.label.value] < maximum_count:
                    filtered_messages.append(message)
                    label_counts[message.label.value] += 1
            else:
                filtered_messages.append(message)
        return Chat(messages=reversed(filtered_messages))

    # helper keeps the "invalidate logprobs" rule in one place
    def _new(self, msgs: list[ChatMessage]) -> Self:
        """
        Create a new Chat instance with updated messages.

        This helper method ensures that logprobs are invalidated whenever
        messages are modified, maintaining consistency between the conversation
        and its associated probability distributions.

        Args:
            msgs: New list of ChatMessage objects for the conversation.

        Returns:
            A new Chat instance with the updated messages and logprobs set to None.
        """
        return self.model_copy(update={"messages": msgs, "logprobs": None})

    # ------------------------------------------------------------------
    # Chat + X
    # ------------------------------------------------------------------
    def __add__(self, other: Chat | ChatMessage) -> Self:
        """
        Add a ChatMessage or concatenate another Chat to this Chat.

        This method enables intuitive concatenation of conversations:
        - Adding a single message: chat + message
        - Combining two chats: chat1 + chat2

        Args:
            other: Either a ChatMessage to append, or another Chat to concatenate.

        Returns:
            A new Chat instance with the combined messages.

        Examples:
            >>> chat1 = Chat(messages=[msg1, msg2])
            >>> chat2 = Chat(messages=[msg3])
            >>> combined = chat1 + chat2  # Concatenate two chats
        """
        if isinstance(other, _BaseMsg):
            return self._new([*self.messages, other])

        if isinstance(other, Chat):
            return self._new(self.messages + other.messages)

        return NotImplemented

    # reflected ChatMessage + Chat
    def __radd__(self, other: ChatMessage) -> Self:
        """
        Support for reflected addition (ChatMessage + Chat).

        Allows adding a ChatMessage to the beginning of a Chat:
        message + chat

        Args:
            other: A ChatMessage to prepend to this Chat.

        Returns:
            A new Chat instance with the message prepended.

        Examples:
            >>> msg = ChatMessage(role="system", content="You are helpful")
            >>> chat = Chat(messages=[user_msg, assistant_msg])
            >>> new_chat = msg + chat  # System message prepended
        """
        if isinstance(other, _BaseMsg):
            return self._new([other, *self.messages])
        return NotImplemented

    # in-place form (still returns a new frozen instance)
    __iadd__ = __add__

__add__(other) #

Add a ChatMessage or concatenate another Chat to this Chat.

This method enables intuitive concatenation of conversations:

- Adding a single message: chat + message
- Combining two chats: chat1 + chat2

Parameters:

Name Type Description Default
other Chat | ChatMessage

Either a ChatMessage to append, or another Chat to concatenate.

required

Returns:

Type Description
Self

A new Chat instance with the combined messages.

Examples:

>>> chat1 = Chat(messages=[msg1, msg2])
>>> chat2 = Chat(messages=[msg3])
>>> combined = chat1 + chat2  # Concatenate two chats
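The behaviour described here (a new instance on every addition, with logprobs reset) can be sketched with a frozen dataclass. `SketchChat` is a stand-in written for this illustration, with plain strings in place of ChatMessage objects; it is not the agoge implementation.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SketchChat:
    """Stand-in for Chat: immutable, with messages and optional logprobs."""

    messages: tuple[str, ...]
    logprobs: tuple[float, ...] | None = None

    def __add__(self, other: SketchChat | str) -> SketchChat:
        if isinstance(other, str):
            # Appending a single message invalidates the stored logprobs.
            return replace(self, messages=(*self.messages, other), logprobs=None)
        if isinstance(other, SketchChat):
            # Concatenating two chats also invalidates the stored logprobs.
            return replace(self, messages=self.messages + other.messages, logprobs=None)
        return NotImplemented


first = SketchChat(messages=("hello",), logprobs=(-0.1,))
second = SketchChat(messages=("hi there",))
combined = first + second
appended = first + "thanks"
```

The original `first` keeps its logprobs because addition never mutates the left operand.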
Source code in src/agoge/schema/trajectories.py
def __add__(self, other: Chat | ChatMessage) -> Self:
    """
    Add a ChatMessage or concatenate another Chat to this Chat.

    This method enables intuitive concatenation of conversations:
    - Adding a single message: chat + message
    - Combining two chats: chat1 + chat2

    Args:
        other: Either a ChatMessage to append, or another Chat to concatenate.

    Returns:
        A new Chat instance with the combined messages.

    Examples:
        >>> chat1 = Chat(messages=[msg1, msg2])
        >>> chat2 = Chat(messages=[msg3])
        >>> combined = chat1 + chat2  # Concatenate two chats
    """
    if isinstance(other, _BaseMsg):
        return self._new([*self.messages, other])

    if isinstance(other, Chat):
        return self._new(self.messages + other.messages)

    return NotImplemented

__radd__(other) #

Support for reflected addition (ChatMessage + Chat).

Allows adding a ChatMessage to the beginning of a Chat: message + chat

Parameters:

Name Type Description Default
other ChatMessage

A ChatMessage to prepend to this Chat.

required

Returns:

Type Description
Self

A new Chat instance with the message prepended.

Examples:

>>> msg = ChatMessage(role="system", content="You are helpful")
>>> chat = Chat(messages=[user_msg, assistant_msg])
>>> new_chat = msg + chat  # System message prepended
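Python only calls `__radd__` after the left operand has failed to handle `+`. The sketch below shows that dispatch with stand-in classes (`SketchMessage` and `SketchChat` are hypothetical names used only for this illustration).

```python
from __future__ import annotations


class SketchMessage:
    """Stand-in for a ChatMessage; it defines no __add__ of its own."""

    def __init__(self, text: str) -> None:
        self.text = text


class SketchChat:
    """Stand-in for Chat that supports only reflected addition."""

    def __init__(self, messages: list[SketchMessage]) -> None:
        self.messages = messages

    def __radd__(self, other: object) -> SketchChat:
        # Reached only after the left operand could not handle `+`.
        if isinstance(other, SketchMessage):
            return SketchChat([other, *self.messages])
        return NotImplemented


system = SketchMessage("You are helpful")
chat = SketchChat([SketchMessage("Hi")])
prepended = system + chat  # dispatches to chat.__radd__(system)
texts = [message.text for message in prepended.messages]
```

Returning `NotImplemented` for unsupported operands lets Python raise the usual `TypeError`, for example on `0 + chat`.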
Source code in src/agoge/schema/trajectories.py
def __radd__(self, other: ChatMessage) -> Self:
    """
    Support for reflected addition (ChatMessage + Chat).

    Allows adding a ChatMessage to the beginning of a Chat:
    message + chat

    Args:
        other: A ChatMessage to prepend to this Chat.

    Returns:
        A new Chat instance with the message prepended.

    Examples:
        >>> msg = ChatMessage(role="system", content="You are helpful")
        >>> chat = Chat(messages=[user_msg, assistant_msg])
        >>> new_chat = msg + chat  # System message prepended
    """
    if isinstance(other, _BaseMsg):
        return self._new([other, *self.messages])
    return NotImplemented

apply_chat_template(tokenizer, exclude, **kwargs) #

Apply the chat template to the messages.

Parameters:

Name Type Description Default
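tokenizer AutoTokenizer

The tokenizer whose chat template is applied to the messages.

required
exclude set[str] | None

Field names passed to model_dump(exclude=...) on the Chat before templating. None excludes nothing.

required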
**kwargs

Additional keyword arguments to pass to the tokenizer.apply_chat_template method.

{}
Source code in src/agoge/schema/trajectories.py
def apply_chat_template(self, tokenizer: AutoTokenizer, exclude: set[str] | None, **kwargs):
    """
    Apply the chat template to the messages.

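    Args:
        tokenizer: The tokenizer whose chat template is applied to the messages.
        exclude: Field names passed to model_dump(exclude=...) on the Chat before
            templating. None excludes nothing.
        **kwargs: Additional keyword arguments to pass to the tokenizer.apply_chat_template method.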
    """
    exclude = exclude or set()
    return tokenizer.apply_chat_template(
        self.model_dump(exclude=exclude)["messages"],
        **kwargs,
    )

extract_images(as_bytes=False) #

Extract all images from the messages and return them as PIL images or bytes.

Parameters:

Name Type Description Default
as_bytes

If True, return raw bytes instead of PIL images

False
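For `data:image` sources, the method keeps everything after the first comma and base64-decodes it. A minimal standard-library sketch of that step follows; `decode_data_uri` is a hypothetical helper, and the PNG signature bytes are only a stand-in payload, not a complete image.

```python
import base64


def decode_data_uri(source: str) -> bytes:
    """Decode the base64 payload of a data:image URI into raw bytes."""
    if not source.startswith("data:image"):
        raise ValueError("not a data:image URI")
    # Everything after the first comma is the base64 payload.
    payload = source.split(",", 1)[1]
    return base64.b64decode(payload)


raw = b"\x89PNG\r\n\x1a\n"  # PNG signature, used here as a stand-in payload
uri = "data:image/png;base64," + base64.b64encode(raw).decode("ascii")
decoded = decode_data_uri(uri)
```

This corresponds to the `as_bytes=True` path; the PIL path would additionally wrap the bytes in `BytesIO` and open them as an image.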
Source code in src/agoge/schema/trajectories.py
def extract_images(self, as_bytes=False):
    """Extract all images from the messages and return them as PIL images or bytes.

    Args:
        as_bytes: If True, return raw bytes instead of PIL images
    """

    def load_image_from_source(source):
        """Load PIL image or bytes from various source types."""
        try:
            if source.startswith("data:image"):
                base64_data = source.split(",", 1)[1]
                decoded_bytes = base64.b64decode(base64_data)
                return decoded_bytes if as_bytes else Image.open(BytesIO(decoded_bytes))
            elif source.startswith(("http://", "https://")):
                response = requests.get(source)
                response.raise_for_status()
                return response.content if as_bytes else Image.open(BytesIO(response.content))
            elif source.startswith("file://"):
                with open(source[7:], "rb") as f:  # noqa: PTH123, doing string manipulation for volatile input
                    file_bytes = f.read()
                return file_bytes if as_bytes else Image.open(BytesIO(file_bytes))
            else:
                with open(source, "rb") as f:  # noqa: PTH123
                    file_bytes = f.read()
                return file_bytes if as_bytes else Image.open(BytesIO(file_bytes))
        except Exception:
            logger.warning(f"Failed to load image from {source}", exc_info=True)
            return None

    images = []
    for message in self.messages:
        if not isinstance(message.content, list):
            continue

        for part in message.content:
            if part.type == "image":
                if img := load_image_from_source(part.image):
                    images.append(img)
            elif part.type == "image_url":
                if img := load_image_from_source(part.image_url["url"]):
                    images.append(img)
            elif part.type == "image_file":
                logger.warning(f"ImageFilePart with file_id {part.image_file['file_id']} not yet supported")

    return images

history_view(limits=None) #

Context window management.

Filters messages in reverse order, retaining a limited number of recent messages.
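The filtering rule can be sketched on plain `(label, text)` tuples: walk the messages newest first, keep at most `limits[label]` messages for each limited label, keep everything else, then restore the original order. The label names `"observation"` and `"action"` are made up for this example, and string labels stand in for the `message.label.value` lookups in the real method.

```python
from collections import defaultdict


def history_view_sketch(messages, limits=None):
    """Keep only the most recent `limits[label]` messages per limited label."""
    limits = limits or {}
    counts = defaultdict(int)
    kept = []
    for label, text in reversed(messages):  # newest first
        if label is not None and label in limits:
            if counts[label] < limits[label]:
                kept.append((label, text))
                counts[label] += 1
        else:
            # Unlabeled messages and labels without a limit are always kept.
            kept.append((label, text))
    kept.reverse()  # restore chronological order
    return kept


messages = [
    (None, "system prompt"),
    ("observation", "obs 1"),
    ("action", "act 1"),
    ("observation", "obs 2"),
    ("action", "act 2"),
    ("observation", "obs 3"),
]
view = history_view_sketch(messages, limits={"observation": 2})
```

With a limit of two observations, the oldest observation is dropped while the system prompt and all actions remain.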

Source code in src/agoge/schema/trajectories.py
def history_view(
    self,
    limits: dict | None = None,
) -> Chat:
    """Context window management.

    Filters messages in reverse order, retaining a limited number of recent messages.
    """
    if limits is None:
        limits = {}
    label_counts = defaultdict(int)
    filtered_messages = []
    for message in reversed(self.messages):
        if message.label and message.label.value in limits:
            maximum_count = limits[message.label.value]
            if label_counts[message.label.value] < maximum_count:
                filtered_messages.append(message)
                label_counts[message.label.value] += 1
        else:
            filtered_messages.append(message)
    return Chat(messages=reversed(filtered_messages))

EvaluatedTrajectory #

Bases: Trajectory

Base class for trajectories with evaluation data.

This abstract base class provides a unified interface for accessing evaluation results across different evaluation types (ARE, LLM judge, etc.). Subclasses must implement the evaluation property to expose their evaluation data in a standardized format.

The base class enables polymorphic handling of evaluated trajectories without requiring isinstance checks for each specific evaluation type.

Examples:

>>> # Works with any EvaluatedTrajectory subclass
>>> if isinstance(traj, EvaluatedTrajectory):
...     eval_data = traj.evaluation  # Unified interface
Source code in src/agoge/schema/trajectories.py
class EvaluatedTrajectory(Trajectory):
    """Base class for trajectories with evaluation data.

    This abstract base class provides a unified interface for accessing evaluation
    results across different evaluation types (ARE, LLM judge, etc.). Subclasses
    must implement the `evaluation` property to expose their evaluation data in
    a standardized format.

    The base class enables polymorphic handling of evaluated trajectories without
    requiring isinstance checks for each specific evaluation type.

    Examples:
        >>> # Works with any EvaluatedTrajectory subclass
        >>> if isinstance(traj, EvaluatedTrajectory):
        ...     eval_data = traj.evaluation  # Unified interface
    """

    @property
    @abstractmethod
    def evaluation(self) -> dict[str, Any]:
        """Return evaluation data in a standardized format.

        Returns:
            Dictionary containing evaluation results. Structure may vary by subclass
            but should contain at minimum a success indicator and rationale.
        """
        pass

evaluation abstractmethod property #

Return evaluation data in a standardized format.

Returns:

Type Description
dict[str, Any]

Dictionary containing evaluation results. Structure may vary by subclass but should contain at minimum a success indicator and rationale.

LLMEvaluatedTrajectory #

Bases: EvaluatedTrajectory

Source code in src/agoge/schema/trajectories.py
class LLMEvaluatedTrajectory(EvaluatedTrajectory):
    judge_response: dict[str, float | str | dict] | None = None
    timesteps: list[LLMEvaluatedTimeStep]

    @property
    def evaluation(self) -> dict[str, Any]:
        """Return LLM judge evaluation data via the unified interface.

        Returns:
            The judge_response dictionary containing LLM evaluation results,
            or an empty dict if judge_response is None.
        """
        return self.judge_response or {}

    @classmethod
    def from_trajectory(
        cls,
        trajectory: Trajectory,
        *,
        timesteps: list[LLMEvaluatedTimeStep],
        judge_response: dict[str, float | str | dict] | None = None,
    ) -> LLMEvaluatedTrajectory:
        return cls(
            timesteps=timesteps,
            reset_kwargs=trajectory.reset_kwargs,
            judge_response=judge_response,
        )

evaluation property #

Return LLM judge evaluation data via the unified interface.

Returns:

Type Description
dict[str, Any]

The judge_response dictionary containing LLM evaluation results, or an empty dict if judge_response is None.

Task #

Bases: BaseModel

Represents a task to be executed by an agent in an episode.

A Task provides the input prompt/instructions, evaluation criteria, and metadata that guide the agent's behavior and enable assessment of its performance.

Attributes:

Name Type Description
task_id str

Unique identifier for the task

inputs dict

A dictionary with the task prompt, instructions or image (for offline dataset)

eval_criteria dict

Dictionary containing evaluation criteria and expected outcomes

metadata dict

Additional task metadata (difficulty, category, source, etc.)

Examples:

>>> task = Task(
...     task_id="math_001",
...     inputs={"instruction": "Calculate: What is 15% of 240?"},
...     eval_criteria={"correct_answer": 36, "tolerance": 0.01},
...     metadata={"category": "arithmetic", "difficulty": "medium"},
... )
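How `eval_criteria` is interpreted is left to the environment or reward model. As one possible reading of the example above, a grader could compare a numeric answer against `correct_answer` within `tolerance`. The `within_tolerance` helper is hypothetical.

```python
def within_tolerance(answer: float, eval_criteria: dict) -> bool:
    """Check a numeric answer against eval_criteria (hypothetical grader)."""
    expected = eval_criteria["correct_answer"]
    tolerance = eval_criteria.get("tolerance", 0.0)
    return abs(answer - expected) <= tolerance


# Same eval_criteria as the Task example: 15% of 240 is 36.
eval_criteria = {"correct_answer": 36, "tolerance": 0.01}
agent_answer = 0.15 * 240
is_correct = within_tolerance(agent_answer, eval_criteria)
```

The tolerance absorbs floating-point rounding in the agent's arithmetic.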
Source code in src/agoge/schema/task.py
class Task(BaseModel):
    """
    Represents a task to be executed by an agent in an episode.

    A Task provides the input prompt/instructions, evaluation criteria,
    and metadata that guide the agent's behavior and enable assessment
    of its performance.

    Attributes:
        task_id: Unique identifier for the task
        inputs: A dictionary with the task prompt, instructions or image (for offline dataset)
        eval_criteria: Dictionary containing evaluation criteria and expected outcomes
        metadata: Additional task metadata (difficulty, category, source, etc.)

    Examples:
        >>> task = Task(
        ...     task_id="math_001",
        ...     inputs={"instruction": "Calculate: What is 15% of 240?"},
        ...     eval_criteria={"correct_answer": 36, "tolerance": 0.01},
        ...     metadata={"category": "arithmetic", "difficulty": "medium"},
        ... )
    """

    task_id: str = Field(..., description="Unique identifier for the task")
    inputs: dict = Field(default_factory=dict, description="Task prompt, instructions or images (for offline dataset)")
    eval_criteria: dict = Field(default_factory=dict, description="Evaluation criteria and expected outcomes")
    metadata: dict = Field(default_factory=dict, description="Additional task metadata")

TimeStep #

Bases: BaseModel

Represents a single transition point in the RL environment, corresponding to one step of agent-environment interaction.

Each TimeStep contains all LLM exchanges (Chats) that occurred during this step, the immediate reward assigned after those exchanges, and a flag indicating whether this is the terminal step of the episode.

Attributes:

Name Type Description
chats Annotated[list[Chat], Field(min_length=1, description='All LLM exchanges that occurred during this step')]

List of Chat objects representing all LLM interactions in this step. There must be at least one Chat per step. Multiple Chats may be present if, for example, the agent queries the LLM multiple times for clarification, summaries, or actions within a single environment step.

reward float | None

The immediate reward assigned after the chats, as determined by the environment or reward model. May be None if reward is not yet assigned.

done bool

Boolean flag indicating whether this is the final step in the episode (i.e., the environment is in a terminal state).

available_tool_schemas list[dict] | None

List of tool schemas that were available during this step

advantage float | None

Advantage of the timestep

Examples:

>>> timestep = TimeStep(
...     chats=[chat1, chat2],  # Multiple LLM exchanges
...     reward=1.0,            # Immediate reward
...     mc_return=1.0,         # MC return
...     done=False,            # Episode continues
...     available_tool_schemas=[tool_schema1, tool_schema2],
...     advantage=None,        # Advantage of the timestep
... )
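The schema describes `mc_return` only as the MC return of the trajectory and does not say how it is computed. One common reading is the discounted return-to-go at each step, which the sketch below assumes. The `monte_carlo_returns` helper and the `gamma` parameter are hypothetical, and unset rewards (None) are treated as zero.

```python
from __future__ import annotations


def monte_carlo_returns(rewards: list[float | None], gamma: float = 1.0) -> list[float]:
    """Discounted return-to-go for each step, computed backwards from the last step."""
    returns = [0.0] * len(rewards)
    running = 0.0
    for index in range(len(rewards) - 1, -1, -1):
        reward = rewards[index] or 0.0  # treat None as no reward
        running = reward + gamma * running
        returns[index] = running
    return returns


# Three steps with a single terminal reward of 1.0.
returns = monte_carlo_returns([0.0, None, 1.0], gamma=0.5)
```

Walking backwards lets each step reuse the return already computed for its successor.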

Source code in src/agoge/schema/trajectories.py
class TimeStep(BaseModel, frozen=True):
    """
    Represents a single transition point in the RL environment,
    corresponding to one step of agent-environment interaction.

    Each TimeStep contains all LLM exchanges (Chats) that occurred during this step,
    the immediate reward assigned after those exchanges, and a flag indicating whether
    this is the terminal step of the episode.

    Attributes:
        chats: List of Chat objects representing all LLM interactions in this step.
               There must be at least one Chat per step.
               Multiple Chats may be present if, for example, the agent queries the LLM multiple times
               for clarification, summaries, or actions within a single environment step.
        reward: The immediate reward assigned after the chats, as determined by the environment or reward model.
                May be None if reward is not yet assigned.
        done: Boolean flag indicating whether this is the final step in the
              episode (i.e., the environment is in a terminal state).
        available_tool_schemas: List of tool schemas that were available during this step
        advantage: Advantage of the timestep

    Examples:
        >>> timestep = TimeStep(
        ...     chats=[chat1, chat2],  # Multiple LLM exchanges
        ...     reward=1.0,            # Immediate reward
        ...     mc_return=1.0,         # MC return
        ...     done=False,            # Episode continues
        ...     available_tool_schemas=[tool_schema1, tool_schema2],
        ...     advantage=None,        # Advantage of the timestep
        ... )
    """

    chats: Annotated[
        list[Chat],
        Field(min_length=1, description="All LLM exchanges that occurred during this step"),
    ]
    reward: float | None = Field(None, description="Immediate reward assigned after the chats")
    mc_return: float | None = Field(None, description="MC return of the trajectory")
    done: bool = Field(False, description="Marks terminal step in an episode")
    available_tool_schemas: list[dict] | None = Field(
        None, description="List of tool schemas that were available during this step"
    )
    advantage: float | None = Field(None, description="Advantage of the timestep")

Trajectory #

Bases: BaseModel

Represents a complete episode in the RL environment, consisting of an ordered sequence of TimeSteps.

A Trajectory is the primary data structure passed between the runner, inference manager, and trainer in the RL pipeline. It contains all the information needed to train or evaluate an agent's behavior over a full episode, including all LLM interactions, rewards, and episode boundaries.

Attributes:

Name Type Description
timesteps Annotated[list[TimeStep], Field(description='Ordered timesteps from t=0 to T')]

List of TimeStep objects, ordered from t=0 to T (the end of the episode). There must be at least one TimeStep per Trajectory.

reset_kwargs dict

Dictionary containing the reset parameters used to initialize this episode.

error_info dict[str, str] | None

Optional dictionary containing error details if the episode terminated due to an error.

schema_version int

Integer indicating the schema version for compatibility and migration purposes.

episode_id str | None

Unique identifier of the episode

Examples:

>>> trajectory = Trajectory(
...     timesteps=[timestep1, timestep2, timestep3],
...     reset_kwargs={"seed": 42, "difficulty": "easy"},
... )
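A consumer of a trajectory often needs a per-episode summary, such as total reward and whether the last step is terminal. The sketch below shows one way to compute that over stand-in steps; `SketchStep` and `episode_summary` are hypothetical and carry only the `reward` and `done` fields of TimeStep.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SketchStep:
    """Stand-in for TimeStep with only the fields needed here."""

    reward: float | None = None
    done: bool = False


def episode_summary(steps: list[SketchStep]) -> dict:
    """Summarise an ordered list of steps (unset rewards count as zero)."""
    total = sum(step.reward or 0.0 for step in steps)
    finished = bool(steps) and steps[-1].done
    return {"num_steps": len(steps), "total_reward": total, "finished": finished}


episode = [SketchStep(0.0), SketchStep(None), SketchStep(1.0, done=True)]
episode_stats = episode_summary(episode)
```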
Source code in src/agoge/schema/trajectories.py
class Trajectory(BaseModel, frozen=True):
    """
    Represents a complete episode in the RL environment, consisting of an ordered sequence of TimeSteps.

    A Trajectory is the primary data structure passed between the runner, inference manager,
    and trainer in the RL pipeline. It contains all the information needed to train or evaluate an
    agent's behavior over a full episode, including all LLM interactions, rewards, and episode boundaries.

    Attributes:
        timesteps: List of TimeStep objects, ordered from t=0 to T (the end of the episode).
                   There must be at least one `TimeStep` per `Trajectory`.
        reset_kwargs: Dictionary containing the reset parameters used to initialize this episode.
        error_info: Optional dictionary containing error details if the episode terminated due to an error.
        schema_version: Integer indicating the schema version for compatibility and migration purposes.
        episode_id: Unique identifier of the episode

    Examples:
        >>> trajectory = Trajectory(
        ...     timesteps=[timestep1, timestep2, timestep3],
        ...     reset_kwargs={"seed": 42, "difficulty": "easy"},
        ... )
    """

    timesteps: Annotated[
        list[TimeStep],
        Field(description="Ordered timesteps from t=0 to T"),
    ]
    reset_kwargs: dict = Field(default_factory=dict, description="Reset parameters used to initialize this episode")
    error_info: dict[str, str] | None = Field(
        None, description="Error details if episode terminated due to an error (error_type, message, traceback)"
    )
    episode_id: str | None = Field(None, description="Unique identifier of the episode")
    schema_version: int = 1

    # Observability: last response flow ID from agent for flow termination at traj_out
    trace_last_response_flow_id: str | None = Field(
        None, description="Last response flow ID from vllm.inference for tracing", exclude=True
    )