Testing Agoge#
(5 min) Please read the general testing scope, guidance and structure sections; read the rest as you need it.
See nomenclature for definitions.
This is a research-oriented project, but it has a good number of engineering pieces. We expect it to grow beyond what a single person can regression test by hand (manual, iterative, additive work that has to be repeated over and over to ensure the system works). Writing tests adds overhead, but unless it doubles your effort you should not question it: it is worth it in the long run.
Personal motivation: testing is a way to preserve sanity and peace of mind. Think of it as part of your wellbeing program: you are given time to increase your test coverage. Testing is a skill that needs practice. If something seems untestable and it is not non-deterministic, ask a colleague for a different perspective.
Scope#
The scope of this testing suite is to test Agoge: its deterministic parts and its interactions with the wider world. It is not meant to be a benchmark or evaluation. Those are isolated efforts whose parts can still be tested, e.g. that a driver clicks a UI element, that encoding/collation works, or that integration with a 3rd-party library/package/framework works (Chromium, DeepSpeed, PyTorch, Triton, etc.). Benchmarks of Agoge (say SFT), checks that some custom layer runs in x/y memory/time complexity, and LLM evaluations are out of scope.
Guidance#
When writing a piece of software you should be thinking, in order of priority:
- Do I expect this to work?
- Write a test for it. Assume anything untested is either broken or will break.
- Untested code may be acceptable for temporary state and local development, but should not be present in main, where it is relied upon by others.
- How do I test this?
- Great question to ask at implementation/planning time. Hard-to-write tests are often caused by the code style chosen, so think about a structure that is easier to test. It tends to increase global complexity (say, more individual pieces), but more importantly it decreases local complexity, which reduces the contextual load for extension/maintenance.
- How do I phrase the test into 3 sections, setup/act/assert? (see structure)
- How long do I expect this to live for (what is the lifetime)?
- Determines the scope of the test.
- For example, if it is a new feature it is probably not worth writing an integration test first. You should focus on the core functionality and write a unit test to test in isolation as much as possible.
- Can I break this down into multiple tests?
- Test one thing at a time; independent tests can be parallelized. Dependent tests should use mocks or stubs (see nomenclature).
- Use parametrized tests where possible; they make it easy to define a test matrix.
- Many tests are expected, but redundancy and the scope of change need to be minimized.
- Use test fixtures to share parametrization (scopes: test suite, test collection, or just a test decorator)
- What do I call the test to recognize intent and be able to conveniently execute it?
- Give semantic names: within a collection (class), the test name should describe in short form what the test does.
- It is a good idea to give the test a canonical name as it helps with test filtering.
- For example: test_fwd_chunk_numeric_one_vs_multiple_chunks, test_fwd_chunk_numeric_single_chunks
- What is the execution time?
- GPU tests will fundamentally have a longer overhead than fast CPU tests, or CPU equivalents.
- You should support your development with fast tests where possible, write a CPU equivalent if feasible, or at least limit the scope of the GPU test.
- Use pytest markers to explicitly differentiate between fast and slow tests, e.g. @pytest.mark.slow, @pytest.mark.gpu, @pytest.mark.e2e (subject to change).
- Am I testing an implementation detail or the interface?
- Writing a test means we are locking in an API. Tests increase the effort it takes to change something. Whenever possible test the abstraction/API and not the implementation detail.
- This is intentionally set to be the last point as this is the most context-dependent. In test driven development (TDD) you would first write your intention and make sure the behavior is encoded in a verifiably failing test (red), then you implement the code that makes the test pass (green), then you make it pretty/fast (refactor).
- Sometimes I want to test the detail as a way of developing a complex feature (TDD), but I have to expect that I will have to rewrite the tests as soon as some detail changes - throwaway tests are fine situationally.
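The points above on parametrization, semantic naming and the setup/act/assert layout can be sketched in one small example. This is only an illustration: `chunked_sum` and `TestChunkedSum` are hypothetical names invented here, not part of Agoge.

```python
import pytest


def chunked_sum(values, chunk_size):
    """Hypothetical function under test: sum `values` chunk by chunk."""
    total = 0.0
    for start in range(0, len(values), chunk_size):
        total += sum(values[start:start + chunk_size])
    return total


class TestChunkedSum:
    """The collection names the unit; each test name states the behaviour."""

    # Stacked parametrize decorators form a test matrix (4 x 3 = 12 cases).
    @pytest.mark.parametrize("chunk_size", [1, 2, 3, 8])
    @pytest.mark.parametrize("values", [[], [1.0], [1.0, 2.0, 3.0, 4.0]])
    def test_fwd_chunk_numeric_one_vs_multiple_chunks(self, values, chunk_size):
        # setup
        single_chunk_size = max(len(values), 1)
        # act
        single = chunked_sum(values, single_chunk_size)
        chunked = chunked_sum(values, chunk_size)
        # assert
        assert chunked == pytest.approx(single), (
            f"chunk_size={chunk_size} gave {chunked}, expected {single}"
        )
```

With a canonical name like this, `pytest -k fwd_chunk_numeric` selects the whole family of cases at once.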
Structure#
This is mandatory for any new tests. A test should contain 3 distinct sections:
- # setup (always denote it; content is optional, as none might be necessary or fixtures provide it on the input, so be explicit): sets the context, variables, and everything necessary for the execution
- # act: acts on the context and executes the body of the test, producing the values to test
- # assert: executes the actual testing conditions and makes the test fail with informative errors/messages

def my_test():
    # setup
    ...
    # act
    ...
    # assert
    ...
Directory Organization#
tests/
├── unit/ # Fast, isolated tests (~seconds)
├── integration/ # Component/3rd-party interaction tests (~minutes)
├── data/ # Testing/mock data, subsets etc.
└── conftest.py # Pytest configuration, fixtures, utils
Test Directories and Environment Variables#
When running tests, the test harness creates temporary directories for each test session and test run. By default, these directories are removed after test completion, but you can preserve them for debugging by setting the KEEP_TEST_DIRS environment variable:
KEEP_TEST_DIRS=1 pytest tests/integration/test_rl.py
Test directories follow this structure:
- A session directory is created at $TMPDIR/pt<timestamp>
- Each test gets a subdirectory named with a short hash of the test name
- Log files (including agoge.jsonl) are stored in these directories
This is particularly useful when running with pytest-xdist, as stdin is not always available. With proper fixtures, each worker process creates its own session directory. The logs accumulate in separate directories, making it easier to debug failed tests/runs.
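The directory layout described above could be produced along these lines. This is a minimal sketch using only the standard library, not the actual harness code: the helper names, the SHA-1 hash, the 8-character hash length and the integer timestamp are all assumptions made for illustration.

```python
import hashlib
import os
import shutil
import tempfile
import time
from pathlib import Path


def make_session_dir(base=None):
    """Create a session directory named pt<timestamp> (gettempdir honours $TMPDIR)."""
    base = Path(base or tempfile.gettempdir())
    session_dir = base / f"pt{int(time.time())}"
    session_dir.mkdir(parents=True, exist_ok=True)
    return session_dir


def make_test_dir(session_dir, test_name):
    """Create a per-test subdirectory named with a short hash of the test name."""
    short_hash = hashlib.sha1(test_name.encode()).hexdigest()[:8]
    test_dir = Path(session_dir) / short_hash
    test_dir.mkdir(parents=True, exist_ok=True)
    return test_dir


def cleanup_session_dir(session_dir):
    """Remove the session directory unless KEEP_TEST_DIRS is set; return True if removed."""
    if os.environ.get("KEEP_TEST_DIRS"):
        return False
    shutil.rmtree(session_dir, ignore_errors=True)
    return True
```

Hashing the test name keeps directory names short and stable across runs, so the same test always logs to the same subdirectory within a session.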
Test Fixtures#
There are other useful fixtures, but the two main ones are automatically injected from conftest.py. Both handle logging/config resolution and share the same interface:
| Fixture | Use when |
|---|---|
| test_env_base | No Ray needed (transforms, configs, local components and logging) |
| isolated_ray | Ray actors/tasks, distributed logging |
# they both resolve the configs
cfg = fixture("rl", ["model=qwen2.5vl-3B"]) # Config name + overrides
cfg = fixture(OmegaConf.create({...})) # Direct DictConfig
Auto-enforced: test paths, DEBUG logging, wandb disabled, run_id="run".
Examples#
Unit test CPU - agoge
from pathlib import Path

import pytest

from agoge import wds_from_traj


@pytest.fixture
def jpg_bytes():
    """Load JPG test image bytes."""
    jpg_path = Path("tests/data/test_image1.jpg")
    with jpg_path.open("rb") as f:
        return f.read()


@pytest.fixture
def png_bytes():
    """Load PNG test image bytes."""
    png_path = Path("tests/data/test_image1.png")
    with png_path.open("rb") as f:
        return f.read()


class TestGuessExtNew:
    """Test image format detection from byte content."""

    def test_guess_ext_jpg(self, jpg_bytes):
        """Test that JPEG files are correctly identified."""
        # setup
        image_bytes = jpg_bytes
        # act
        result = wds_from_traj._guess_ext(image_bytes)
        # assert
        assert result == "jpg"

    def test_guess_ext_png(self, png_bytes):
        """Test that PNG files are correctly identified."""
        # setup
        image_bytes = png_bytes
        # act
        result = wds_from_traj._guess_ext(image_bytes)
        # assert
        assert result == "png"

    @pytest.mark.parametrize(
        "invalid_bytes",
        [
            b"not an image format",
            b"",
            b"\x00\x00\x00\x00",
            b"random data here",
        ],
    )
    def test_guess_ext_invalid_formats(self, invalid_bytes):
        """Test that invalid/unknown formats raise ValueError."""
        # setup
        # act & assert
        with pytest.raises(ValueError, match="Unknown image format"):
            wds_from_traj._guess_ext(invalid_bytes)
Unit test GPU#
TODO
Integration test
import json
import re
from pathlib import Path

import pytest
import ray

# ParamSyncEvents, ParamSyncStartEvent and ParamSyncCompleteEvent are assumed
# to be defined in the test module (not shown here).


class TestRLParamSyncModes:
    """Focused tests for parameter syncing modes with deterministic test environment."""

    def parse_param_sync_events(self, log_file: Path) -> ParamSyncEvents:
        """Parse parameter sync events from JSONL log file.

        Args:
            log_file: Path to agoge.jsonl log file

        Returns:
            ParamSyncEvents containing parsed start and complete events
        """
        start_events = []
        complete_events = []
        with log_file.open() as f:
            for line in f:
                try:
                    log_entry = json.loads(line)
                    message = log_entry.get("message", "")
                    if "PARAM_SYNC_START" in message:
                        # Parse: "PARAM_SYNC_START step=2 mode=original zero_stage=2"
                        match = re.search(r"step=(\d+) mode=(\w+) zero_stage=(\d+)", message)
                        if match:
                            start_events.append(
                                ParamSyncStartEvent(
                                    step=int(match.group(1)),
                                    mode=match.group(2),
                                    zero_stage=int(match.group(3)),
                                    worker_id=log_entry.get("worker_id", ""),
                                )
                            )
                    elif "PARAM_SYNC_COMPLETE" in message:
                        # Parse: "PARAM_SYNC_COMPLETE step=2 mode=original num_updates=824"
                        match = re.search(r"step=(\d+) mode=(\w+) num_updates=(\d+)", message)
                        if match:
                            complete_events.append(
                                ParamSyncCompleteEvent(
                                    step=int(match.group(1)),
                                    mode=match.group(2),
                                    num_updates=int(match.group(3)),
                                    worker_id=log_entry.get("worker_id", ""),
                                )
                            )
                except (json.JSONDecodeError, KeyError):
                    # Skip lines that are not valid JSON log entries
                    continue
        return ParamSyncEvents(start=start_events, complete=complete_events)
    def verify_inference_manager_produces_valid_output(self, test_prompt: str = "say 'hello'"):
        """
        Verify inference manager is alive and produces valid output after training.

        Args:
            test_prompt: Prompt to send to inference manager

        Returns:
            bool: True if inference manager responds correctly

        Raises:
            AssertionError: If inference manager fails to respond or output is invalid
        """
        # Get inference manager actor
        manager = ray.get_actor("inference_manager")
        # Create simple chat for generation
        from agoge.schema import Chat
        from agoge.schema.inference_request import InferenceRequest
        from agoge.schema.msgs import UserMessage

        chat = Chat(messages=[UserMessage(content=test_prompt)])
        # Create inference request
        request = InferenceRequest(messages=chat, model="intraining", max_tokens=50, temperature=0.7)
        # Generate response
        completion = ray.get(manager.create_chat_completion.remote(request))
        # Verify response is valid
        assert completion is not None, "Inference manager returned None"
        assert len(completion.choices) > 0, "Inference manager returned no choices"
        response_text = completion.choices[0].message.content
        assert response_text is not None, "Response content is None"
        assert len(response_text) > 0, "Inference manager returned empty response"
        assert "hello" in response_text.lower(), f"Expected 'hello' in response, got: {response_text}"
        return True
    # takes about 4 minutes
    @pytest.mark.slow
    @pytest.mark.gpu(num_gpu=4)
    @pytest.mark.integration
    @pytest.mark.timeout(360, method="thread")  # Uses thread method to break free of hanging C-level actors
    def test_param_sync_nonzero3_mode_with_test_env(
        self, isolated_ray, test_working_dir: Path, num_gpu: int
    ):
        """
        Test nonzero3_low_mem parameter sync mode with gradient accumulation.

        Uses TestEnv (after one step always returns reward=1.0) and simple transform pipeline.
        Tests parameter syncing with gradient accumulation to verify that param_sync_every_n_steps
        correctly counts gradient updates (not microbatch steps).

        Configuration:
        - gradient_accumulation_steps=4 (4 microbatches per gradient update)
        - param_sync_every_n_steps=2 (sync every 2 gradient updates)

        Expected behavior:
        - Gradient updates at microbatch steps: 3, 7, 11, 15, ...
        - Parameter syncs at microbatch steps: 7, 15, 23, ...

        Verifies:
        - Training executes enough iterations to trigger multiple syncs
        - Parameter sync occurs at correct microbatch steps (7, 15, ...)
        - Sync respects gradient_accumulation_steps (not syncing every N microbatches)
        - Inference manager produces valid output after training
        """
        # setup
        from agoge.entrypoints.rl import start

        # Configure test with isolated_ray fixture
        cfg = isolated_ray(
            "rl",  # Config name
            [  # Overrides list
                "paths=gcs",
                "model=qwen2.5vl-3B",
                "agent=rl_agent",
                "environment=test",  # TestEnv - always reward=1.0
                "task_loader=default",  # Simple default task
                "chat_template=null",
                # Training config (NO ZeRO-3) with gradient accumulation
                "trainer.scaling_config.num_workers=2",
                "deepspeed.train_batch_size=8",  # 2 workers * 1 batch/gpu * 4 grad_accum = 8
                "deepspeed.train_micro_batch_size_per_gpu=1",
                "deepspeed.gradient_accumulation_steps=4",  # Test gradient accumulation
                "deepspeed/zero_optimization=zero2",
                # Simplified transform pipeline (no LLM evaluation)
                "traj_buffer/transforms=[reinforce_buffer,chat2list]",
                # Inference config
                "inference_manager.models.intraining.num_workers=2",
                "inference_manager.models.intraining.worker_cfg.engine_args.tensor_parallel_size=1",
                "inference_manager.models.intraining.worker_cfg.engine_args.gpu_memory_utilization=0.75",
                # Weight sync config - NONZERO3 MODE
                "param_sync_every_n_steps=2",  # Sync every 2 gradient updates
                "param_sync_mode=nonzero3_low_mem",
                # 80 episodes / 2 workers = 40 episodes ensures enough iterations to test gradient accumulation
                # With 40 episodes, we get ~40 microbatch steps
                # = 10 gradient updates (steps 3, 7, 11, 15, 19, ...)
                # = 5 param syncs (steps 7, 15, 23, 31, ...)
                "num_episodes=80",
                "num_runners=1",
            ]
        )
        # act
        start(cfg)
        # assert
        log_file = Path(cfg.logging.log_filename)
        assert log_file.exists(), f"Log file {log_file} does not exist"
        # Parse and verify parameter sync events
        sync_events = self.parse_param_sync_events(log_file)
        # Verify at least one sync occurred
        assert len(sync_events.start) > 0, "No PARAM_SYNC_START events found in logs"
        assert len(sync_events.complete) > 0, "No PARAM_SYNC_COMPLETE events found in logs"
        # Get unique sync steps (multiple workers log the same step)
        sync_steps = sorted({event.step for event in sync_events.start})
        # Verify sync happened at expected microbatch steps
        # With gradient_accumulation_steps=4 and param_sync_every_n_steps=2:
        # - Gradient updates at steps: 3, 7, 11, 15, 19, ...
        # - Syncs every 2nd gradient update: steps 7, 15, ...
        expected_sync_steps = [7, 15]
        for expected_step in expected_sync_steps:
            assert expected_step in sync_steps, (
                f"Expected sync at microbatch step '{expected_step}', got syncs at steps: '{sync_steps}'"
            )
        # Verify correct mode was used for all syncs
        for event in sync_events.start:
            assert event.mode == "nonzero3_low_mem", f"Expected mode 'nonzero3_low_mem', got '{event.mode}'"
        for event in sync_events.complete:
            assert event.mode == "nonzero3_low_mem", f"Expected mode 'nonzero3_low_mem', got '{event.mode}'"
        # Verify ZeRO stage is correct (not ZeRO-3 for this test)
        for event in sync_events.start:
            assert event.zero_stage == 2, f"Expected zero_stage 2, got {event.zero_stage}"
        # Verify each START has a matching COMPLETE (same step, from same or different worker)
        start_steps = [event.step for event in sync_events.start]
        complete_steps = [event.step for event in sync_events.complete]
        for step in set(start_steps):
            assert complete_steps.count(step) >= start_steps.count(step), (
                f"Step {step} has {start_steps.count(step)} START but {complete_steps.count(step)} COMPLETE events"
            )
        # Verify num_updates is reasonable (should be > 0)
        for event in sync_events.complete:
            assert event.num_updates > 0, f"Expected num_updates > 0, got {event.num_updates}"
        # Verify inference manager produces valid output
        self.verify_inference_manager_produces_valid_output()
Data test#
TODO
Helpful pytest docs#
Mark#
Standard markers#
| Marker | Purpose | Example |
|---|---|---|
| @pytest.mark.gpu(num_gpu) | Requires (n) GPUs | Training tests |
| @pytest.mark.fast | Takes <=10s | Local function test |
| @pytest.mark.slow | Takes >10s | Large dataset tests |
| @pytest.mark.e2e | End-to-end test | Full pipeline runs |
| @pytest.mark.integration | Multi-component test, external systems | Agent-Environment |
| @pytest.mark.skipif(...) | Conditional skip | Skip if no CUDA (see) |
| @pytest.mark.skip(...) | Unconditional skip | Skip (give reason!, see) |
@pytest.mark.gpu(num_gpu) is special: a global fixture retrieves its keyword argument, so a test can receive the value by declaring num_gpu as a parameter (defaults to 1 if none is set).
See pytest --markers for the available markers; custom markers are registered in pyproject.toml under [tool.pytest.ini_options] markers.
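One way a fixture can hand a marker argument to a test is through pytest's `request.node.get_closest_marker`. The sketch below is an assumption about how such a fixture could look, not the project's actual conftest.py; `resolve_num_gpu` is a hypothetical helper introduced here so the lookup logic can be exercised on its own.

```python
import pytest


def resolve_num_gpu(marker, default=1):
    """Return the GPU count requested by a `gpu` marker, or `default` if unset."""
    if marker is None:
        return default
    if "num_gpu" in marker.kwargs:  # @pytest.mark.gpu(num_gpu=4)
        return int(marker.kwargs["num_gpu"])
    if marker.args:  # @pytest.mark.gpu(4)
        return int(marker.args[0])
    return default  # bare @pytest.mark.gpu


@pytest.fixture
def num_gpu(request):
    """Expose the marker value to any test that lists `num_gpu` as a parameter."""
    return resolve_num_gpu(request.node.get_closest_marker("gpu"))
```

A test decorated with `@pytest.mark.gpu(num_gpu=4)` that declares a `num_gpu` parameter would then receive 4, and an unmarked test would receive the default of 1.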
creating marks (see)#
Besides the built-in markers, custom markers are built dynamically and can be referenced programmatically. An error is thrown if an unregistered marker is used.
@pytest.mark.gpu
@pytest.mark.my_marker(arg1=2)
These can then be referenced within the test (see)
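Markers are registered in pyproject.toml in this project; as an alternative, pytest also allows registering them from a conftest.py hook. The following sketch shows that alternative with the `pytest_configure` hook and `config.addinivalue_line`. The marker descriptions are illustrative wording, not copied from the project configuration.

```python
# conftest.py (sketch)

# Each entry is "<name or signature>: <description>", as shown by `pytest --markers`.
CUSTOM_MARKERS = [
    "gpu(num_gpu): test requires the given number of GPUs",
    "slow: test takes longer than 10 seconds",
    "my_marker(arg1): illustrative custom marker",
]


def pytest_configure(config):
    """pytest hook: register custom markers so they are not reported as unknown."""
    for line in CUSTOM_MARKERS:
        config.addinivalue_line("markers", line)
```

Registering a marker in either place makes it appear in the output of `pytest --markers`.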
Filter#
select tests based on their name substring (see)#
pytest -k <substring>
select tests based on a marker (see)#
pytest -v -m "not gpu" # run all cpu tests
Running Tests with pytest-xdist#
For running integration tests that may hang due to C-level actors not responding properly, we use pytest-xdist with -n 1 (an xdist argument specifying the number of worker processes) to run tests sequentially in a separate process:
# Run tests sequentially with xdist (one worker process)
pytest tests/integration/ -v -n 1
When using pytest-xdist, stdin is not fully available to the test processes, which can limit interactive debugging. For debugging failing tests, it's recommended to:
- Use -k to select the specific failing test
- Run without xdist (omit the -n argument) for full stdin access
- Use -s to show stdout output
- Set KEEP_TEST_DIRS=1 to preserve test directories for log examination
# For debugging a specific test with full output (without xdist)
KEEP_TEST_DIRS=1 pytest tests/integration/test_rl.py::TestRLParamSyncModes::test_param_sync_nonzero3_mode_with_test_env -vs
# For finding and examining test logs afterward
ls $TMPDIR/pt*
Integration tests use @pytest.mark.timeout(seconds, method="thread") to break free of hanging C-level actors by calling os._exit(1) on timeout, which terminates the entire pytest process. This is why pytest-xdist is used - to ensure that even if one test hangs and terminates, the other tests will still run in their separate worker processes.
Debug#
show slowest tests#
pytest tests/unit/test_wds_from_traj.py -vv --durations=0
or show the 5 slowest
pytest tests/unit/test_wds_from_traj.py -vv --durations=5
GitHub Action integration#
TODO
Notes#
Tests may appear to execute slowly, even the "fast" CPU-only ones. There is substantial overhead in importing ray, torch, transformers, hydra, etc., which we pay at test collection time: pytest crawls the test directories and loads that module tree even if the tests are not selected for execution. This is in the tens of seconds.
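To see where that collection time goes, the cost of individual imports can be measured directly. The helper below is a small standard-library sketch; `timed_import` is a name invented here. CPython's `python -X importtime -c "import <module>"` gives a more detailed per-module breakdown.

```python
import importlib
import sys
import time


def timed_import(module_name):
    """Import a module and return (module, seconds spent in the import call)."""
    start = time.perf_counter()
    module = importlib.import_module(module_name)
    return module, time.perf_counter() - start


if __name__ == "__main__":
    # Usage: python timed_import.py torch ray transformers
    for name in sys.argv[1:] or ["json"]:
        already_loaded = name in sys.modules  # cached imports return almost instantly
        _, seconds = timed_import(name)
        note = " (already cached)" if already_loaded else ""
        print(f"{name}: {seconds:.3f}s{note}")
```

Importing a heavy dependency inside the test body, as the integration example does with `agoge.entrypoints.rl`, moves that cost from collection time to the tests that actually need it.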
Nomenclature#
| Term | Definition |
|---|---|
| Mock | A test double that records interactions and allows verification of calls. Mocks can verify that specific methods were called with expected parameters. Used when you need to assert that certain interactions occurred. |
| Stub | A test double that provides predefined responses to method calls. Stubs return canned data without any verification logic. Used when you need to control indirect inputs to the system under test. |
| E2E | End-to-End test. Validates complete system workflows from start to finish in a production-like environment. Tests all components working together as a user/agent would experience them. Typically slow but high confidence. Example: full training pipeline from data load to checkpoint save. |
| Fixture | Reusable test setup/teardown code defined with @pytest.fixture. Can have different scopes (function, class, module, session) to control when setup runs. Used to share common test infrastructure. |
| Parametrize | Pytest decorator @pytest.mark.parametrize that runs the same test with different input values, creating a test matrix. Useful for testing multiple scenarios efficiently. |
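The difference between a stub and a mock can be shown with `unittest.mock.Mock`, which can play either role. In this sketch `sync_if_due` and the `client` interface are hypothetical and exist only for illustration.

```python
from unittest.mock import Mock


def sync_if_due(step, every_n, client):
    """Hypothetical unit under test: push weights every `every_n` steps if the client is ready."""
    if client.is_ready() and step % every_n == 0:
        client.push_weights(step=step)
        return True
    return False


def test_sync_skipped_when_client_not_ready():
    # setup: used as a stub, it only supplies a canned answer
    client = Mock()
    client.is_ready.return_value = False
    # act
    synced = sync_if_due(step=4, every_n=2, client=client)
    # assert: only the returned value is checked
    assert synced is False


def test_sync_pushes_weights_once():
    # setup
    client = Mock()
    client.is_ready.return_value = True
    # act
    sync_if_due(step=4, every_n=2, client=client)
    # assert: used as a mock, the recorded interaction itself is verified
    client.push_weights.assert_called_once_with(step=4)
```

The first test controls an indirect input and inspects only the result; the second asserts that a specific call happened with the expected arguments.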
LLM instructions#
- prefer fixtures shared within a module over class-scoped ones, for simplicity of the test collections
- maintain fixtures in their relative order of dependency and presence within file whenever possible
- when specifying an empty setup, no further commentary is necessary