Skip to content

Trajectory Processing#

This is the minimal guide for how trajectories flow through the system and how to wire a pipeline.


1) Input#

TrajectoryBuffer receives one Trajectory at a time.


2) Output shape#

After the pipeline runs, you get a list of items. Each item is typically a single training example (not a full batch). For language-model fine-tuning this commonly looks like:

[
  {"input_ids": ..., "labels": ...},
  {"input_ids": ..., "labels": ...},
  # ...
]

The buffer then pushes each item to the downstream queue/loader.


3) Define a pipeline#

You compose a pipeline by chaining transforms. Two common ways:

Option A — Chain with .then(...)#

from agoge.traj_transforms import TrajectoryPipeline, UnwrapChatMessages, TokenizeChats

pipeline = (
    TrajectoryPipeline.start()
    .then(UnwrapChatMessages())
    .then(TokenizeChats(processor=your_tokenizer))
)

items = pipeline.run(trajectory)

Option B — Build from a list (e.g., when transforms come from config)#

from agoge.traj_transforms import build_pipeline, UnwrapChatMessages, TokenizeChats

steps = [
    UnwrapChatMessages(),
    TokenizeChats(processor=your_tokenizer),
]

pipeline = pipeline.from_transforms(*steps)
items = pipeline.run(trajectory)

4) Configure via Hydra#

Compositional Config Approach#

The trajectory buffer uses a compositional config structure that allows you to mix and match transforms flexibly:

defaults:
  - traj_buffer: base
  - traj_buffer/transforms:
    - unwrap_chat_messages
    - tokenize_chats

This approach:

  • Decouples transforms — Each transform has its own config file in configs/traj_buffer/transforms/
  • Preserves order — Transforms are applied in the order listed
  • Enables easy overriding — Swap or reorder transforms via CLI without editing files

Built-in Transform Configs#

Available in configs/traj_buffer/transforms/:

  • unwrap_chat_messages.yaml — Extracts chat messages from trajectory timesteps
  • tokenize_chats.yaml — Tokenizes chats for training (requires ${processor})
  • passthrough_save.yaml — Saves trajectories to disk (configurable via ${traj_output_dir} and ${save_traj_to_file})
  • simple_evaluator.yaml — LLM-based trajectory evaluation (requires OpenAI API key)

Complete Example: RL Training#

# configs/rl.yaml
defaults:
  - traj_buffer: base
  - traj_buffer/transforms:
    - passthrough_save
    - unwrap_chat_messages
    - tokenize_chats

save_traj_to_file: true
traj_output_dir: ${paths.nfs}/trajectories/

Complete Example: Evaluation#

# configs/eval.yaml
defaults:
  - traj_buffer: base
  - traj_buffer/transforms:
    - simple_evaluator
    - passthrough_save

save_traj_to_file: true
traj_output_dir: ${paths.nfs}/trajectories/

CLI Overrides#

Reorder or customize transforms from the command line:

# Change transform order
uv run src/agoge/entrypoints/rl.py \
  'traj_buffer/transforms=[tokenize_chats,passthrough_save,unwrap_chat_messages]'

# Use only specific transforms
uv run src/agoge/entrypoints/eval.py \
  'traj_buffer/transforms=[simple_evaluator]'

# Disable trajectory saving
uv run src/agoge/entrypoints/eval.py \
  save_traj_to_file=false

NOTE For convenience (and because they are not easy to override in the main config), we define save_traj_to_file and traj_output_dir as a top level configuration, and link it to the PassthroughSave transform.

Hydra and TrajectoryBuffer will instantiate the pipeline, and the buffer will use it to take it from the input to the output queue.


5) Write your own transform#

A transform receives the current value and the original Trajectory, and returns the next value. Example: persist the original trajectory to disk and pass the input through untouched.

from typing import TypeVar
from pathlib import Path
import json, uuid
from agoge.schema import Trajectory
from agoge.traj_transforms import TrajectoryTransform

T = TypeVar("T")

class PassthroughSave(TrajectoryTransform[T, T]):
    def __init__(self, output_dir: str):
        self.output = Path(output_dir)
        self.output.mkdir(parents=True, exist_ok=True)

    def __call__(self, current: T, original: Trajectory) -> T:
        path = self.output / f"{uuid.uuid4()}.json"
        with path.open("w", encoding="utf-8") as f:
            json.dump(original.model_dump(), f)
        return current # We pass "current" unchanged so that this transform can be placed anywhere.

Use programmatically#

pipeline = (
    TrajectoryPipeline.start()
    .then(PassthroughSave("debug_trajs/"))
    .then(UnwrapChatMessages())
    .then(TokenizeChats(processor=your_tokenizer))
)

Integrate into config system#

Create a config file at configs/traj_buffer/transforms/passthrough_save.yaml:

passthrough_save:
  _target_: agoge.traj_transforms.PassthroughSave
  output_dir: ${paths.output_dir}/trajectories/

Then use it in your entrypoint config:

defaults:
  - traj_buffer: base
  - traj_buffer/transforms:
    - passthrough_save
    - unwrap_chat_messages

Or override from CLI:

uv run src/agoge/entrypoints/rl.py \
  'traj_buffer/transforms=[passthrough_save]' \
  'traj_buffer.transforms.passthrough_save.output_dir=/tmp/my_trajs/'

6) Recap#

  • Input: one Trajectory.
  • TrajectoryPipeline: series of typed transforms.
  • Output: list of training items (e.g., list of dictionary with {input_ids, labels}).
  • Buffer: iterates items and feeds the training loader.