Integration tests — zig/tests/basic.zig
This file is an end-to-end test suite: it exercises the runtime through the public C ABI and the Zig wrapper. Treat it as executable documentation of runtime contracts.
This page uses a textbook style: focused snippets with explanation, plus an appendix with the full source.
Cross-links
- Start here: Source index
- What this tests: C ABI (
include/jzx/jzx.h), Runtime core (src/jzx_runtime.c) - Run it: Installation (
zig build test) - Stress complement: Stress tool (
tools/stress.zig)
Imports and shared helpers
zig/tests/basic.zig#L1-L10const std = @import("std");
const jzx = @import("jzx");
const c = jzx.c;
const posix = std.posix;
const AsyncArgs = struct {
loop: *c.jzx_loop,
actor: c.jzx_actor_id,
payload: *u32,
};
std: Zig standard library, includingstd.testing.jzx: Zig wrapper module underzig/jzx/lib.zig.c = jzx.c: the C ABI import namespace.posix: used for fd helpers in I/O tests.AsyncArgs: arguments passed into helper threads for async-send tests.
A minimal “increment and stop” behavior
zig/tests/basic.zig#L12-L21fn increment_behavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const msg_ptr = @as(*const c.jzx_message, @ptrCast(msg));
const state_ptr = @as(*u32, @ptrFromInt(@intFromPtr(ctx_ptr.state.?)));
if (msg_ptr.data) |data_ptr| {
const value_ptr = @as(*const u32, @ptrFromInt(@intFromPtr(data_ptr)));
state_ptr.* += value_ptr.*;
}
return c.JZX_BEHAVIOR_STOP;
}
This behavior is intentionally small and deterministic:
- It interprets
ctx_ptr.stateas*u32and increments it by the incoming payload*u32. - It returns
JZX_BEHAVIOR_STOP, which makes the actor stop after processing the message.
Why it exists: many tests want “spawn actor → send message → run loop → assert state changed” without having to write a bespoke behavior each time.
Shared state structs (test-owned state)
Many tests in this file use small, test-owned state structs. They are deliberately minimal:
- values are plain integers/bools so assertions are deterministic
- state is passed to the runtime as
void*and recovered via pointer casts in behaviors
TimerState
zig/tests/basic.zig#L23-L26const TimerState = struct {
target: u32,
hits: u32 = 0,
};
target: how many timer events we expect to observe before stopping.hits: counter incremented on each timer message delivery.- Why it exists: it turns “timers are firing” into a concrete, testable condition.
ObserverState
zig/tests/basic.zig#L28-L35const ObserverState = struct {
start_count: u32 = 0,
stop_count: u32 = 0,
mailbox_full_count: u32 = 0,
last_start: c.jzx_actor_id = 0,
last_stop: c.jzx_actor_id = 0,
last_stop_reason: c.jzx_exit_reason = c.JZX_EXIT_NORMAL,
};
This struct is what makes observer callbacks testable:
- counters (
*_count) let tests assert “event happened exactly N times”. last_*fields let tests assert “the event referred to the expected actor”.
Observer hooks used by tests
zig/tests/basic.zig#L37-L42fn observerOnStart(ctx: ?*anyopaque, id: c.jzx_actor_id, name: [*c]const u8) callconv(.c) void {
_ = name;
const state = @as(*ObserverState, @ptrCast(@alignCast(ctx.?)));
state.start_count += 1;
state.last_start = id;
}
zig/tests/basic.zig#L44-L49fn observerOnStop(ctx: ?*anyopaque, id: c.jzx_actor_id, reason: c.jzx_exit_reason) callconv(.c) void {
const state = @as(*ObserverState, @ptrCast(@alignCast(ctx.?)));
state.stop_count += 1;
state.last_stop = id;
state.last_stop_reason = reason;
}
zig/tests/basic.zig#L51-L55fn observerOnMailboxFull(ctx: ?*anyopaque, target: c.jzx_actor_id) callconv(.c) void {
const state = @as(*ObserverState, @ptrCast(@alignCast(ctx.?)));
state.mailbox_full_count += 1;
_ = target;
}
These callbacks accumulate counts and last-seen ids into a test-owned ObserverState.
Why it exists: observer behavior is part of the runtime’s contract, so tests assert it fires correctly without requiring logs.
Timer behavior used by tests
zig/tests/basic.zig#L57-L66fn timer_behavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*TimerState, @ptrCast(@alignCast(ctx_ptr.state.?)));
_ = msg;
state.hits += 1;
if (state.hits >= state.target) {
return c.JZX_BEHAVIOR_STOP;
}
return c.JZX_BEHAVIOR_OK;
}
This behavior counts timer hits and stops after reaching target.
Why it exists: timer tests need an actor that can receive repeated timer messages and stop deterministically.
Tests (complete suite)
Most tests follow a small harness pattern:
- Create a loop (
jzx.Loop.create). - Spawn actor(s) or supervisor(s).
- Queue work (message, timer, I/O watch, or
jzx_send_async). - Run the loop (
loop.run()). - Assert on observable outcomes (state changes, error codes, observer counters, ordering, fairness).
Below are all tests in zig/tests/basic.zig, grouped by subsystem.
Messaging and actor ids
Message delivery: actor receives and processes a message
zig/tests/basic.zig#L68-L89test "actor receives and processes a message" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 5;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 1));
try loop.run();
try std.testing.expectEqual(@as(u32, 5), state);
try std.testing.expectEqual(c.JZX_ERR_NO_SUCH_ACTOR, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 1));
}
What it’s asserting:
c.jzx_spawncreates an actor and returns an id.c.jzx_sendenqueues a message.loop.run()drives delivery and actor teardown (becauseincrement_behaviorreturnsSTOP).- After stop, sending to the same id returns
JZX_ERR_NO_SUCH_ACTOR(stale id is rejected).
Actor id generations: stale ids are rejected after slot reuse
zig/tests/basic.zig#L91-L139test "actor id generations reject stale ids after slot reuse" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state_a: u32 = 0;
var opts_a = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state_a,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_a: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts_a, &actor_a));
var payload: u32 = 1;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor_a, &payload, @sizeOf(u32), 0));
try loop.run();
try std.testing.expectEqual(@as(u32, 1), state_a);
var state_b: u32 = 0;
var opts_b = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state_b,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_b: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts_b, &actor_b));
try std.testing.expect(actor_b != actor_a);
const raw_a: u64 = @intCast(actor_a);
const raw_b: u64 = @intCast(actor_b);
const idx_a: u32 = @truncate(raw_a);
const idx_b: u32 = @truncate(raw_b);
const gen_a: u32 = @intCast(raw_a >> 32);
const gen_b: u32 = @intCast(raw_b >> 32);
try std.testing.expectEqual(idx_a, idx_b);
try std.testing.expectEqual(gen_a + 1, gen_b);
try std.testing.expectEqual(c.JZX_ERR_NO_SUCH_ACTOR, c.jzx_send(loop.ptr, actor_a, &payload, @sizeOf(u32), 0));
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor_b, &payload, @sizeOf(u32), 0));
try loop.run();
try std.testing.expectEqual(@as(u32, 1), state_b);
}
What it’s asserting:
- Actor ids are not simple integers; they include a generation.
- When an actor slot is reused:
- the index stays the same
- the generation increments
- A stale id (old generation) is rejected with
JZX_ERR_NO_SUCH_ACTOR.
Backpressure: mailbox full returns error
zig/tests/basic.zig#L141-L161test "mailbox full returns error" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 1,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 1;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 0));
try std.testing.expectEqual(c.JZX_ERR_MAILBOX_FULL, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 0));
try loop.run();
}
What it’s asserting:
- Mailboxes are bounded (capacity is enforced).
- With
mailbox_cap = 1, the second send returnsJZX_ERR_MAILBOX_FULL.
Observability (observer hooks)
zig/tests/basic.zig#L163-L200test "observer hooks receive lifecycle + mailbox full" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var obs_state = ObserverState{};
var obs = c.jzx_observer{
.on_actor_start = observerOnStart,
.on_actor_stop = observerOnStop,
.on_actor_restart = null,
.on_supervisor_escalate = null,
.on_mailbox_full = observerOnMailboxFull,
};
c.jzx_loop_set_observer(loop.ptr, &obs, @ptrCast(&obs_state));
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 1,
.name = "observer-actor",
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 1;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 0));
try std.testing.expectEqual(c.JZX_ERR_MAILBOX_FULL, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 0));
try loop.run();
try std.testing.expectEqual(@as(u32, 1), obs_state.start_count);
try std.testing.expectEqual(@as(u32, 1), obs_state.stop_count);
try std.testing.expectEqual(@as(u32, 1), obs_state.mailbox_full_count);
try std.testing.expectEqual(actor_id, obs_state.last_start);
try std.testing.expectEqual(actor_id, obs_state.last_stop);
try std.testing.expectEqual(@as(c.jzx_exit_reason, c.JZX_EXIT_NORMAL), obs_state.last_stop_reason);
}
What it’s asserting:
- Observer callbacks fire in the expected places:
- start (spawn)
- mailbox full (second send)
- stop (actor teardown)
- The observer receives the correct actor id and exit reason.
Async send (cross-thread enqueue)
Async send tests use a helper thread that calls jzx_send_async.
zig/tests/basic.zig#L202-L204fn async_sender(args: AsyncArgs) void {
_ = c.jzx_send_async(args.loop, args.actor, args.payload, @sizeOf(u32), 2);
}
Key detail:
- The return value is ignored because these tests are asserting the delivery behavior of async sends, not the enqueue status.
FIFO test scaffolding: SeqState + seq_behavior
Some tests need to assert ordering. The simplest deterministic way is:
- send a known sequence of
u32values - have the actor verify it receives
0,1,2,...in order
zig/tests/basic.zig#L206-L210const SeqState = struct {
expected: u32,
remaining: u32,
ok: bool = true,
};
zig/tests/basic.zig#L212-L231fn seq_behavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const msg_ptr = @as(*const c.jzx_message, @ptrCast(msg));
const state = @as(*SeqState, @ptrCast(@alignCast(ctx_ptr.state.?)));
if (state.remaining == 0) {
return c.JZX_BEHAVIOR_STOP;
}
if (msg_ptr.data == null) {
state.ok = false;
return c.JZX_BEHAVIOR_STOP;
}
const value_ptr: *const u32 = @ptrCast(@alignCast(msg_ptr.data.?));
if (value_ptr.* != state.expected) {
state.ok = false;
return c.JZX_BEHAVIOR_STOP;
}
state.expected += 1;
state.remaining -= 1;
return if (state.remaining == 0) c.JZX_BEHAVIOR_STOP else c.JZX_BEHAVIOR_OK;
}
This actor stops as soon as it detects a mismatch, which keeps failures crisp and deterministic.
async send dispatches message
zig/tests/basic.zig#L233-L258test "async send dispatches message" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 7;
var thread = try std.Thread.spawn(.{}, async_sender, .{AsyncArgs{
.loop = loop.ptr,
.actor = actor_id,
.payload = &payload,
}});
thread.join();
try loop.run();
try std.testing.expectEqual(@as(u32, 7), state);
}
What it’s asserting:
- A message enqueued via
jzx_send_asyncis delivered when the loop runs. - The delivery path is the same as
jzx_sendfrom the actor’s perspective.
async send wakes blocking loop
zig/tests/basic.zig#L260-L296test "async send wakes blocking loop" {
var cfg: c.jzx_config = undefined;
c.jzx_config_init(&cfg);
cfg.io_poll_timeout_ms = 5000;
var loop = try jzx.Loop.create(cfg);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var runner = try std.Thread.spawn(.{}, struct {
fn run(lp: *jzx.Loop) void {
_ = lp.run() catch {};
}
}.run, .{&loop});
std.Thread.sleep(5 * std.time.ns_per_ms);
var payload: u32 = 9;
const start_ms = @as(u64, @intCast(std.time.milliTimestamp()));
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_async(loop.ptr, actor_id, &payload, @sizeOf(u32), 0));
runner.join();
const end_ms = @as(u64, @intCast(std.time.milliTimestamp()));
try std.testing.expect(end_ms - start_ms < 1000);
try std.testing.expectEqual(@as(u32, 9), state);
}
What it’s asserting:
- With a long I/O poll timeout, the loop thread may block in the backend.
- An async send triggers a wakeup so delivery happens promptly (not after the full timeout).
async send preserves FIFO ordering
zig/tests/basic.zig#L298-L342test "async send preserves FIFO ordering" {
const n: u32 = 512;
var cfg: c.jzx_config = undefined;
c.jzx_config_init(&cfg);
cfg.io_poll_timeout_ms = 5000;
var loop = try jzx.Loop.create(cfg);
defer loop.deinit();
var state = SeqState{ .expected = 0, .remaining = n };
var opts = c.jzx_spawn_opts{
.behavior = seq_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 1024,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var runner = try std.Thread.spawn(.{}, struct {
fn run(lp: *jzx.Loop) void {
_ = lp.run() catch {};
}
}.run, .{&loop});
std.Thread.sleep(5 * std.time.ns_per_ms);
var payloads = [_]u32{0} ** n;
var sender = try std.Thread.spawn(.{}, struct {
fn run(lp: *c.jzx_loop, id: c.jzx_actor_id, data: []u32) void {
for (data, 0..) |*item, idx| {
item.* = @as(u32, @intCast(idx));
_ = c.jzx_send_async(lp, id, item, @sizeOf(u32), 0);
}
}
}.run, .{ loop.ptr, actor_id, payloads[0..] });
sender.join();
runner.join();
try std.testing.expect(state.ok);
try std.testing.expectEqual(@as(u32, 0), state.remaining);
}
What it’s asserting:
- The async queue preserves enqueue order for a single producer thread.
- The runtime does not reorder messages during internal drain/dispatch.
Timers (delayed enqueue)
timer delivers message
zig/tests/basic.zig#L344-L366test "timer delivers message" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 3;
var timer_id: c.jzx_timer_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_after(loop.ptr, actor_id, 5, &payload, @sizeOf(u32), 9, &timer_id));
try loop.run();
try std.testing.expectEqual(@as(u32, 3), state);
try std.testing.expectEqual(c.JZX_ERR_TIMER_INVALID, c.jzx_cancel_timer(loop.ptr, timer_id));
}
What it’s asserting:
jzx_send_afterdelivers a delayed message.- After the actor stops, canceling the timer id reports
JZX_ERR_TIMER_INVALID(no pending timer).
cancelled timer does not fire
zig/tests/basic.zig#L368-L391test "cancelled timer does not fire" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 11;
var timer_id: c.jzx_timer_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_after(loop.ptr, actor_id, 5, &payload, @sizeOf(u32), 4, &timer_id));
try std.testing.expectEqual(c.JZX_OK, c.jzx_cancel_timer(loop.ptr, timer_id));
_ = c.jzx_actor_stop(loop.ptr, actor_id);
try loop.run();
try std.testing.expectEqual(@as(u32, 0), state);
}
What it’s asserting:
- Canceling a pending timer prevents delivery.
timer drop when actor stops
zig/tests/basic.zig#L393-L414test "timer drop when actor stops" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 5;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_after(loop.ptr, actor_id, 1, &payload, @sizeOf(u32), 0, null));
try std.testing.expectEqual(c.JZX_OK, c.jzx_actor_stop(loop.ptr, actor_id));
try loop.run();
try std.testing.expectEqual(@as(u32, 0), state);
}
What it’s asserting:
- Stopping an actor prevents “late timer delivery” to a dead actor.
many timers fire
zig/tests/basic.zig#L416-L441test "many timers fire" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
const timer_count: u32 = 32;
var timer_state = TimerState{ .target = timer_count };
var opts = c.jzx_spawn_opts{
.behavior = timer_behavior,
.state = &timer_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var timer_ids = [_]c.jzx_timer_id{0} ** timer_count;
var payloads = [_]u32{0} ** timer_count;
for (&timer_ids, 0..) |tid_ptr, idx| {
payloads[idx] = 1;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_after(loop.ptr, actor_id, 1, &payloads[idx], @sizeOf(u32), 0, tid_ptr));
}
try loop.run();
try std.testing.expectEqual(timer_count, timer_state.hits);
}
What it’s asserting:
- The timer thread can enqueue many timers and deliver them all.
- The actor can count and stop deterministically (
TimerState.target).
timer delivery preserves enqueue order
zig/tests/basic.zig#L443-L473test "timer delivery preserves enqueue order" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
const timer_count: u32 = 32;
var state = SeqState{
.expected = 0,
.remaining = timer_count,
.ok = true,
};
var opts = c.jzx_spawn_opts{
.behavior = seq_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 1024,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payloads = [_]u32{0} ** timer_count;
for (&payloads, 0..) |*item, idx| {
item.* = @as(u32, @intCast(idx));
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_after(loop.ptr, actor_id, 1, item, @sizeOf(u32), 0, null));
}
try loop.run();
try std.testing.expect(state.ok);
try std.testing.expectEqual(@as(u32, 0), state.remaining);
}
What it’s asserting:
- Timer-enqueued messages preserve the order they are enqueued into the timer list.
Scheduler fairness (ping pong)
This test uses a tiny “ping pong” behavior: each actor sends work to its partner.
zig/tests/basic.zig#L475-L480const PingPongState = struct {
loop: *c.jzx_loop,
partner: *?c.jzx_actor_id,
remaining: u32,
hits: u32,
};
zig/tests/basic.zig#L482-L496fn pingPongBehavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*PingPongState, @ptrCast(@alignCast(ctx_ptr.state.?)));
_ = msg;
state.hits += 1;
if (state.remaining == 0) {
return c.JZX_BEHAVIOR_STOP;
}
state.remaining -= 1;
if (state.partner.*) |partner_id| {
var payload: u32 = 1;
_ = c.jzx_send(state.loop, partner_id, &payload, @sizeOf(u32), 0);
}
return c.JZX_BEHAVIOR_OK;
}
zig/tests/basic.zig#L498-L523test "ping pong actors share work fairly" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var id_a: c.jzx_actor_id = 0;
var id_b: c.jzx_actor_id = 0;
var partner_a: ?c.jzx_actor_id = 0;
var partner_b: ?c.jzx_actor_id = 0;
var state_a = PingPongState{ .loop = loop.ptr, .partner = &partner_a, .remaining = 10, .hits = 0 };
var state_b = PingPongState{ .loop = loop.ptr, .partner = &partner_b, .remaining = 10, .hits = 0 };
var opts_a = c.jzx_spawn_opts{ .behavior = pingPongBehavior, .state = &state_a, .supervisor = 0, .mailbox_cap = 0, .name = null };
var opts_b = c.jzx_spawn_opts{ .behavior = pingPongBehavior, .state = &state_b, .supervisor = 0, .mailbox_cap = 0, .name = null };
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts_a, &id_a));
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts_b, &id_b));
partner_a = id_b;
partner_b = id_a;
var init: u32 = 1;
_ = c.jzx_send(loop.ptr, id_a, &init, @sizeOf(u32), 0);
_ = c.jzx_send(loop.ptr, id_b, &init, @sizeOf(u32), 0);
try loop.run();
try std.testing.expectEqual(@as(u32, 11), state_a.hits);
try std.testing.expectEqual(@as(u32, 11), state_b.hits);
}
What it’s asserting:
- The scheduler doesn’t starve one runnable actor while another is “chatty”.
- With the configured budgets, both actors make progress.
Typed actor wrapper (Zig API surface)
This test exercises jzx.Actor(...) from the Zig wrapper.
zig/tests/basic.zig#L525-L531const CounterState = struct {
total: u32 = 0,
};
const CounterMsg = struct {
value: u32,
};
zig/tests/basic.zig#L533-L537fn counterBehavior(state: *CounterState, msg: *CounterMsg, ctx: jzx.ActorContext) jzx.BehaviorResult {
_ = ctx;
state.total += msg.value;
return .stop;
}
zig/tests/basic.zig#L539-L557test "typed actor increments state" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var counter = CounterState{};
var actor = try jzx.Actor(CounterState, *CounterMsg).spawn(
loop.ptr,
std.heap.c_allocator,
&counter,
&counterBehavior,
.{},
);
defer actor.destroy();
var msg = CounterMsg{ .value = 8 };
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor.getId(), &msg, @sizeOf(CounterMsg), 0));
try loop.run();
try std.testing.expectEqual(@as(u32, 8), counter.total);
}
What it’s asserting:
- The Zig wrapper spawns a C actor that calls a typed Zig behavior.
- Typed payload decoding is safe as long as the caller sends the right payload type.
I/O readiness (fd watchers)
I/O tests use a behavior that expects JZX_TAG_SYS_IO and decodes a jzx_io_event.
zig/tests/basic.zig#L559-L573fn io_behavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const msg_ptr = @as(*const c.jzx_message, @ptrCast(msg));
if (msg_ptr.tag == c.JZX_TAG_SYS_IO and msg_ptr.data != null) {
const data_ptr = msg_ptr.data.?;
const state_ptr = @as(*u32, @ptrFromInt(@intFromPtr(ctx_ptr.state.?)));
const event = @as(*c.jzx_io_event, @ptrFromInt(@intFromPtr(data_ptr)));
if ((event.readiness & c.JZX_IO_READ) != 0) {
state_ptr.* += 1;
}
c.jzx_loop_free(ctx_ptr.loop.?, data_ptr);
return c.JZX_BEHAVIOR_STOP;
}
return c.JZX_BEHAVIOR_OK;
}
This behavior demonstrates a critical ownership rule:
- The runtime allocates the
jzx_io_eventpayload. - The actor must free it with
jzx_loop_free(so it goes through the loop allocator).
zig/tests/basic.zig#L575-L578fn pipe_writer(fd: posix.fd_t) void {
const msg = "ping";
_ = posix.write(fd, msg) catch {};
}
io watcher delivers readiness
zig/tests/basic.zig#L580-L608test "io watcher delivers readiness" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = io_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
const pipefds = try posix.pipe();
defer {
posix.close(pipefds[0]);
posix.close(pipefds[1]);
}
try std.testing.expectEqual(c.JZX_OK, c.jzx_watch_fd(loop.ptr, pipefds[0], actor_id, c.JZX_IO_READ));
var writer = try std.Thread.spawn(.{}, pipe_writer, .{pipefds[1]});
writer.join();
try loop.run();
try std.testing.expectEqual(@as(u32, 1), state);
}
What it’s asserting:
jzx_watch_fdregisters interest.- A readiness event is delivered as a system message.
- The owning actor can observe and handle the event.
io rapid watch and unwatch
zig/tests/basic.zig#L610-L642test "io rapid watch and unwatch" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
const pipefds = try posix.pipe();
defer {
posix.close(pipefds[0]);
posix.close(pipefds[1]);
}
for (0..8) |_| {
try std.testing.expectEqual(c.JZX_OK, c.jzx_watch_fd(loop.ptr, pipefds[0], actor_id, c.JZX_IO_READ));
try std.testing.expectEqual(c.JZX_OK, c.jzx_unwatch_fd(loop.ptr, pipefds[0]));
}
try std.testing.expectEqual(c.JZX_OK, c.jzx_watch_fd(loop.ptr, pipefds[0], actor_id, c.JZX_IO_READ));
var writer = try std.Thread.spawn(.{}, pipe_writer, .{pipefds[1]});
writer.join();
try loop.run();
try std.testing.expect(state >= 1);
}
What it’s asserting:
- Rapid register/unregister cycles do not crash or leak.
- After the final watch, readiness delivery still works.
Supervision (restart strategies and intensity)
Supervision tests use “driver” actors that tick themselves until they observe the expected supervisor state.
zig/tests/basic.zig#L644-L646const RestartState = struct {
runs: u32 = 0,
};
zig/tests/basic.zig#L648-L650fn scheduleSelf(loop: *c.jzx_loop, self: c.jzx_actor_id, ms: u32) void {
_ = c.jzx_send_after(loop, self, ms, null, 0, 0, null);
}
zig/tests/basic.zig#L652-L658fn failThenStop(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*RestartState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.runs += 1;
return if (state.runs == 1) c.JZX_BEHAVIOR_FAIL else c.JZX_BEHAVIOR_STOP;
}
zig/tests/basic.zig#L660-L666const TransientDriverState = struct {
sup_id: c.jzx_actor_id,
original_child_id: c.jzx_actor_id,
stage: u8 = 0,
ticks: u32 = 0,
timed_out: bool = false,
};
zig/tests/basic.zig#L668-L704fn transientDriver(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*TransientDriverState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.ticks += 1;
if (state.ticks > 5000) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
var child_id: c.jzx_actor_id = 0;
if (c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, 0, &child_id) != c.JZX_OK) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (state.stage == 0) {
_ = c.jzx_send(ctx_ptr.loop.?, state.original_child_id, null, 0, 0);
state.stage = 1;
} else if (state.stage == 1) {
if (child_id != 0 and child_id != state.original_child_id) {
_ = c.jzx_send(ctx_ptr.loop.?, child_id, null, 0, 0);
state.stage = 2;
}
} else {
if (child_id == 0) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
}
scheduleSelf(ctx_ptr.loop.?, ctx_ptr.self, 1);
return c.JZX_BEHAVIOR_OK;
}
supervisor restarts transient child once
zig/tests/basic.zig#L706-L758test "supervisor restarts transient child once" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var child_state = RestartState{};
var child_spec = [_]c.jzx_child_spec{.{
.behavior = failThenStop,
.state = &child_state,
.mode = c.JZX_CHILD_TRANSIENT,
.mailbox_cap = 0,
.restart_delay_ms = 0,
.backoff = c.JZX_BACKOFF_NONE,
.name = null,
}};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_ONE_FOR_ONE,
.intensity = 5,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_NONE,
.backoff_delay_ms = 0,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var child_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, 0, &child_id));
try std.testing.expect(child_id != 0);
var driver_state = TransientDriverState{
.sup_id = sup_id,
.original_child_id = child_id,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = transientDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expectEqual(@as(u32, 2), child_state.runs);
try std.testing.expect(!driver_state.timed_out);
}
What it’s asserting:
- A transient child that fails once is restarted.
- After restart, the child can stop normally (no further restart).
Intensity escalation when a child fails repeatedly
zig/tests/basic.zig#L760-L766fn alwaysFail(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*RestartState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.runs += 1;
return c.JZX_BEHAVIOR_FAIL;
}
zig/tests/basic.zig#L768-L773const EscalationObsState = struct {
sup_id: c.jzx_actor_id,
escalations: u32 = 0,
sup_stopped: bool = false,
sup_stop_reason: c.jzx_exit_reason = c.JZX_EXIT_NORMAL,
};
zig/tests/basic.zig#L775-L780fn escalationOnSupervisorEscalate(ctx: ?*anyopaque, supervisor: c.jzx_actor_id) callconv(.c) void {
const state = @as(*EscalationObsState, @ptrCast(@alignCast(ctx.?)));
if (supervisor == state.sup_id) {
state.escalations += 1;
}
}
zig/tests/basic.zig#L782-L788fn escalationOnActorStop(ctx: ?*anyopaque, id: c.jzx_actor_id, reason: c.jzx_exit_reason) callconv(.c) void {
const state = @as(*EscalationObsState, @ptrCast(@alignCast(ctx.?)));
if (id == state.sup_id) {
state.sup_stopped = true;
state.sup_stop_reason = reason;
}
}
zig/tests/basic.zig#L790-L796const EscalationDriverState = struct {
sup_id: c.jzx_actor_id,
obs: *EscalationObsState,
last_child_id: c.jzx_actor_id = 0,
ticks: u32 = 0,
timed_out: bool = false,
};
zig/tests/basic.zig#L798-L834fn escalationDriver(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*EscalationDriverState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.ticks += 1;
if (state.ticks > 5000) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (state.obs.escalations > 0 or state.obs.sup_stopped) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
var child_id: c.jzx_actor_id = 0;
const rc = c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, 0, &child_id);
if (rc == c.JZX_ERR_NO_SUCH_ACTOR) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (rc != c.JZX_OK) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (child_id != 0 and child_id != state.last_child_id) {
_ = c.jzx_send(ctx_ptr.loop.?, child_id, null, 0, 0);
state.last_child_id = child_id;
}
scheduleSelf(ctx_ptr.loop.?, ctx_ptr.self, 1);
return c.JZX_BEHAVIOR_OK;
}
zig/tests/basic.zig#L836-L897test "supervisor escalates when intensity exceeded" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var child_state = RestartState{};
var child_spec = [_]c.jzx_child_spec{.{
.behavior = alwaysFail,
.state = &child_state,
.mode = c.JZX_CHILD_TRANSIENT,
.mailbox_cap = 0,
.restart_delay_ms = 0,
.backoff = c.JZX_BACKOFF_NONE,
.name = null,
}};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_ONE_FOR_ONE,
.intensity = 2,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_NONE,
.backoff_delay_ms = 0,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var obs_state = EscalationObsState{ .sup_id = sup_id };
var obs = c.jzx_observer{
.on_actor_start = null,
.on_actor_stop = escalationOnActorStop,
.on_actor_restart = null,
.on_supervisor_escalate = escalationOnSupervisorEscalate,
.on_mailbox_full = null,
};
c.jzx_loop_set_observer(loop.ptr, &obs, @ptrCast(&obs_state));
var driver_state = EscalationDriverState{
.sup_id = sup_id,
.obs = &obs_state,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = escalationDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expectEqual(@as(u32, 1), obs_state.escalations);
try std.testing.expect(obs_state.sup_stopped);
try std.testing.expectEqual(@as(c.jzx_exit_reason, c.JZX_EXIT_FAIL), obs_state.sup_stop_reason);
try std.testing.expect(!driver_state.timed_out);
try std.testing.expect(child_state.runs >= 3);
}
What it’s asserting:
- Restart intensity windows are enforced.
- When restarts exceed the configured intensity, the supervisor escalates (and the observer sees it).
Backoff (constant + exponential)
zig/tests/basic.zig#L899-L903const BackoffState = struct {
runs: u32 = 0,
t1_ms: u64 = 0,
t2_ms: u64 = 0,
};
zig/tests/basic.zig#L905-L918fn backoffRecorder(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*BackoffState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.runs += 1;
const now_ms = @as(u64, @intCast(std.time.milliTimestamp()));
if (state.runs == 1) {
state.t1_ms = now_ms;
return c.JZX_BEHAVIOR_FAIL;
}
state.t2_ms = now_ms;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
zig/tests/basic.zig#L920-L927const BackoffDriverState = struct {
sup_id: c.jzx_actor_id,
original_child_id: c.jzx_actor_id,
backoff_state: *BackoffState,
stage: u8 = 0,
ticks: u32 = 0,
timed_out: bool = false,
};
zig/tests/basic.zig#L929-L964fn backoffDriver(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*BackoffDriverState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.ticks += 1;
if (state.ticks > 5000) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (state.backoff_state.runs >= 2) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
var child_id: c.jzx_actor_id = 0;
if (c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, 0, &child_id) != c.JZX_OK) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (state.stage == 0) {
_ = c.jzx_send(ctx_ptr.loop.?, state.original_child_id, null, 0, 0);
state.stage = 1;
} else if (state.stage == 1) {
if (child_id != 0 and child_id != state.original_child_id) {
_ = c.jzx_send(ctx_ptr.loop.?, child_id, null, 0, 0);
state.stage = 2;
}
}
scheduleSelf(ctx_ptr.loop.?, ctx_ptr.self, 1);
return c.JZX_BEHAVIOR_OK;
}
zig/tests/basic.zig#L966-L1019test "supervisor backoff delays restart" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state = BackoffState{};
var child_spec = [_]c.jzx_child_spec{.{
.behavior = backoffRecorder,
.state = &state,
.mode = c.JZX_CHILD_TRANSIENT,
.mailbox_cap = 0,
.restart_delay_ms = 0,
.backoff = c.JZX_BACKOFF_NONE, // use supervisor default
.name = null,
}};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_ONE_FOR_ONE,
.intensity = 5,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_CONSTANT,
.backoff_delay_ms = 50,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var child_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, 0, &child_id));
try std.testing.expect(child_id != 0);
var driver_state = BackoffDriverState{
.sup_id = sup_id,
.original_child_id = child_id,
.backoff_state = &state,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = backoffDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expectEqual(@as(u32, 2), state.runs);
try std.testing.expect(state.t2_ms >= state.t1_ms + 50);
try std.testing.expect(!driver_state.timed_out);
}
zig/tests/basic.zig#L1021-L1034fn backoffRecorderExp(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*BackoffState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.runs += 1;
const now_ms = @as(u64, @intCast(std.time.milliTimestamp()));
if (state.runs == 1) {
state.t1_ms = now_ms;
return c.JZX_BEHAVIOR_FAIL;
}
state.t2_ms = now_ms;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
zig/tests/basic.zig#L1036-L1089test "supervisor exponential backoff delays restart" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state = BackoffState{};
var child_spec = [_]c.jzx_child_spec{.{
.behavior = backoffRecorderExp,
.state = &state,
.mode = c.JZX_CHILD_TRANSIENT,
.mailbox_cap = 0,
.restart_delay_ms = 20, // base delay
.backoff = c.JZX_BACKOFF_EXPONENTIAL,
.name = null,
}};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_ONE_FOR_ONE,
.intensity = 5,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_EXPONENTIAL,
.backoff_delay_ms = 20,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var child_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, 0, &child_id));
try std.testing.expect(child_id != 0);
var driver_state = BackoffDriverState{
.sup_id = sup_id,
.original_child_id = child_id,
.backoff_state = &state,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = backoffDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expectEqual(@as(u32, 2), state.runs);
try std.testing.expect(state.t2_ms >= state.t1_ms + 30);
try std.testing.expect(!driver_state.timed_out);
}
one_for_all: restart all children when one fails
zig/tests/basic.zig#L1091-L1099const DuoShared = struct {
runs_a: u32 = 0,
runs_b: u32 = 0,
};
const DuoState = struct {
shared: *DuoShared,
is_a: bool,
};
zig/tests/basic.zig#L1101-L1119fn duoBehavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*DuoState, @ptrCast(@alignCast(ctx_ptr.state.?)));
if (state.is_a) {
state.shared.runs_a += 1;
if (state.shared.runs_a >= 2 and state.shared.runs_b >= 2) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
}
return c.JZX_BEHAVIOR_OK;
} else {
state.shared.runs_b += 1;
if (state.shared.runs_b == 1) {
return c.JZX_BEHAVIOR_FAIL;
}
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
}
zig/tests/basic.zig#L1121-L1129const OneForAllDriverState = struct {
sup_id: c.jzx_actor_id,
original_a: c.jzx_actor_id,
original_b: c.jzx_actor_id,
stage: u8 = 0,
observed_restart: bool = false,
ticks: u32 = 0,
timed_out: bool = false,
};
zig/tests/basic.zig#L1131-L1168fn oneForAllDriver(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*OneForAllDriverState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.ticks += 1;
if (state.ticks > 5000) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
var id_a: c.jzx_actor_id = 0;
var id_b: c.jzx_actor_id = 0;
if (c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, 0, &id_a) != c.JZX_OK or
c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, 1, &id_b) != c.JZX_OK)
{
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (state.stage == 0) {
_ = c.jzx_send(ctx_ptr.loop.?, state.original_a, null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, state.original_b, null, 0, 0);
state.stage = 1;
} else if (state.stage == 1) {
if (id_a != 0 and id_b != 0 and id_a != state.original_a and id_b != state.original_b) {
state.observed_restart = true;
_ = c.jzx_send(ctx_ptr.loop.?, id_a, null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, id_b, null, 0, 0);
state.stage = 2;
}
}
scheduleSelf(ctx_ptr.loop.?, ctx_ptr.self, 1);
return c.JZX_BEHAVIOR_OK;
}
zig/tests/basic.zig#L1170-L1243test "supervisor one_for_all restarts all children" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var shared = DuoShared{};
var state_a = DuoState{ .shared = &shared, .is_a = true };
var state_b = DuoState{ .shared = &shared, .is_a = false };
var child_spec = [_]c.jzx_child_spec{
.{
.behavior = duoBehavior,
.state = &state_a,
.mode = c.JZX_CHILD_PERMANENT,
.mailbox_cap = 0,
.restart_delay_ms = 0,
.backoff = c.JZX_BACKOFF_NONE,
.name = null,
},
.{
.behavior = duoBehavior,
.state = &state_b,
.mode = c.JZX_CHILD_PERMANENT,
.mailbox_cap = 0,
.restart_delay_ms = 0,
.backoff = c.JZX_BACKOFF_NONE,
.name = null,
},
};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_ONE_FOR_ALL,
.intensity = 5,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_NONE,
.backoff_delay_ms = 0,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var id_a: c.jzx_actor_id = 0;
var id_b: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, 0, &id_a));
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, 1, &id_b));
try std.testing.expect(id_a != 0);
try std.testing.expect(id_b != 0);
var driver_state = OneForAllDriverState{
.sup_id = sup_id,
.original_a = id_a,
.original_b = id_b,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = oneForAllDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expect(shared.runs_a >= 2);
try std.testing.expect(shared.runs_b >= 2);
try std.testing.expect(driver_state.observed_restart);
try std.testing.expect(!driver_state.timed_out);
}
rest_for_one: restart downstream children only
zig/tests/basic.zig#L1245-L1254const TrioShared = struct {
hits_a: u32 = 0,
hits_b: u32 = 0,
hits_c: u32 = 0,
};
const TrioState = struct {
shared: *TrioShared,
role: enum { A, B, C },
};
zig/tests/basic.zig#L1256-L1274fn trioBehavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*TrioState, @ptrCast(@alignCast(ctx_ptr.state.?)));
switch (state.role) {
.A => {
state.shared.hits_a += 1;
return c.JZX_BEHAVIOR_OK;
},
.B => {
state.shared.hits_b += 1;
return c.JZX_BEHAVIOR_FAIL; // trigger rest_for_one restart for B and C
},
.C => {
state.shared.hits_c += 1;
return c.JZX_BEHAVIOR_OK;
},
}
}
zig/tests/basic.zig#L1276-L1285const RestForOneDriverState = struct {
sup_id: c.jzx_actor_id,
original_b: c.jzx_actor_id,
original_c: c.jzx_actor_id,
shared: *TrioShared,
stage: u8 = 0,
observed_restart: bool = false,
ticks: u32 = 0,
timed_out: bool = false,
};
zig/tests/basic.zig#L1287-L1332fn restForOneDriver(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*RestForOneDriverState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.ticks += 1;
if (state.ticks > 5000) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
var ids = [_]c.jzx_actor_id{0} ** 3;
for (&ids, 0..) |*idptr, idx| {
if (c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, idx, idptr) != c.JZX_OK) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
}
if (state.stage == 0) {
_ = c.jzx_send(ctx_ptr.loop.?, ids[2], null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, ids[0], null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, ids[1], null, 0, 0);
state.stage = 1;
} else if (state.stage == 1) {
if (ids[1] != 0 and ids[2] != 0 and ids[1] != state.original_b and ids[2] != state.original_c) {
state.observed_restart = true;
_ = c.jzx_send(ctx_ptr.loop.?, ids[2], null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, ids[0], null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, ids[1], null, 0, 0);
state.stage = 2;
}
}
if (state.stage == 2) {
if (state.shared.hits_a >= 2 and state.shared.hits_b >= 2 and state.shared.hits_c >= 2) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
}
scheduleSelf(ctx_ptr.loop.?, ctx_ptr.self, 1);
return c.JZX_BEHAVIOR_OK;
}
zig/tests/basic.zig#L1334-L1393test "supervisor rest_for_one restarts downstream children" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var shared = TrioShared{};
var state_a = TrioState{ .shared = &shared, .role = .A };
var state_b = TrioState{ .shared = &shared, .role = .B };
var state_c = TrioState{ .shared = &shared, .role = .C };
var child_spec = [_]c.jzx_child_spec{
.{ .behavior = trioBehavior, .state = &state_a, .mode = c.JZX_CHILD_PERMANENT, .mailbox_cap = 0, .restart_delay_ms = 0, .backoff = c.JZX_BACKOFF_NONE, .name = null },
.{ .behavior = trioBehavior, .state = &state_b, .mode = c.JZX_CHILD_PERMANENT, .mailbox_cap = 0, .restart_delay_ms = 0, .backoff = c.JZX_BACKOFF_NONE, .name = null },
.{ .behavior = trioBehavior, .state = &state_c, .mode = c.JZX_CHILD_PERMANENT, .mailbox_cap = 0, .restart_delay_ms = 0, .backoff = c.JZX_BACKOFF_NONE, .name = null },
};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_REST_FOR_ONE,
.intensity = 5,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_NONE,
.backoff_delay_ms = 0,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var ids = [_]c.jzx_actor_id{0} ** 3;
for (&ids, 0..) |*idptr, idx| {
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, idx, idptr));
try std.testing.expect(idptr.* != 0);
}
var driver_state = RestForOneDriverState{
.sup_id = sup_id,
.original_b = ids[1],
.original_c = ids[2],
.shared = &shared,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = restForOneDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expect(shared.hits_a >= 2);
try std.testing.expect(shared.hits_b >= 2);
try std.testing.expect(shared.hits_c >= 2);
try std.testing.expect(driver_state.observed_restart);
try std.testing.expect(!driver_state.timed_out);
}
Appendix: full file
Show full zig/tests/basic.zig
zig/tests/basic.zig#L1-L1393const std = @import("std");
const jzx = @import("jzx");
const c = jzx.c;
const posix = std.posix;
const AsyncArgs = struct {
loop: *c.jzx_loop,
actor: c.jzx_actor_id,
payload: *u32,
};
fn increment_behavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const msg_ptr = @as(*const c.jzx_message, @ptrCast(msg));
const state_ptr = @as(*u32, @ptrFromInt(@intFromPtr(ctx_ptr.state.?)));
if (msg_ptr.data) |data_ptr| {
const value_ptr = @as(*const u32, @ptrFromInt(@intFromPtr(data_ptr)));
state_ptr.* += value_ptr.*;
}
return c.JZX_BEHAVIOR_STOP;
}
const TimerState = struct {
target: u32,
hits: u32 = 0,
};
const ObserverState = struct {
start_count: u32 = 0,
stop_count: u32 = 0,
mailbox_full_count: u32 = 0,
last_start: c.jzx_actor_id = 0,
last_stop: c.jzx_actor_id = 0,
last_stop_reason: c.jzx_exit_reason = c.JZX_EXIT_NORMAL,
};
fn observerOnStart(ctx: ?*anyopaque, id: c.jzx_actor_id, name: [*c]const u8) callconv(.c) void {
_ = name;
const state = @as(*ObserverState, @ptrCast(@alignCast(ctx.?)));
state.start_count += 1;
state.last_start = id;
}
fn observerOnStop(ctx: ?*anyopaque, id: c.jzx_actor_id, reason: c.jzx_exit_reason) callconv(.c) void {
const state = @as(*ObserverState, @ptrCast(@alignCast(ctx.?)));
state.stop_count += 1;
state.last_stop = id;
state.last_stop_reason = reason;
}
fn observerOnMailboxFull(ctx: ?*anyopaque, target: c.jzx_actor_id) callconv(.c) void {
const state = @as(*ObserverState, @ptrCast(@alignCast(ctx.?)));
state.mailbox_full_count += 1;
_ = target;
}
fn timer_behavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*TimerState, @ptrCast(@alignCast(ctx_ptr.state.?)));
_ = msg;
state.hits += 1;
if (state.hits >= state.target) {
return c.JZX_BEHAVIOR_STOP;
}
return c.JZX_BEHAVIOR_OK;
}
test "actor receives and processes a message" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 5;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 1));
try loop.run();
try std.testing.expectEqual(@as(u32, 5), state);
try std.testing.expectEqual(c.JZX_ERR_NO_SUCH_ACTOR, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 1));
}
test "actor id generations reject stale ids after slot reuse" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state_a: u32 = 0;
var opts_a = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state_a,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_a: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts_a, &actor_a));
var payload: u32 = 1;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor_a, &payload, @sizeOf(u32), 0));
try loop.run();
try std.testing.expectEqual(@as(u32, 1), state_a);
var state_b: u32 = 0;
var opts_b = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state_b,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_b: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts_b, &actor_b));
try std.testing.expect(actor_b != actor_a);
const raw_a: u64 = @intCast(actor_a);
const raw_b: u64 = @intCast(actor_b);
const idx_a: u32 = @truncate(raw_a);
const idx_b: u32 = @truncate(raw_b);
const gen_a: u32 = @intCast(raw_a >> 32);
const gen_b: u32 = @intCast(raw_b >> 32);
try std.testing.expectEqual(idx_a, idx_b);
try std.testing.expectEqual(gen_a + 1, gen_b);
try std.testing.expectEqual(c.JZX_ERR_NO_SUCH_ACTOR, c.jzx_send(loop.ptr, actor_a, &payload, @sizeOf(u32), 0));
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor_b, &payload, @sizeOf(u32), 0));
try loop.run();
try std.testing.expectEqual(@as(u32, 1), state_b);
}
test "mailbox full returns error" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 1,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 1;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 0));
try std.testing.expectEqual(c.JZX_ERR_MAILBOX_FULL, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 0));
try loop.run();
}
test "observer hooks receive lifecycle + mailbox full" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var obs_state = ObserverState{};
var obs = c.jzx_observer{
.on_actor_start = observerOnStart,
.on_actor_stop = observerOnStop,
.on_actor_restart = null,
.on_supervisor_escalate = null,
.on_mailbox_full = observerOnMailboxFull,
};
c.jzx_loop_set_observer(loop.ptr, &obs, @ptrCast(&obs_state));
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 1,
.name = "observer-actor",
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 1;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 0));
try std.testing.expectEqual(c.JZX_ERR_MAILBOX_FULL, c.jzx_send(loop.ptr, actor_id, &payload, @sizeOf(u32), 0));
try loop.run();
try std.testing.expectEqual(@as(u32, 1), obs_state.start_count);
try std.testing.expectEqual(@as(u32, 1), obs_state.stop_count);
try std.testing.expectEqual(@as(u32, 1), obs_state.mailbox_full_count);
try std.testing.expectEqual(actor_id, obs_state.last_start);
try std.testing.expectEqual(actor_id, obs_state.last_stop);
try std.testing.expectEqual(@as(c.jzx_exit_reason, c.JZX_EXIT_NORMAL), obs_state.last_stop_reason);
}
fn async_sender(args: AsyncArgs) void {
_ = c.jzx_send_async(args.loop, args.actor, args.payload, @sizeOf(u32), 2);
}
const SeqState = struct {
expected: u32,
remaining: u32,
ok: bool = true,
};
fn seq_behavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const msg_ptr = @as(*const c.jzx_message, @ptrCast(msg));
const state = @as(*SeqState, @ptrCast(@alignCast(ctx_ptr.state.?)));
if (state.remaining == 0) {
return c.JZX_BEHAVIOR_STOP;
}
if (msg_ptr.data == null) {
state.ok = false;
return c.JZX_BEHAVIOR_STOP;
}
const value_ptr: *const u32 = @ptrCast(@alignCast(msg_ptr.data.?));
if (value_ptr.* != state.expected) {
state.ok = false;
return c.JZX_BEHAVIOR_STOP;
}
state.expected += 1;
state.remaining -= 1;
return if (state.remaining == 0) c.JZX_BEHAVIOR_STOP else c.JZX_BEHAVIOR_OK;
}
test "async send dispatches message" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 7;
var thread = try std.Thread.spawn(.{}, async_sender, .{AsyncArgs{
.loop = loop.ptr,
.actor = actor_id,
.payload = &payload,
}});
thread.join();
try loop.run();
try std.testing.expectEqual(@as(u32, 7), state);
}
test "async send wakes blocking loop" {
var cfg: c.jzx_config = undefined;
c.jzx_config_init(&cfg);
cfg.io_poll_timeout_ms = 5000;
var loop = try jzx.Loop.create(cfg);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var runner = try std.Thread.spawn(.{}, struct {
fn run(lp: *jzx.Loop) void {
_ = lp.run() catch {};
}
}.run, .{&loop});
std.Thread.sleep(5 * std.time.ns_per_ms);
var payload: u32 = 9;
const start_ms = @as(u64, @intCast(std.time.milliTimestamp()));
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_async(loop.ptr, actor_id, &payload, @sizeOf(u32), 0));
runner.join();
const end_ms = @as(u64, @intCast(std.time.milliTimestamp()));
try std.testing.expect(end_ms - start_ms < 1000);
try std.testing.expectEqual(@as(u32, 9), state);
}
test "async send preserves FIFO ordering" {
const n: u32 = 512;
var cfg: c.jzx_config = undefined;
c.jzx_config_init(&cfg);
cfg.io_poll_timeout_ms = 5000;
var loop = try jzx.Loop.create(cfg);
defer loop.deinit();
var state = SeqState{ .expected = 0, .remaining = n };
var opts = c.jzx_spawn_opts{
.behavior = seq_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 1024,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var runner = try std.Thread.spawn(.{}, struct {
fn run(lp: *jzx.Loop) void {
_ = lp.run() catch {};
}
}.run, .{&loop});
std.Thread.sleep(5 * std.time.ns_per_ms);
var payloads = [_]u32{0} ** n;
var sender = try std.Thread.spawn(.{}, struct {
fn run(lp: *c.jzx_loop, id: c.jzx_actor_id, data: []u32) void {
for (data, 0..) |*item, idx| {
item.* = @as(u32, @intCast(idx));
_ = c.jzx_send_async(lp, id, item, @sizeOf(u32), 0);
}
}
}.run, .{ loop.ptr, actor_id, payloads[0..] });
sender.join();
runner.join();
try std.testing.expect(state.ok);
try std.testing.expectEqual(@as(u32, 0), state.remaining);
}
test "timer delivers message" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 3;
var timer_id: c.jzx_timer_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_after(loop.ptr, actor_id, 5, &payload, @sizeOf(u32), 9, &timer_id));
try loop.run();
try std.testing.expectEqual(@as(u32, 3), state);
try std.testing.expectEqual(c.JZX_ERR_TIMER_INVALID, c.jzx_cancel_timer(loop.ptr, timer_id));
}
test "cancelled timer does not fire" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 11;
var timer_id: c.jzx_timer_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_after(loop.ptr, actor_id, 5, &payload, @sizeOf(u32), 4, &timer_id));
try std.testing.expectEqual(c.JZX_OK, c.jzx_cancel_timer(loop.ptr, timer_id));
_ = c.jzx_actor_stop(loop.ptr, actor_id);
try loop.run();
try std.testing.expectEqual(@as(u32, 0), state);
}
test "timer drop when actor stops" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payload: u32 = 5;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_after(loop.ptr, actor_id, 1, &payload, @sizeOf(u32), 0, null));
try std.testing.expectEqual(c.JZX_OK, c.jzx_actor_stop(loop.ptr, actor_id));
try loop.run();
try std.testing.expectEqual(@as(u32, 0), state);
}
test "many timers fire" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
const timer_count: u32 = 32;
var timer_state = TimerState{ .target = timer_count };
var opts = c.jzx_spawn_opts{
.behavior = timer_behavior,
.state = &timer_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var timer_ids = [_]c.jzx_timer_id{0} ** timer_count;
var payloads = [_]u32{0} ** timer_count;
for (&timer_ids, 0..) |tid_ptr, idx| {
payloads[idx] = 1;
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_after(loop.ptr, actor_id, 1, &payloads[idx], @sizeOf(u32), 0, tid_ptr));
}
try loop.run();
try std.testing.expectEqual(timer_count, timer_state.hits);
}
test "timer delivery preserves enqueue order" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
const timer_count: u32 = 32;
var state = SeqState{
.expected = 0,
.remaining = timer_count,
.ok = true,
};
var opts = c.jzx_spawn_opts{
.behavior = seq_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 1024,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
var payloads = [_]u32{0} ** timer_count;
for (&payloads, 0..) |*item, idx| {
item.* = @as(u32, @intCast(idx));
try std.testing.expectEqual(c.JZX_OK, c.jzx_send_after(loop.ptr, actor_id, 1, item, @sizeOf(u32), 0, null));
}
try loop.run();
try std.testing.expect(state.ok);
try std.testing.expectEqual(@as(u32, 0), state.remaining);
}
const PingPongState = struct {
loop: *c.jzx_loop,
partner: *?c.jzx_actor_id,
remaining: u32,
hits: u32,
};
fn pingPongBehavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*PingPongState, @ptrCast(@alignCast(ctx_ptr.state.?)));
_ = msg;
state.hits += 1;
if (state.remaining == 0) {
return c.JZX_BEHAVIOR_STOP;
}
state.remaining -= 1;
if (state.partner.*) |partner_id| {
var payload: u32 = 1;
_ = c.jzx_send(state.loop, partner_id, &payload, @sizeOf(u32), 0);
}
return c.JZX_BEHAVIOR_OK;
}
test "ping pong actors share work fairly" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var id_a: c.jzx_actor_id = 0;
var id_b: c.jzx_actor_id = 0;
var partner_a: ?c.jzx_actor_id = 0;
var partner_b: ?c.jzx_actor_id = 0;
var state_a = PingPongState{ .loop = loop.ptr, .partner = &partner_a, .remaining = 10, .hits = 0 };
var state_b = PingPongState{ .loop = loop.ptr, .partner = &partner_b, .remaining = 10, .hits = 0 };
var opts_a = c.jzx_spawn_opts{ .behavior = pingPongBehavior, .state = &state_a, .supervisor = 0, .mailbox_cap = 0, .name = null };
var opts_b = c.jzx_spawn_opts{ .behavior = pingPongBehavior, .state = &state_b, .supervisor = 0, .mailbox_cap = 0, .name = null };
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts_a, &id_a));
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts_b, &id_b));
partner_a = id_b;
partner_b = id_a;
var init: u32 = 1;
_ = c.jzx_send(loop.ptr, id_a, &init, @sizeOf(u32), 0);
_ = c.jzx_send(loop.ptr, id_b, &init, @sizeOf(u32), 0);
try loop.run();
try std.testing.expectEqual(@as(u32, 11), state_a.hits);
try std.testing.expectEqual(@as(u32, 11), state_b.hits);
}
const CounterState = struct {
total: u32 = 0,
};
const CounterMsg = struct {
value: u32,
};
fn counterBehavior(state: *CounterState, msg: *CounterMsg, ctx: jzx.ActorContext) jzx.BehaviorResult {
_ = ctx;
state.total += msg.value;
return .stop;
}
test "typed actor increments state" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var counter = CounterState{};
var actor = try jzx.Actor(CounterState, *CounterMsg).spawn(
loop.ptr,
std.heap.c_allocator,
&counter,
&counterBehavior,
.{},
);
defer actor.destroy();
var msg = CounterMsg{ .value = 8 };
try std.testing.expectEqual(c.JZX_OK, c.jzx_send(loop.ptr, actor.getId(), &msg, @sizeOf(CounterMsg), 0));
try loop.run();
try std.testing.expectEqual(@as(u32, 8), counter.total);
}
fn io_behavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const msg_ptr = @as(*const c.jzx_message, @ptrCast(msg));
if (msg_ptr.tag == c.JZX_TAG_SYS_IO and msg_ptr.data != null) {
const data_ptr = msg_ptr.data.?;
const state_ptr = @as(*u32, @ptrFromInt(@intFromPtr(ctx_ptr.state.?)));
const event = @as(*c.jzx_io_event, @ptrFromInt(@intFromPtr(data_ptr)));
if ((event.readiness & c.JZX_IO_READ) != 0) {
state_ptr.* += 1;
}
c.jzx_loop_free(ctx_ptr.loop.?, data_ptr);
return c.JZX_BEHAVIOR_STOP;
}
return c.JZX_BEHAVIOR_OK;
}
fn pipe_writer(fd: posix.fd_t) void {
const msg = "ping";
_ = posix.write(fd, msg) catch {};
}
test "io watcher delivers readiness" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = io_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
const pipefds = try posix.pipe();
defer {
posix.close(pipefds[0]);
posix.close(pipefds[1]);
}
try std.testing.expectEqual(c.JZX_OK, c.jzx_watch_fd(loop.ptr, pipefds[0], actor_id, c.JZX_IO_READ));
var writer = try std.Thread.spawn(.{}, pipe_writer, .{pipefds[1]});
writer.join();
try loop.run();
try std.testing.expectEqual(@as(u32, 1), state);
}
test "io rapid watch and unwatch" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state: u32 = 0;
var opts = c.jzx_spawn_opts{
.behavior = increment_behavior,
.state = &state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &opts, &actor_id));
const pipefds = try posix.pipe();
defer {
posix.close(pipefds[0]);
posix.close(pipefds[1]);
}
for (0..8) |_| {
try std.testing.expectEqual(c.JZX_OK, c.jzx_watch_fd(loop.ptr, pipefds[0], actor_id, c.JZX_IO_READ));
try std.testing.expectEqual(c.JZX_OK, c.jzx_unwatch_fd(loop.ptr, pipefds[0]));
}
try std.testing.expectEqual(c.JZX_OK, c.jzx_watch_fd(loop.ptr, pipefds[0], actor_id, c.JZX_IO_READ));
var writer = try std.Thread.spawn(.{}, pipe_writer, .{pipefds[1]});
writer.join();
try loop.run();
try std.testing.expect(state >= 1);
}
const RestartState = struct {
runs: u32 = 0,
};
fn scheduleSelf(loop: *c.jzx_loop, self: c.jzx_actor_id, ms: u32) void {
_ = c.jzx_send_after(loop, self, ms, null, 0, 0, null);
}
fn failThenStop(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*RestartState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.runs += 1;
return if (state.runs == 1) c.JZX_BEHAVIOR_FAIL else c.JZX_BEHAVIOR_STOP;
}
const TransientDriverState = struct {
sup_id: c.jzx_actor_id,
original_child_id: c.jzx_actor_id,
stage: u8 = 0,
ticks: u32 = 0,
timed_out: bool = false,
};
fn transientDriver(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*TransientDriverState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.ticks += 1;
if (state.ticks > 5000) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
var child_id: c.jzx_actor_id = 0;
if (c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, 0, &child_id) != c.JZX_OK) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (state.stage == 0) {
_ = c.jzx_send(ctx_ptr.loop.?, state.original_child_id, null, 0, 0);
state.stage = 1;
} else if (state.stage == 1) {
if (child_id != 0 and child_id != state.original_child_id) {
_ = c.jzx_send(ctx_ptr.loop.?, child_id, null, 0, 0);
state.stage = 2;
}
} else {
if (child_id == 0) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
}
scheduleSelf(ctx_ptr.loop.?, ctx_ptr.self, 1);
return c.JZX_BEHAVIOR_OK;
}
test "supervisor restarts transient child once" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var child_state = RestartState{};
var child_spec = [_]c.jzx_child_spec{.{
.behavior = failThenStop,
.state = &child_state,
.mode = c.JZX_CHILD_TRANSIENT,
.mailbox_cap = 0,
.restart_delay_ms = 0,
.backoff = c.JZX_BACKOFF_NONE,
.name = null,
}};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_ONE_FOR_ONE,
.intensity = 5,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_NONE,
.backoff_delay_ms = 0,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var child_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, 0, &child_id));
try std.testing.expect(child_id != 0);
var driver_state = TransientDriverState{
.sup_id = sup_id,
.original_child_id = child_id,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = transientDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expectEqual(@as(u32, 2), child_state.runs);
try std.testing.expect(!driver_state.timed_out);
}
fn alwaysFail(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*RestartState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.runs += 1;
return c.JZX_BEHAVIOR_FAIL;
}
const EscalationObsState = struct {
sup_id: c.jzx_actor_id,
escalations: u32 = 0,
sup_stopped: bool = false,
sup_stop_reason: c.jzx_exit_reason = c.JZX_EXIT_NORMAL,
};
fn escalationOnSupervisorEscalate(ctx: ?*anyopaque, supervisor: c.jzx_actor_id) callconv(.c) void {
const state = @as(*EscalationObsState, @ptrCast(@alignCast(ctx.?)));
if (supervisor == state.sup_id) {
state.escalations += 1;
}
}
fn escalationOnActorStop(ctx: ?*anyopaque, id: c.jzx_actor_id, reason: c.jzx_exit_reason) callconv(.c) void {
const state = @as(*EscalationObsState, @ptrCast(@alignCast(ctx.?)));
if (id == state.sup_id) {
state.sup_stopped = true;
state.sup_stop_reason = reason;
}
}
const EscalationDriverState = struct {
sup_id: c.jzx_actor_id,
obs: *EscalationObsState,
last_child_id: c.jzx_actor_id = 0,
ticks: u32 = 0,
timed_out: bool = false,
};
fn escalationDriver(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*EscalationDriverState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.ticks += 1;
if (state.ticks > 5000) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (state.obs.escalations > 0 or state.obs.sup_stopped) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
var child_id: c.jzx_actor_id = 0;
const rc = c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, 0, &child_id);
if (rc == c.JZX_ERR_NO_SUCH_ACTOR) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (rc != c.JZX_OK) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (child_id != 0 and child_id != state.last_child_id) {
_ = c.jzx_send(ctx_ptr.loop.?, child_id, null, 0, 0);
state.last_child_id = child_id;
}
scheduleSelf(ctx_ptr.loop.?, ctx_ptr.self, 1);
return c.JZX_BEHAVIOR_OK;
}
test "supervisor escalates when intensity exceeded" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var child_state = RestartState{};
var child_spec = [_]c.jzx_child_spec{.{
.behavior = alwaysFail,
.state = &child_state,
.mode = c.JZX_CHILD_TRANSIENT,
.mailbox_cap = 0,
.restart_delay_ms = 0,
.backoff = c.JZX_BACKOFF_NONE,
.name = null,
}};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_ONE_FOR_ONE,
.intensity = 2,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_NONE,
.backoff_delay_ms = 0,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var obs_state = EscalationObsState{ .sup_id = sup_id };
var obs = c.jzx_observer{
.on_actor_start = null,
.on_actor_stop = escalationOnActorStop,
.on_actor_restart = null,
.on_supervisor_escalate = escalationOnSupervisorEscalate,
.on_mailbox_full = null,
};
c.jzx_loop_set_observer(loop.ptr, &obs, @ptrCast(&obs_state));
var driver_state = EscalationDriverState{
.sup_id = sup_id,
.obs = &obs_state,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = escalationDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expectEqual(@as(u32, 1), obs_state.escalations);
try std.testing.expect(obs_state.sup_stopped);
try std.testing.expectEqual(@as(c.jzx_exit_reason, c.JZX_EXIT_FAIL), obs_state.sup_stop_reason);
try std.testing.expect(!driver_state.timed_out);
try std.testing.expect(child_state.runs >= 3);
}
const BackoffState = struct {
runs: u32 = 0,
t1_ms: u64 = 0,
t2_ms: u64 = 0,
};
fn backoffRecorder(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*BackoffState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.runs += 1;
const now_ms = @as(u64, @intCast(std.time.milliTimestamp()));
if (state.runs == 1) {
state.t1_ms = now_ms;
return c.JZX_BEHAVIOR_FAIL;
}
state.t2_ms = now_ms;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
const BackoffDriverState = struct {
sup_id: c.jzx_actor_id,
original_child_id: c.jzx_actor_id,
backoff_state: *BackoffState,
stage: u8 = 0,
ticks: u32 = 0,
timed_out: bool = false,
};
fn backoffDriver(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*BackoffDriverState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.ticks += 1;
if (state.ticks > 5000) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (state.backoff_state.runs >= 2) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
var child_id: c.jzx_actor_id = 0;
if (c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, 0, &child_id) != c.JZX_OK) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (state.stage == 0) {
_ = c.jzx_send(ctx_ptr.loop.?, state.original_child_id, null, 0, 0);
state.stage = 1;
} else if (state.stage == 1) {
if (child_id != 0 and child_id != state.original_child_id) {
_ = c.jzx_send(ctx_ptr.loop.?, child_id, null, 0, 0);
state.stage = 2;
}
}
scheduleSelf(ctx_ptr.loop.?, ctx_ptr.self, 1);
return c.JZX_BEHAVIOR_OK;
}
test "supervisor backoff delays restart" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state = BackoffState{};
var child_spec = [_]c.jzx_child_spec{.{
.behavior = backoffRecorder,
.state = &state,
.mode = c.JZX_CHILD_TRANSIENT,
.mailbox_cap = 0,
.restart_delay_ms = 0,
.backoff = c.JZX_BACKOFF_NONE, // use supervisor default
.name = null,
}};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_ONE_FOR_ONE,
.intensity = 5,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_CONSTANT,
.backoff_delay_ms = 50,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var child_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, 0, &child_id));
try std.testing.expect(child_id != 0);
var driver_state = BackoffDriverState{
.sup_id = sup_id,
.original_child_id = child_id,
.backoff_state = &state,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = backoffDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expectEqual(@as(u32, 2), state.runs);
try std.testing.expect(state.t2_ms >= state.t1_ms + 50);
try std.testing.expect(!driver_state.timed_out);
}
fn backoffRecorderExp(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*BackoffState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.runs += 1;
const now_ms = @as(u64, @intCast(std.time.milliTimestamp()));
if (state.runs == 1) {
state.t1_ms = now_ms;
return c.JZX_BEHAVIOR_FAIL;
}
state.t2_ms = now_ms;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
test "supervisor exponential backoff delays restart" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var state = BackoffState{};
var child_spec = [_]c.jzx_child_spec{.{
.behavior = backoffRecorderExp,
.state = &state,
.mode = c.JZX_CHILD_TRANSIENT,
.mailbox_cap = 0,
.restart_delay_ms = 20, // base delay
.backoff = c.JZX_BACKOFF_EXPONENTIAL,
.name = null,
}};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_ONE_FOR_ONE,
.intensity = 5,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_EXPONENTIAL,
.backoff_delay_ms = 20,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var child_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, 0, &child_id));
try std.testing.expect(child_id != 0);
var driver_state = BackoffDriverState{
.sup_id = sup_id,
.original_child_id = child_id,
.backoff_state = &state,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = backoffDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expectEqual(@as(u32, 2), state.runs);
try std.testing.expect(state.t2_ms >= state.t1_ms + 30);
try std.testing.expect(!driver_state.timed_out);
}
const DuoShared = struct {
runs_a: u32 = 0,
runs_b: u32 = 0,
};
const DuoState = struct {
shared: *DuoShared,
is_a: bool,
};
fn duoBehavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*DuoState, @ptrCast(@alignCast(ctx_ptr.state.?)));
if (state.is_a) {
state.shared.runs_a += 1;
if (state.shared.runs_a >= 2 and state.shared.runs_b >= 2) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
}
return c.JZX_BEHAVIOR_OK;
} else {
state.shared.runs_b += 1;
if (state.shared.runs_b == 1) {
return c.JZX_BEHAVIOR_FAIL;
}
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
}
const OneForAllDriverState = struct {
sup_id: c.jzx_actor_id,
original_a: c.jzx_actor_id,
original_b: c.jzx_actor_id,
stage: u8 = 0,
observed_restart: bool = false,
ticks: u32 = 0,
timed_out: bool = false,
};
fn oneForAllDriver(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*OneForAllDriverState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.ticks += 1;
if (state.ticks > 5000) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
var id_a: c.jzx_actor_id = 0;
var id_b: c.jzx_actor_id = 0;
if (c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, 0, &id_a) != c.JZX_OK or
c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, 1, &id_b) != c.JZX_OK)
{
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
if (state.stage == 0) {
_ = c.jzx_send(ctx_ptr.loop.?, state.original_a, null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, state.original_b, null, 0, 0);
state.stage = 1;
} else if (state.stage == 1) {
if (id_a != 0 and id_b != 0 and id_a != state.original_a and id_b != state.original_b) {
state.observed_restart = true;
_ = c.jzx_send(ctx_ptr.loop.?, id_a, null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, id_b, null, 0, 0);
state.stage = 2;
}
}
scheduleSelf(ctx_ptr.loop.?, ctx_ptr.self, 1);
return c.JZX_BEHAVIOR_OK;
}
test "supervisor one_for_all restarts all children" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var shared = DuoShared{};
var state_a = DuoState{ .shared = &shared, .is_a = true };
var state_b = DuoState{ .shared = &shared, .is_a = false };
var child_spec = [_]c.jzx_child_spec{
.{
.behavior = duoBehavior,
.state = &state_a,
.mode = c.JZX_CHILD_PERMANENT,
.mailbox_cap = 0,
.restart_delay_ms = 0,
.backoff = c.JZX_BACKOFF_NONE,
.name = null,
},
.{
.behavior = duoBehavior,
.state = &state_b,
.mode = c.JZX_CHILD_PERMANENT,
.mailbox_cap = 0,
.restart_delay_ms = 0,
.backoff = c.JZX_BACKOFF_NONE,
.name = null,
},
};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_ONE_FOR_ALL,
.intensity = 5,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_NONE,
.backoff_delay_ms = 0,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var id_a: c.jzx_actor_id = 0;
var id_b: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, 0, &id_a));
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, 1, &id_b));
try std.testing.expect(id_a != 0);
try std.testing.expect(id_b != 0);
var driver_state = OneForAllDriverState{
.sup_id = sup_id,
.original_a = id_a,
.original_b = id_b,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = oneForAllDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expect(shared.runs_a >= 2);
try std.testing.expect(shared.runs_b >= 2);
try std.testing.expect(driver_state.observed_restart);
try std.testing.expect(!driver_state.timed_out);
}
const TrioShared = struct {
hits_a: u32 = 0,
hits_b: u32 = 0,
hits_c: u32 = 0,
};
const TrioState = struct {
shared: *TrioShared,
role: enum { A, B, C },
};
fn trioBehavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*TrioState, @ptrCast(@alignCast(ctx_ptr.state.?)));
switch (state.role) {
.A => {
state.shared.hits_a += 1;
return c.JZX_BEHAVIOR_OK;
},
.B => {
state.shared.hits_b += 1;
return c.JZX_BEHAVIOR_FAIL; // trigger rest_for_one restart for B and C
},
.C => {
state.shared.hits_c += 1;
return c.JZX_BEHAVIOR_OK;
},
}
}
const RestForOneDriverState = struct {
sup_id: c.jzx_actor_id,
original_b: c.jzx_actor_id,
original_c: c.jzx_actor_id,
shared: *TrioShared,
stage: u8 = 0,
observed_restart: bool = false,
ticks: u32 = 0,
timed_out: bool = false,
};
fn restForOneDriver(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
_ = msg;
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const state = @as(*RestForOneDriverState, @ptrCast(@alignCast(ctx_ptr.state.?)));
state.ticks += 1;
if (state.ticks > 5000) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
var ids = [_]c.jzx_actor_id{0} ** 3;
for (&ids, 0..) |*idptr, idx| {
if (c.jzx_supervisor_child_id(ctx_ptr.loop.?, state.sup_id, idx, idptr) != c.JZX_OK) {
state.timed_out = true;
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
}
if (state.stage == 0) {
_ = c.jzx_send(ctx_ptr.loop.?, ids[2], null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, ids[0], null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, ids[1], null, 0, 0);
state.stage = 1;
} else if (state.stage == 1) {
if (ids[1] != 0 and ids[2] != 0 and ids[1] != state.original_b and ids[2] != state.original_c) {
state.observed_restart = true;
_ = c.jzx_send(ctx_ptr.loop.?, ids[2], null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, ids[0], null, 0, 0);
_ = c.jzx_send(ctx_ptr.loop.?, ids[1], null, 0, 0);
state.stage = 2;
}
}
if (state.stage == 2) {
if (state.shared.hits_a >= 2 and state.shared.hits_b >= 2 and state.shared.hits_c >= 2) {
c.jzx_loop_request_stop(ctx_ptr.loop.?);
return c.JZX_BEHAVIOR_STOP;
}
}
scheduleSelf(ctx_ptr.loop.?, ctx_ptr.self, 1);
return c.JZX_BEHAVIOR_OK;
}
test "supervisor rest_for_one restarts downstream children" {
var loop = try jzx.Loop.create(null);
defer loop.deinit();
var shared = TrioShared{};
var state_a = TrioState{ .shared = &shared, .role = .A };
var state_b = TrioState{ .shared = &shared, .role = .B };
var state_c = TrioState{ .shared = &shared, .role = .C };
var child_spec = [_]c.jzx_child_spec{
.{ .behavior = trioBehavior, .state = &state_a, .mode = c.JZX_CHILD_PERMANENT, .mailbox_cap = 0, .restart_delay_ms = 0, .backoff = c.JZX_BACKOFF_NONE, .name = null },
.{ .behavior = trioBehavior, .state = &state_b, .mode = c.JZX_CHILD_PERMANENT, .mailbox_cap = 0, .restart_delay_ms = 0, .backoff = c.JZX_BACKOFF_NONE, .name = null },
.{ .behavior = trioBehavior, .state = &state_c, .mode = c.JZX_CHILD_PERMANENT, .mailbox_cap = 0, .restart_delay_ms = 0, .backoff = c.JZX_BACKOFF_NONE, .name = null },
};
var sup_init = c.jzx_supervisor_init{
.children = &child_spec,
.child_count = child_spec.len,
.supervisor = .{
.strategy = c.JZX_SUP_REST_FOR_ONE,
.intensity = 5,
.period_ms = 1000,
.backoff = c.JZX_BACKOFF_NONE,
.backoff_delay_ms = 0,
},
};
var sup_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn_supervisor(loop.ptr, &sup_init, 0, &sup_id));
var ids = [_]c.jzx_actor_id{0} ** 3;
for (&ids, 0..) |*idptr, idx| {
try std.testing.expectEqual(c.JZX_OK, c.jzx_supervisor_child_id(loop.ptr, sup_id, idx, idptr));
try std.testing.expect(idptr.* != 0);
}
var driver_state = RestForOneDriverState{
.sup_id = sup_id,
.original_b = ids[1],
.original_c = ids[2],
.shared = &shared,
};
var driver_opts = c.jzx_spawn_opts{
.behavior = restForOneDriver,
.state = &driver_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var driver_id: c.jzx_actor_id = 0;
try std.testing.expectEqual(c.JZX_OK, c.jzx_spawn(loop.ptr, &driver_opts, &driver_id));
_ = c.jzx_send(loop.ptr, driver_id, null, 0, 0);
try loop.run();
try std.testing.expect(shared.hits_a >= 2);
try std.testing.expect(shared.hits_b >= 2);
try std.testing.expect(shared.hits_c >= 2);
try std.testing.expect(driver_state.observed_restart);
try std.testing.expect(!driver_state.timed_out);
}