# Trajectory Processing
This is a minimal guide to how trajectories flow through the system and how to wire up a pipeline.
## 1) Input

The `TrajectoryBuffer` receives one `Trajectory` at a time.
## 2) Output shape
After the pipeline runs, you get a list of items. Each item is typically a single training example (not a full batch). For language-model fine-tuning this commonly looks like:
```python
[
    {"input_ids": ..., "labels": ...},
    {"input_ids": ..., "labels": ...},
    # ...
]
```
The buffer then pushes each item to the downstream queue/loader.
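The push step can be sketched generically. This is an illustration only, not the agoge API: `push_items` is a hypothetical helper, and Python's `queue.Queue` stands in for whatever downstream queue/loader the buffer actually feeds.

```python
from queue import Queue

def push_items(items: list[dict], out_queue: Queue) -> int:
    """Push each pipeline output item to the downstream queue, one at a time."""
    for item in items:
        out_queue.put(item)
    return len(items)

# Example: push a single training item.
q: Queue = Queue()
n = push_items([{"input_ids": [1, 2], "labels": [1, 2]}], q)
```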
## 3) Define a pipeline
You compose a pipeline by chaining transforms. Two common ways:
### Option A — Chain with `.then(...)`
```python
from agoge.traj_transforms import TrajectoryPipeline, UnwrapChatMessages, TokenizeChats

pipeline = (
    TrajectoryPipeline.start()
    .then(UnwrapChatMessages())
    .then(TokenizeChats(processor=your_tokenizer))
)
items = pipeline.run(trajectory)
```
### Option B — Build from a list (e.g., when transforms come from config)
```python
from agoge.traj_transforms import build_pipeline, UnwrapChatMessages, TokenizeChats

steps = [
    UnwrapChatMessages(),
    TokenizeChats(processor=your_tokenizer),
]
pipeline = build_pipeline(*steps)
items = pipeline.run(trajectory)
```
## 4) Configure via Hydra
### Compositional Config Approach
The trajectory buffer uses a compositional config structure that allows you to mix and match transforms flexibly:
```yaml
defaults:
  - traj_buffer: base
  - traj_buffer/transforms:
      - unwrap_chat_messages
      - tokenize_chats
```
This approach:

- Decouples transforms — each transform has its own config file in `configs/traj_buffer/transforms/`
- Preserves order — transforms are applied in the order listed
- Enables easy overriding — swap or reorder transforms via the CLI without editing files
### Built-in Transform Configs

Available in `configs/traj_buffer/transforms/`:

- `unwrap_chat_messages.yaml` — Extracts chat messages from trajectory timesteps
- `tokenize_chats.yaml` — Tokenizes chats for training (requires `${processor}`)
- `passthrough_save.yaml` — Saves trajectories to disk (configurable via `${traj_output_dir}` and `${save_traj_to_file}`)
- `simple_evaluator.yaml` — LLM-based trajectory evaluation (requires an OpenAI API key)
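The contents of these files are not shown here, but mirroring the `_target_` pattern used for `passthrough_save.yaml` in section 5, a `tokenize_chats.yaml` might look roughly like this (a sketch; the `_target_` path and field layout are assumptions, with only the `${processor}` interpolation documented above):

```yaml
tokenize_chats:
  _target_: agoge.traj_transforms.TokenizeChats
  processor: ${processor}
```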
### Complete Example: RL Training
```yaml
# configs/rl.yaml
defaults:
  - traj_buffer: base
  - traj_buffer/transforms:
      - passthrough_save
      - unwrap_chat_messages
      - tokenize_chats

save_traj_to_file: true
traj_output_dir: ${paths.nfs}/trajectories/
```
### Complete Example: Evaluation
```yaml
# configs/eval.yaml
defaults:
  - traj_buffer: base
  - traj_buffer/transforms:
      - simple_evaluator
      - passthrough_save

save_traj_to_file: true
traj_output_dir: ${paths.nfs}/trajectories/
```
### CLI Overrides

Reorder or customize transforms from the command line:
```shell
# Change transform order
uv run src/agoge/entrypoints/rl.py \
  'traj_buffer/transforms=[tokenize_chats,passthrough_save,unwrap_chat_messages]'

# Use only specific transforms
uv run src/agoge/entrypoints/eval.py \
  'traj_buffer/transforms=[simple_evaluator]'

# Disable trajectory saving
uv run src/agoge/entrypoints/eval.py \
  save_traj_to_file=false
```
NOTE: For convenience (and because they are not easy to override in the main config), we define `save_traj_to_file` and `traj_output_dir` as top-level configuration keys and link them to the `PassthroughSave` transform.
Hydra instantiates the pipeline, and the `TrajectoryBuffer` uses it to move each trajectory from the input queue to the output queue.
## 5) Write your own transform

A transform receives the current value and the original `Trajectory`, and returns the next value. Example: persist the original trajectory to disk and pass the input through untouched.
```python
import json
import uuid
from pathlib import Path
from typing import TypeVar

from agoge.schema import Trajectory
from agoge.traj_transforms import TrajectoryTransform

T = TypeVar("T")

class PassthroughSave(TrajectoryTransform[T, T]):
    def __init__(self, output_dir: str):
        self.output = Path(output_dir)
        self.output.mkdir(parents=True, exist_ok=True)

    def __call__(self, current: T, original: Trajectory) -> T:
        path = self.output / f"{uuid.uuid4()}.json"
        with path.open("w", encoding="utf-8") as f:
            json.dump(original.model_dump(), f)
        # Pass `current` through unchanged so this transform can be placed anywhere.
        return current
```
### Use programmatically
```python
pipeline = (
    TrajectoryPipeline.start()
    .then(PassthroughSave("debug_trajs/"))
    .then(UnwrapChatMessages())
    .then(TokenizeChats(processor=your_tokenizer))
)
```
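You can also exercise the save logic in isolation. To keep the snippet runnable without agoge installed, the transform body from above is repeated here without the `TrajectoryTransform` base class, and a stub object with a `model_dump()` method (an assumption, mimicking the pydantic-style API used above) stands in for a real `Trajectory`.

```python
import json
import tempfile
import uuid
from pathlib import Path

class PassthroughSave:
    def __init__(self, output_dir: str):
        self.output = Path(output_dir)
        self.output.mkdir(parents=True, exist_ok=True)

    def __call__(self, current, original):
        path = self.output / f"{uuid.uuid4()}.json"
        with path.open("w", encoding="utf-8") as f:
            json.dump(original.model_dump(), f)
        return current  # input passes through unchanged

class StubTrajectory:
    def model_dump(self) -> dict:
        return {"steps": [{"role": "user", "content": "hi"}]}

with tempfile.TemporaryDirectory() as tmp:
    transform = PassthroughSave(tmp)
    out = transform("anything", StubTrajectory())
    saved = list(Path(tmp).glob("*.json"))
    data = json.loads(saved[0].read_text(encoding="utf-8"))
```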
### Integrate into config system

Create a config file at `configs/traj_buffer/transforms/passthrough_save.yaml`:
```yaml
passthrough_save:
  _target_: agoge.traj_transforms.PassthroughSave
  output_dir: ${paths.output_dir}/trajectories/
```
Then use it in your entrypoint config:
```yaml
defaults:
  - traj_buffer: base
  - traj_buffer/transforms:
      - passthrough_save
      - unwrap_chat_messages
```
Or override from CLI:
```shell
uv run src/agoge/entrypoints/rl.py \
  'traj_buffer/transforms=[passthrough_save]' \
  'traj_buffer.transforms.passthrough_save.output_dir=/tmp/my_trajs/'
```
## 6) Recap

- Input: one `Trajectory`.
- `TrajectoryPipeline`: a series of typed transforms.
- Output: a list of training items (e.g., a list of dictionaries with `{input_ids, labels}`).
- Buffer: iterates over the items and feeds the training loader.