Skip to main content
Version: next

Runtime core — src/jzx_runtime.c

This file is the primary implementation of the libjzx runtime: actors, mailboxes, scheduling, supervision, timers, async send, and I/O watcher integration.

Unlike a reference dump, this page is written in a “textbook” style: small snippets with the explanation immediately around them. The goal is to explain what each piece is, how it works, and why it exists.

Includes and file-level structure

Includes + file-level section markers
#include "jzx_internal.h"

#include <stdlib.h>
#include <string.h>
#include <time.h>

// -----------------------------------------------------------------------------
// Utility helpers
// -----------------------------------------------------------------------------
  • jzx_internal.h provides the concrete struct jzx_loop and internal helper types.
  • stdlib.h / string.h: memory and string utilities used for allocations and memset.
  • time.h: monotonic clock access for timers and intensity windows.

The rest of the file is organized with section headers like:

  • “Mailbox implementation”
  • “Actor table implementation”
  • “Timer system”

Those section headers are meant to be a reading guide: each section is a subsystem with its own invariants.

Utility helpers

Actor id encoding helpers

These helpers implement the runtime’s actor-id scheme: a jzx_actor_id is a 64-bit value that encodes (generation, index).

jzx_id_index()
static inline uint32_t jzx_id_index(jzx_actor_id id) {
return (uint32_t)(id & 0xffffffffu);
}
  • Extracts the low 32 bits (the table index).
  • Why it exists: fast lookup into actors.slots[].
jzx_id_generation()
static inline uint32_t jzx_id_generation(jzx_actor_id id) {
return (uint32_t)(id >> 32u);
}
  • Extracts the high 32 bits (the generation).
  • Why it exists: stale-id rejection when actor slots are reused.
jzx_make_id()
static inline jzx_actor_id jzx_make_id(uint32_t gen, uint32_t idx) {
return ((uint64_t)gen << 32u) | (uint64_t)idx;
}
  • Packs (gen, idx) back into a 64-bit id.
  • Why it exists: the runtime controls the id format while keeping the public type opaque.

Allocator wrappers

The runtime always allocates through jzx_allocator so users can supply custom allocation strategies.

jzx_alloc()
static void* jzx_alloc(jzx_allocator* alloc, size_t size) {
return alloc->alloc ? alloc->alloc(alloc->ctx, size) : NULL;
}

Notes:

  • Calls alloc->alloc(ctx, size) when provided.
  • Returns NULL if no allocator is configured (or allocation fails).
jzx_free()
static void jzx_free(jzx_allocator* alloc, void* ptr) {
if (alloc->free) {
alloc->free(alloc->ctx, ptr);
}
}

Notes:

  • Calls the configured free function when present.
  • Does nothing if free is null.

Why wrappers exist: they concentrate “allocator optionality” into one place so the rest of the code doesn’t have to repeat null checks.

Supervisor state allocation and restart intensity

The supervisor subsystem needs its own heap state (the configured child list and restart counters).

jzx_supervisor_state_create()
static jzx_supervisor_state* jzx_supervisor_state_create(const jzx_supervisor_init* init,
jzx_allocator* allocator) {
if (!init || init->child_count == 0 || !init->children) {
return NULL;
}
jzx_supervisor_state* state =
(jzx_supervisor_state*)jzx_alloc(allocator, sizeof(jzx_supervisor_state));
if (!state) {
return NULL;
}
memset(state, 0, sizeof(*state));
state->config = init->supervisor;
state->child_count = init->child_count;
size_t bytes = sizeof(jzx_child_state) * init->child_count;
state->children = (jzx_child_state*)jzx_alloc(allocator, bytes);
if (!state->children) {
jzx_free(allocator, state);
return NULL;
}
memset(state->children, 0, bytes);
for (size_t i = 0; i < init->child_count; ++i) {
state->children[i].spec = init->children[i];
state->children[i].id = 0;
state->children[i].restart_count = 0;
state->children[i].last_restart_ms = 0;
}
state->intensity_window_count = 0;
state->intensity_window_start_ms = 0;
return state;
}

Key ideas:

  • Validates init and its children array.
  • Allocates jzx_supervisor_state and a contiguous children[] array.
  • Copies the child specs into runtime-owned memory so the init can be stack-allocated by the caller.

Ownership note:

  • The supervisor state owns children and must free it on teardown.
jzx_supervisor_state_destroy()
static void jzx_supervisor_state_destroy(jzx_supervisor_state* state, jzx_allocator* allocator) {
if (!state)
return;
if (state->children) {
jzx_free(allocator, state->children);
}
jzx_free(allocator, state);
}

This is the ownership inverse of create: free the children array, then free the state.

jzx_supervisor_allow_restart()
static int jzx_supervisor_allow_restart(jzx_supervisor_state* sup, uint64_t now_ms) {
if (!sup)
return 0;
if (sup->config.intensity == 0 || sup->config.period_ms == 0) {
return 1;
}
if (sup->intensity_window_start_ms == 0 ||
now_ms - sup->intensity_window_start_ms > sup->config.period_ms) {
sup->intensity_window_start_ms = now_ms;
sup->intensity_window_count = 0;
}
sup->intensity_window_count += 1;
if (sup->intensity_window_count > sup->config.intensity) {
return 0;
}
return 1;
}

This implements restart-intensity logic:

  • If intensity config is disabled (0), always allow.
  • Otherwise, maintain a rolling time window and count restarts.
  • If restarts exceed intensity within period_ms, disallow (which triggers escalation elsewhere).

Why it exists: without intensity limits, a crash loop could saturate CPU and starve everything else.

Time and saturating math helpers

jzx_now_ms()
static uint64_t jzx_now_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (uint64_t)ts.tv_sec * 1000ull + (uint64_t)ts.tv_nsec / 1000000ull;
}
  • Uses CLOCK_MONOTONIC so time deltas don’t go backwards if the system clock changes.
  • Returns milliseconds for timer scheduling and intensity windows.
jzx_sat_add32()
static uint32_t jzx_sat_add32(uint32_t a, uint32_t b) {
uint64_t sum = (uint64_t)a + (uint64_t)b;
if (sum > UINT32_MAX) {
return UINT32_MAX;
}
return (uint32_t)sum;
}

Saturating add prevents overflow when computing delays or budgets.

jzx_sat_mul32()
static uint32_t jzx_sat_mul32(uint32_t a, uint32_t b) {
if (a == 0 || b == 0)
return 0;
uint64_t prod = (uint64_t)a * (uint64_t)b;
if (prod > UINT32_MAX) {
return UINT32_MAX;
}
return (uint32_t)prod;
}

Saturating multiply is used for exponential backoff computations without wrapping.

This file is written top-to-bottom, but a few subsystems need to reference helpers that are defined later. Instead of reordering everything, jzx_runtime.c uses file-local forward declarations.

Forward declarations + section boundary
static jzx_err jzx_send_internal(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
uint32_t tag, jzx_actor_id sender);

static void jzx_io_remove_actor(jzx_loop* loop, jzx_actor_id actor);

// -----------------------------------------------------------------------------
// Wakeup helpers
// -----------------------------------------------------------------------------

Why these exist:

  • jzx_send_internal(...): the “real send” primitive that all message paths funnel through.
    • It’s referenced by multiple subsystems (async send, timers, supervision system messages).
  • jzx_io_remove_actor(...): used during teardown to ensure watchers don’t keep stale (fd → actor) references.
  • The dashed section headers are purely for human readers, but they’re valuable in a file this large:
    • each header marks a coherent subsystem with its own invariants.

Wakeup helpers (event loop notification)

jzx_wakeup_signal()
static void jzx_wakeup_signal(jzx_loop* loop) {
if (!loop || !loop->xev) {
return;
}
jzx_xev_wakeup(loop->xev);
}

This is how the runtime wakes the backend event loop when something arrives from another thread (async send queue or timer wake).

Observer helpers (instrumentation)

Section: observer helpers
// -----------------------------------------------------------------------------
// Observer helpers
// -----------------------------------------------------------------------------

These helpers are “safe callouts”: they check for null hooks and then forward the event.

jzx_obs_actor_start()
static void jzx_obs_actor_start(jzx_loop* loop, jzx_actor_id id, const char* name) {
if (loop && loop->observer.on_actor_start) {
loop->observer.on_actor_start(loop->observer_ctx, id, name);
}
}
jzx_obs_actor_stop()
static void jzx_obs_actor_stop(jzx_loop* loop, jzx_actor_id id, jzx_exit_reason reason) {
if (loop && loop->observer.on_actor_stop) {
loop->observer.on_actor_stop(loop->observer_ctx, id, reason);
}
}
jzx_obs_actor_restart()
static void jzx_obs_actor_restart(jzx_loop* loop, jzx_actor_id supervisor, jzx_actor_id child,
uint32_t attempt) {
if (loop && loop->observer.on_actor_restart) {
loop->observer.on_actor_restart(loop->observer_ctx, supervisor, child, attempt);
}
}
jzx_obs_supervisor_escalate()
static void jzx_obs_supervisor_escalate(jzx_loop* loop, jzx_actor_id supervisor) {
if (loop && loop->observer.on_supervisor_escalate) {
loop->observer.on_supervisor_escalate(loop->observer_ctx, supervisor);
}
}
jzx_obs_mailbox_full()
static void jzx_obs_mailbox_full(jzx_loop* loop, jzx_actor_id target) {
if (loop && loop->observer.on_mailbox_full) {
loop->observer.on_mailbox_full(loop->observer_ctx, target);
}
}

Why these exist: to keep observer checks consistent and to keep the core logic readable.

Mailbox implementation (bounded per-actor queue)

Section: mailbox implementation
// -----------------------------------------------------------------------------
// Mailbox implementation
// -----------------------------------------------------------------------------
jzx_mailbox_init()
static jzx_err jzx_mailbox_init(jzx_mailbox_impl* box, uint32_t capacity,
jzx_allocator* allocator) {
if (capacity == 0) {
capacity = 1;
}
size_t bytes = sizeof(jzx_message) * capacity;
jzx_message* buffer = (jzx_message*)jzx_alloc(allocator, bytes);
if (!buffer) {
return JZX_ERR_NO_MEMORY;
}
memset(buffer, 0, bytes);
box->buffer = buffer;
box->capacity = capacity;
box->head = 0;
box->tail = 0;
box->count = 0;
return JZX_OK;
}

Notes:

  • Enforces capacity >= 1.
  • Allocates a fixed-size ring buffer of jzx_message items.
  • Initializes head/tail/count to empty.
jzx_mailbox_deinit()
static void jzx_mailbox_deinit(jzx_mailbox_impl* box, jzx_allocator* allocator) {
if (box->buffer) {
jzx_free(allocator, box->buffer);
}
memset(box, 0, sizeof(*box));
}
jzx_mailbox_push()
static int jzx_mailbox_push(jzx_mailbox_impl* box, const jzx_message* msg) {
if (box->count == box->capacity) {
return -1;
}
box->buffer[box->tail] = *msg;
box->tail = (box->tail + 1) % box->capacity;
box->count++;
return 0;
}

This is the backpressure point:

jzx_mailbox_pop()
static int jzx_mailbox_pop(jzx_mailbox_impl* box, jzx_message* out) {
if (box->count == 0) {
return -1;
}
*out = box->buffer[box->head];
box->head = (box->head + 1) % box->capacity;
box->count--;
return 0;
}
jzx_mailbox_has_items()
static int jzx_mailbox_has_items(const jzx_mailbox_impl* box) {
return box->count > 0;
}

Actor table (id → actor pointer with generations)

Section: actor table implementation
// -----------------------------------------------------------------------------
// Actor table implementation
// -----------------------------------------------------------------------------
jzx_actor_table_init()
static jzx_err jzx_actor_table_init(jzx_actor_table* table, uint32_t capacity,
jzx_allocator* allocator) {
memset(table, 0, sizeof(*table));
table->capacity = capacity;
size_t slot_bytes = sizeof(jzx_actor*) * capacity;
size_t gen_bytes = sizeof(uint32_t) * capacity;
size_t stack_bytes = sizeof(uint32_t) * capacity;

table->slots = (jzx_actor**)jzx_alloc(allocator, slot_bytes);
table->generations = (uint32_t*)jzx_alloc(allocator, gen_bytes);
table->free_stack = (uint32_t*)jzx_alloc(allocator, stack_bytes);
if (!table->slots || !table->generations || !table->free_stack) {
return JZX_ERR_NO_MEMORY;
}

memset(table->slots, 0, slot_bytes);
for (uint32_t i = 0; i < capacity; ++i) {
table->generations[i] = 1;
table->free_stack[i] = capacity - 1 - i;
}
table->free_top = capacity;
table->used = 0;
return JZX_OK;
}
jzx_actor_table_deinit()
static void jzx_actor_table_deinit(jzx_actor_table* table, jzx_allocator* allocator) {
if (!table) {
return;
}
if (table->slots) {
jzx_free(allocator, table->slots);
}
if (table->generations) {
jzx_free(allocator, table->generations);
}
if (table->free_stack) {
jzx_free(allocator, table->free_stack);
}
memset(table, 0, sizeof(*table));
}
jzx_actor_table_lookup()
static jzx_actor* jzx_actor_table_lookup(jzx_actor_table* table, jzx_actor_id id) {
uint32_t idx = jzx_id_index(id);
if (idx >= table->capacity) {
return NULL;
}
if (table->generations[idx] != jzx_id_generation(id)) {
return NULL;
}
return table->slots[idx];
}

This function is the critical stale-id guard:

  • It rejects ids whose generation doesn’t match the current slot generation.
jzx_actor_table_insert()
static jzx_err jzx_actor_table_insert(jzx_actor_table* table, jzx_actor* actor,
jzx_allocator* allocator, jzx_actor_id* out_id) {
(void)allocator;
if (table->free_top == 0) {
return JZX_ERR_MAX_ACTORS;
}
uint32_t idx = table->free_stack[--table->free_top];
uint32_t gen = table->generations[idx];
actor->id = jzx_make_id(gen, idx);
table->slots[idx] = actor;
table->used++;
if (out_id) {
*out_id = actor->id;
}
return JZX_OK;
}
jzx_actor_table_remove()
static void jzx_actor_table_remove(jzx_actor_table* table, jzx_actor* actor) {
if (!actor) {
return;
}
uint32_t idx = jzx_id_index(actor->id);
if (idx >= table->capacity) {
return;
}
if (table->slots[idx] != actor) {
return;
}
table->slots[idx] = NULL;
table->generations[idx] += 1u;
table->free_stack[table->free_top++] = idx;
if (table->used > 0) {
table->used--;
}
}

Run queue (who runs next)

Section: run queue implementation
// -----------------------------------------------------------------------------
// Run queue implementation
// -----------------------------------------------------------------------------
jzx_run_queue_init()
static jzx_err jzx_run_queue_init(jzx_run_queue* rq, uint32_t capacity, jzx_allocator* allocator) {
memset(rq, 0, sizeof(*rq));
rq->capacity = capacity > 0 ? capacity : 1;
rq->entries = (jzx_actor**)jzx_alloc(allocator, sizeof(jzx_actor*) * rq->capacity);
if (!rq->entries) {
return JZX_ERR_NO_MEMORY;
}
memset(rq->entries, 0, sizeof(jzx_actor*) * rq->capacity);
return JZX_OK;
}
jzx_run_queue_deinit()
static void jzx_run_queue_deinit(jzx_run_queue* rq, jzx_allocator* allocator) {
if (rq->entries) {
jzx_free(allocator, rq->entries);
}
memset(rq, 0, sizeof(*rq));
}
jzx_run_queue_push()
static int jzx_run_queue_push(jzx_run_queue* rq, jzx_actor* actor) {
if (rq->count == rq->capacity) {
return -1;
}
rq->entries[rq->tail] = actor;
rq->tail = (rq->tail + 1) % rq->capacity;
rq->count++;
return 0;
}
jzx_run_queue_pop()
static jzx_actor* jzx_run_queue_pop(jzx_run_queue* rq) {
if (rq->count == 0) {
return NULL;
}
jzx_actor* actor = rq->entries[rq->head];
rq->entries[rq->head] = NULL;
rq->head = (rq->head + 1) % rq->capacity;
rq->count--;
return actor;
}
jzx_schedule_actor()
static void jzx_schedule_actor(jzx_loop* loop, jzx_actor* actor) {
if (!actor || actor->in_run_queue) {
return;
}
if (jzx_run_queue_push(&loop->run_queue, actor) == 0) {
actor->in_run_queue = 1;
}
}

This is the scheduler’s “make runnable” operation. It ensures a runnable actor is in the run queue exactly once.

Actor teardown (resource cleanup)

jzx_teardown_actor()
static void jzx_teardown_actor(jzx_loop* loop, jzx_actor* actor) {
if (!actor) {
return;
}
jzx_io_remove_actor(loop, actor->id);
jzx_exit_reason reason = JZX_EXIT_NORMAL;
if (actor->status == JZX_ACTOR_FAILED) {
reason = JZX_EXIT_FAIL;
}
jzx_obs_actor_stop(loop, actor->id, reason);
if (actor->supervisor) {
jzx_child_exit* ev = (jzx_child_exit*)jzx_alloc(&loop->allocator, sizeof(jzx_child_exit));
if (ev) {
ev->child = actor->id;
ev->status = actor->status;
jzx_err err = jzx_send_internal(loop, actor->supervisor, ev, sizeof(jzx_child_exit),
JZX_TAG_SYS_CHILD_EXIT, 0);
if (err != JZX_OK) {
jzx_free(&loop->allocator, ev);
}
}
}
jzx_mailbox_deinit(&actor->mailbox, &loop->allocator);
jzx_actor_table_remove(&loop->actors, actor);
if (actor->supervisor_state) {
jzx_supervisor_state_destroy(actor->supervisor_state, &loop->allocator);
}
jzx_free(&loop->allocator, actor);
}

This function owns the “stop means free” contract:

  • Deinitializes mailbox.
  • Removes I/O watches owned by the actor.
  • Frees supervision state if present.
  • Removes the actor from the actor table and frees its allocation.

Supervision logic (restart strategies)

Section: supervisor helpers
// -----------------------------------------------------------------------------
// Supervisor helpers
// -----------------------------------------------------------------------------
jzx_supervisor_find_child()
static jzx_child_state* jzx_supervisor_find_child(jzx_supervisor_state* sup, jzx_actor_id id,
size_t* out_idx) {
if (!sup)
return NULL;
for (size_t i = 0; i < sup->child_count; ++i) {
if (sup->children[i].id == id) {
if (out_idx) {
*out_idx = i;
}
return &sup->children[i];
}
}
return NULL;
}
jzx_supervisor_spawn_child()
static jzx_err jzx_supervisor_spawn_child(jzx_loop* loop, jzx_actor_id supervisor_id,
jzx_child_state* child) {
jzx_spawn_opts opts = {
.behavior = child->spec.behavior,
.state = child->spec.state,
.supervisor = supervisor_id,
.mailbox_cap = child->spec.mailbox_cap,
.name = child->spec.name,
};
child->last_restart_ms = jzx_now_ms();
return jzx_spawn(loop, &opts, &child->id);
}
jzx_supervisor_stop_child()
static void jzx_supervisor_stop_child(jzx_loop* loop, jzx_child_state* child) {
if (child->id != 0) {
(void)jzx_actor_stop(loop, child->id);
child->id = 0;
}
}
jzx_supervisor_schedule_restart()
static void jzx_supervisor_schedule_restart(jzx_loop* loop, jzx_actor* sup_actor, size_t child_idx,
uint32_t delay_ms) {
jzx_supervisor_state* sup = sup_actor->supervisor_state;
if (!sup || child_idx >= sup->child_count)
return;
if (delay_ms == 0) {
(void)jzx_supervisor_spawn_child(loop, sup_actor->id, &sup->children[child_idx]);
return;
}
jzx_child_restart* payload =
(jzx_child_restart*)jzx_alloc(&loop->allocator, sizeof(jzx_child_restart));
if (!payload)
return;
payload->child_index = (uint32_t)child_idx;
jzx_err err = jzx_send_after(loop, sup_actor->id, delay_ms, payload, sizeof(jzx_child_restart),
JZX_TAG_SYS_CHILD_RESTART, NULL);
if (err != JZX_OK) {
jzx_free(&loop->allocator, payload);
}
}

This typically schedules a system message/timer that triggers the actual restart later.

jzx_supervisor_compute_delay()
static uint32_t jzx_supervisor_compute_delay(const jzx_supervisor_state* sup,
const jzx_child_state* child) {
if (!sup || !child)
return 0;
uint32_t base = child->spec.restart_delay_ms;
jzx_backoff_type strategy = child->spec.backoff;
if (strategy == JZX_BACKOFF_NONE) {
strategy = sup->config.backoff;
}
uint32_t step = sup->config.backoff_delay_ms;
switch (strategy) {
case JZX_BACKOFF_NONE:
return base;
case JZX_BACKOFF_CONSTANT: {
uint32_t extra = jzx_sat_mul32(step, child->restart_count);
return jzx_sat_add32(base, extra);
}
case JZX_BACKOFF_EXPONENTIAL: {
uint32_t factor = 1u;
uint32_t shifts = child->restart_count;
if (shifts >= 31) {
factor = UINT32_MAX;
} else {
factor = 1u << shifts;
}
uint32_t scaled_base = jzx_sat_mul32(base ? base : step, factor);
return scaled_base;
}
}
return base;
}

This computes the restart delay based on:

  • restart attempt count
  • configured backoff type
  • saturating arithmetic to avoid overflow
jzx_supervisor_restart_strategy()
static void jzx_supervisor_restart_strategy(jzx_loop* loop, jzx_actor* supervisor_actor,
size_t failed_idx, jzx_actor_id failed_child_id,
jzx_supervisor_state* sup) {
uint32_t failed_delay = sup->children[failed_idx].spec.restart_delay_ms;
switch (sup->config.strategy) {
case JZX_SUP_ONE_FOR_ONE:
if (failed_child_id) {
uint32_t attempt = sup->children[failed_idx].restart_count + 1;
jzx_obs_actor_restart(loop, supervisor_actor->id, failed_child_id, attempt);
}
sup->children[failed_idx].restart_count += 1;
sup->children[failed_idx].last_restart_ms = jzx_now_ms();
failed_delay = jzx_supervisor_compute_delay(sup, &sup->children[failed_idx]);
jzx_supervisor_schedule_restart(loop, supervisor_actor, failed_idx, failed_delay);
break;
case JZX_SUP_ONE_FOR_ALL:
for (size_t i = 0; i < sup->child_count; ++i) {
jzx_actor_id child_id = sup->children[i].id;
if (i == failed_idx) {
child_id = failed_child_id;
}
if (child_id) {
uint32_t attempt = sup->children[i].restart_count + 1;
jzx_obs_actor_restart(loop, supervisor_actor->id, child_id, attempt);
}
}
for (size_t i = 0; i < sup->child_count; ++i) {
jzx_supervisor_stop_child(loop, &sup->children[i]);
}
for (size_t i = 0; i < sup->child_count; ++i) {
sup->children[i].restart_count += 1;
sup->children[i].last_restart_ms = jzx_now_ms();
uint32_t delay = jzx_supervisor_compute_delay(sup, &sup->children[i]);
jzx_supervisor_schedule_restart(loop, supervisor_actor, i, delay);
}
break;
case JZX_SUP_REST_FOR_ONE:
for (size_t i = failed_idx; i < sup->child_count; ++i) {
jzx_actor_id child_id = sup->children[i].id;
if (i == failed_idx) {
child_id = failed_child_id;
}
if (child_id) {
uint32_t attempt = sup->children[i].restart_count + 1;
jzx_obs_actor_restart(loop, supervisor_actor->id, child_id, attempt);
}
}
for (size_t i = failed_idx; i < sup->child_count; ++i) {
jzx_supervisor_stop_child(loop, &sup->children[i]);
}
for (size_t i = failed_idx; i < sup->child_count; ++i) {
sup->children[i].restart_count += 1;
sup->children[i].last_restart_ms = jzx_now_ms();
uint32_t delay = jzx_supervisor_compute_delay(sup, &sup->children[i]);
jzx_supervisor_schedule_restart(loop, supervisor_actor, i, delay);
}
break;
}
}

This applies the configured strategy (one-for-one, one-for-all, rest-for-one).

jzx_supervisor_behavior()
static jzx_behavior_result jzx_supervisor_behavior(jzx_context* ctx, const jzx_message* msg) {
jzx_actor* sup_actor = jzx_actor_table_lookup(&ctx->loop->actors, ctx->self);
if (!sup_actor || !sup_actor->supervisor_state) {
if (msg->data) {
jzx_free(&ctx->loop->allocator, msg->data);
}
return JZX_BEHAVIOR_OK;
}
jzx_supervisor_state* sup = sup_actor->supervisor_state;
if (msg->tag == JZX_TAG_SYS_CHILD_EXIT && msg->data) {
jzx_child_exit* ev = (jzx_child_exit*)msg->data;
jzx_actor_id failed_child_id = ev->child;
size_t idx = 0;
jzx_child_state* child = jzx_supervisor_find_child(sup, ev->child, &idx);
jzx_actor_status status = ev->status;
jzx_free(&ctx->loop->allocator, ev);
if (!child) {
return JZX_BEHAVIOR_OK;
}
child->id = 0;

int restart = 0;
if (child->spec.mode == JZX_CHILD_PERMANENT) {
restart = 1;
} else if (child->spec.mode == JZX_CHILD_TRANSIENT && status == JZX_ACTOR_FAILED) {
restart = 1;
}

if (!restart) {
return JZX_BEHAVIOR_OK;
}

uint64_t now = jzx_now_ms();
if (!jzx_supervisor_allow_restart(sup, now)) {
jzx_obs_supervisor_escalate(ctx->loop, ctx->self);
for (size_t i = 0; i < sup->child_count; ++i) {
jzx_supervisor_stop_child(ctx->loop, &sup->children[i]);
}
sup_actor->status = JZX_ACTOR_FAILED;
return JZX_BEHAVIOR_FAIL;
}

jzx_supervisor_restart_strategy(ctx->loop, sup_actor, idx, failed_child_id, sup);
return JZX_BEHAVIOR_OK;
}

if (msg->tag == JZX_TAG_SYS_CHILD_RESTART && msg->data) {
jzx_child_restart* ev = (jzx_child_restart*)msg->data;
uint32_t idx = ev->child_index;
jzx_free(&ctx->loop->allocator, ev);
if (idx < sup->child_count) {
(void)jzx_supervisor_spawn_child(ctx->loop, ctx->self, &sup->children[idx]);
}
return JZX_BEHAVIOR_OK;
}

if (msg->data) {
jzx_free(&ctx->loop->allocator, msg->data);
}
return JZX_BEHAVIOR_OK;
}

Supervisor actors are just actors with a special behavior: they react to system tags like “child exit” and “restart child”.

Async queue (jzx_send_async)

Section: async queue
// -----------------------------------------------------------------------------
// Async queue
// -----------------------------------------------------------------------------
jzx_async_queue_init()
static jzx_err jzx_async_queue_init(jzx_loop* loop) {
if (pthread_mutex_init(&loop->async_mutex, NULL) != 0) {
return JZX_ERR_UNKNOWN;
}
loop->async_mutex_initialized = 1;
loop->async_head = NULL;
loop->async_tail = NULL;
return JZX_OK;
}
jzx_async_queue_destroy()
static void jzx_async_queue_destroy(jzx_loop* loop) {
if (!loop->async_mutex_initialized) {
return;
}
pthread_mutex_lock(&loop->async_mutex);
jzx_async_msg* head = loop->async_head;
loop->async_head = NULL;
loop->async_tail = NULL;
pthread_mutex_unlock(&loop->async_mutex);
pthread_mutex_destroy(&loop->async_mutex);
loop->async_mutex_initialized = 0;
while (head) {
jzx_async_msg* next = head->next;
jzx_free(&loop->allocator, head);
head = next;
}
}
jzx_async_enqueue()
static jzx_err jzx_async_enqueue(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
uint32_t tag, jzx_actor_id sender) {
if (!loop || !loop->async_mutex_initialized) {
return JZX_ERR_INVALID_ARG;
}
jzx_async_msg* msg = (jzx_async_msg*)jzx_alloc(&loop->allocator, sizeof(jzx_async_msg));
if (!msg) {
return JZX_ERR_NO_MEMORY;
}
msg->target = target;
msg->data = data;
msg->len = len;
msg->tag = tag;
msg->sender = sender;
msg->next = NULL;

pthread_mutex_lock(&loop->async_mutex);
if (!loop->async_head) {
loop->async_head = msg;
loop->async_tail = msg;
} else {
loop->async_tail->next = msg;
loop->async_tail = msg;
}
pthread_mutex_unlock(&loop->async_mutex);
jzx_wakeup_signal(loop);
return JZX_OK;
}

This is the cross-thread enqueue point.

Important: the runtime does not copy the payload; it stores the pointer.

jzx_async_detach()
static jzx_async_msg* jzx_async_detach(jzx_loop* loop) {
if (!loop->async_mutex_initialized) {
return NULL;
}
pthread_mutex_lock(&loop->async_mutex);
jzx_async_msg* head = loop->async_head;
loop->async_head = NULL;
loop->async_tail = NULL;
pthread_mutex_unlock(&loop->async_mutex);
return head;
}

Detaches the list under the mutex so the loop thread can process without holding the lock.

jzx_async_dispatch()
static void jzx_async_dispatch(jzx_loop* loop, jzx_async_msg* head) {
jzx_async_msg* msg = head;
while (msg) {
jzx_async_msg* next = msg->next;
(void)jzx_send_internal(loop, msg->target, msg->data, msg->len, msg->tag, msg->sender);
jzx_free(&loop->allocator, msg);
msg = next;
}
}

Delivers detached async messages by converting them into normal mailbox enqueues.

jzx_async_drain()
static void jzx_async_drain(jzx_loop* loop) {
jzx_async_msg* head = jzx_async_detach(loop);
if (head) {
jzx_async_dispatch(loop, head);
}
}
jzx_async_has_pending()
static int jzx_async_has_pending(jzx_loop* loop) {
if (!loop->async_mutex_initialized) {
return 0;
}
pthread_mutex_lock(&loop->async_mutex);
int has = loop->async_head != NULL;
pthread_mutex_unlock(&loop->async_mutex);
return has;
}

Timer system (timer thread + due list)

Section: timer system
// -----------------------------------------------------------------------------
// Timer system
// -----------------------------------------------------------------------------
jzx_timer_insert_locked()
static void jzx_timer_insert_locked(jzx_loop* loop, jzx_timer_entry* entry) {
if (!loop->timer_head || entry->due_ms < loop->timer_head->due_ms) {
entry->next = loop->timer_head;
loop->timer_head = entry;
return;
}
jzx_timer_entry* cur = loop->timer_head;
while (cur->next && cur->next->due_ms <= entry->due_ms) {
cur = cur->next;
}
entry->next = cur->next;
cur->next = entry;
}
jzx_timer_thread_main()
static void* jzx_timer_thread_main(void* arg) {
jzx_loop* loop = (jzx_loop*)arg;
pthread_mutex_lock(&loop->timer_mutex);
while (!loop->timer_stop) {
uint64_t now = jzx_now_ms();
jzx_timer_entry* head = loop->timer_head;
if (!head) {
pthread_cond_wait(&loop->timer_cond, &loop->timer_mutex);
continue;
}
if (head->due_ms > now) {
uint64_t wait_ms = head->due_ms - now;
#if defined(__APPLE__)
struct timespec rel;
rel.tv_sec = (time_t)(wait_ms / 1000ull);
rel.tv_nsec = (long)((wait_ms % 1000ull) * 1000000ull);
(void)pthread_cond_timedwait_relative_np(&loop->timer_cond, &loop->timer_mutex, &rel);
#else
struct timespec ts;
#if defined(__linux__)
clockid_t clock_id = loop->timer_cond_monotonic ? CLOCK_MONOTONIC : CLOCK_REALTIME;
clock_gettime(clock_id, &ts);
#else
clock_gettime(CLOCK_REALTIME, &ts);
#endif
ts.tv_sec += (time_t)(wait_ms / 1000ull);
ts.tv_nsec += (long)((wait_ms % 1000ull) * 1000000ull);
if (ts.tv_nsec >= 1000000000l) {
ts.tv_sec += 1;
ts.tv_nsec -= 1000000000l;
}
(void)pthread_cond_timedwait(&loop->timer_cond, &loop->timer_mutex, &ts);
#endif
continue;
}
loop->timer_head = head->next;
pthread_mutex_unlock(&loop->timer_mutex);
jzx_async_enqueue(loop, head->target, head->data, head->len, head->tag, 0);
jzx_free(&loop->allocator, head);
pthread_mutex_lock(&loop->timer_mutex);
}
pthread_mutex_unlock(&loop->timer_mutex);
return NULL;
}

This is the timer thread loop: wait until next due timer, then enqueue wakeups/deliveries.

jzx_timer_system_init()
static jzx_err jzx_timer_system_init(jzx_loop* loop) {
if (pthread_mutex_init(&loop->timer_mutex, NULL) != 0) {
return JZX_ERR_UNKNOWN;
}
loop->timer_cond_monotonic = 0;
#if defined(__linux__)
pthread_condattr_t attr;
pthread_condattr_t* attr_ptr = NULL;
if (pthread_condattr_init(&attr) == 0) {
attr_ptr = &attr;
if (pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0) {
loop->timer_cond_monotonic = 1;
}
}
int cond_rc = pthread_cond_init(&loop->timer_cond, attr_ptr);
if (attr_ptr) {
pthread_condattr_destroy(&attr);
}
if (cond_rc != 0) {
pthread_mutex_destroy(&loop->timer_mutex);
return JZX_ERR_UNKNOWN;
}
#else
if (pthread_cond_init(&loop->timer_cond, NULL) != 0) {
pthread_mutex_destroy(&loop->timer_mutex);
return JZX_ERR_UNKNOWN;
}
#endif
loop->timer_mutex_initialized = 1;
loop->timer_thread_running = 0;
loop->timer_stop = 0;
loop->timer_head = NULL;
loop->next_timer_id = 1;
if (pthread_create(&loop->timer_thread, NULL, jzx_timer_thread_main, loop) != 0) {
pthread_cond_destroy(&loop->timer_cond);
pthread_mutex_destroy(&loop->timer_mutex);
loop->timer_mutex_initialized = 0;
return JZX_ERR_UNKNOWN;
}
loop->timer_thread_running = 1;
return JZX_OK;
}
jzx_timer_system_shutdown()
static void jzx_timer_system_shutdown(jzx_loop* loop) {
if (!loop->timer_mutex_initialized) {
return;
}
pthread_mutex_lock(&loop->timer_mutex);
loop->timer_stop = 1;
pthread_cond_broadcast(&loop->timer_cond);
pthread_mutex_unlock(&loop->timer_mutex);

if (loop->timer_thread_running) {
pthread_join(loop->timer_thread, NULL);
loop->timer_thread_running = 0;
}

pthread_mutex_lock(&loop->timer_mutex);
jzx_timer_entry* cur = loop->timer_head;
loop->timer_head = NULL;
pthread_mutex_unlock(&loop->timer_mutex);

while (cur) {
jzx_timer_entry* next = cur->next;
jzx_free(&loop->allocator, cur);
cur = next;
}

pthread_cond_destroy(&loop->timer_cond);
pthread_mutex_destroy(&loop->timer_mutex);
loop->timer_mutex_initialized = 0;
}
jzx_timer_has_pending()
static int jzx_timer_has_pending(jzx_loop* loop) {
if (!loop->timer_mutex_initialized) {
return 0;
}
pthread_mutex_lock(&loop->timer_mutex);
int has = loop->timer_head != NULL;
pthread_mutex_unlock(&loop->timer_mutex);
return has;
}

I/O watchers (fd table + backend registration)

Section: I/O watchers
// -----------------------------------------------------------------------------
// I O watchers
// -----------------------------------------------------------------------------
jzx_io_init()
static jzx_err jzx_io_init(jzx_loop* loop, uint32_t capacity) {
loop->io_capacity = capacity ? capacity : 1;
loop->io_count = 0;
loop->io_watchers =
(jzx_io_watch*)jzx_alloc(&loop->allocator, sizeof(jzx_io_watch) * loop->io_capacity);
if (!loop->io_watchers) {
return JZX_ERR_NO_MEMORY;
}
memset(loop->io_watchers, 0, sizeof(jzx_io_watch) * loop->io_capacity);
return JZX_OK;
}
jzx_io_deinit()
static void jzx_io_deinit(jzx_loop* loop) {
if (loop->io_watchers) {
jzx_free(&loop->allocator, loop->io_watchers);
loop->io_watchers = NULL;
}
loop->io_capacity = 0;
loop->io_count = 0;
}
jzx_io_reserve()
static jzx_err jzx_io_reserve(jzx_loop* loop, uint32_t new_cap) {
jzx_io_watch* new_watchers =
(jzx_io_watch*)jzx_alloc(&loop->allocator, sizeof(jzx_io_watch) * new_cap);
if (!new_watchers) {
return JZX_ERR_NO_MEMORY;
}
memset(new_watchers, 0, sizeof(jzx_io_watch) * new_cap);
if (loop->io_watchers) {
memcpy(new_watchers, loop->io_watchers, sizeof(jzx_io_watch) * loop->io_count);
jzx_free(&loop->allocator, loop->io_watchers);
}
loop->io_watchers = new_watchers;
loop->io_capacity = new_cap;
return JZX_OK;
}
jzx_io_find()
static jzx_io_watch* jzx_io_find(jzx_loop* loop, int fd, uint32_t* idx_out) {
for (uint32_t i = 0; i < loop->io_count; ++i) {
if (loop->io_watchers[i].fd == fd) {
if (idx_out) {
*idx_out = i;
}
return &loop->io_watchers[i];
}
}
return NULL;
}
jzx_io_remove_index()
static void jzx_io_remove_index(jzx_loop* loop, uint32_t idx) {
if (idx >= loop->io_count) {
return;
}
if (loop->xev) {
jzx_xev_unwatch_fd(loop->xev, loop->io_watchers[idx].fd);
}
uint32_t last = loop->io_count - 1;
if (idx != last) {
loop->io_watchers[idx] = loop->io_watchers[last];
}
loop->io_count--;
}
jzx_io_remove_actor()
static void jzx_io_remove_actor(jzx_loop* loop, jzx_actor_id actor) {
for (uint32_t i = 0; i < loop->io_count;) {
if (loop->io_watchers[i].owner == actor) {
jzx_io_remove_index(loop, i);
continue;
}
++i;
}
}

libxev → runtime bridge (jzx_io_xev_notify)

The src/jzx_xev.zig layer needs a single, C-callable “upcall” into the runtime when an fd becomes ready. That is what jzx_io_xev_notify is for.

jzx_io_xev_notify(): deliver readiness as a system message
uint8_t jzx_io_xev_notify(jzx_loop* loop, int fd, uint32_t readiness) {
if (!loop || !loop->running || fd < 0 || readiness == 0) {
return 0;
}

uint32_t idx = 0;
jzx_io_watch* watch = jzx_io_find(loop, fd, &idx);
if (!watch) {
return 0;
}
if (!jzx_actor_table_lookup(&loop->actors, watch->owner)) {
jzx_io_remove_index(loop, idx);
return 0;
}

jzx_io_event* ev = (jzx_io_event*)jzx_alloc(&loop->allocator, sizeof(jzx_io_event));
if (!ev) {
return 1;
}
ev->fd = fd;
ev->readiness = readiness;
jzx_err err =
jzx_send_internal(loop, watch->owner, ev, sizeof(jzx_io_event), JZX_TAG_SYS_IO, 0);
if (err != JZX_OK) {
jzx_free(&loop->allocator, ev);
}
return 1;
}

Read it as a three-stage pipeline:

  1. Validate: if the loop isn’t running yet, or the event is invalid, return 0 (not delivered).
  2. Resolve: map fd → watch → owner actor.
    • If the owner no longer exists, the watch is removed (self-healing).
  3. Deliver: allocate a jzx_io_event, enqueue it as a system message (JZX_TAG_SYS_IO).

The return value is intentionally small and lossy:

  • 0: nothing was delivered (no watch, invalid state, stale owner).
  • 1: the event was “handled” from the backend’s perspective.
    • Even if the message could not be enqueued, the backend shouldn’t spin endlessly on the same readiness.

Config helpers and loop lifecycle (public ABI entry points)

Default allocator hooks

If the caller doesn’t provide a custom allocator, libjzx uses malloc/free.

Config helpers: default allocator
// -----------------------------------------------------------------------------
// Config helpers
// -----------------------------------------------------------------------------

static void* default_alloc(void* ctx, size_t size) {
(void)ctx;
return malloc(size);
}

static void default_free(void* ctx, void* ptr) {
(void)ctx;
free(ptr);
}

Why this is written this way:

  • The allocator signature includes a ctx pointer so applications can route allocations to arenas/pools.
  • The default implementation ignores ctx and forwards to the platform heap.
jzx_config_init()
void jzx_config_init(jzx_config* cfg) {
if (!cfg) {
return;
}
memset(cfg, 0, sizeof(*cfg));
cfg->allocator.alloc = default_alloc;
cfg->allocator.free = default_free;
cfg->allocator.ctx = NULL;
cfg->max_actors = 1024;
cfg->default_mailbox_cap = 1024;
cfg->max_msgs_per_actor = 64;
cfg->max_actors_per_tick = 1024;
cfg->max_io_watchers = 1024;
cfg->io_poll_timeout_ms = 10;
}

“Apply defaults” for partially-initialized configs

jzx_config_init gives you a fully populated config, but callers can also pass a partially-filled struct (for example, override io_poll_timeout_ms while leaving everything else “default”).

apply_defaults is the guardrail that turns “some fields set to 0” into “reasonable default values”.

apply_defaults(): fill in missing config values
static void apply_defaults(jzx_config* cfg) {
if (!cfg->allocator.alloc) {
cfg->allocator.alloc = default_alloc;
}
if (!cfg->allocator.free) {
cfg->allocator.free = default_free;
}
if (cfg->max_actors == 0) {
cfg->max_actors = 1024;
}
if (cfg->default_mailbox_cap == 0) {
cfg->default_mailbox_cap = 1024;
}
if (cfg->max_msgs_per_actor == 0) {
cfg->max_msgs_per_actor = 64;
}
if (cfg->max_actors_per_tick == 0) {
cfg->max_actors_per_tick = 1024;
}
if (cfg->max_io_watchers == 0) {
cfg->max_io_watchers = 1024;
}
if (cfg->io_poll_timeout_ms == 0) {
cfg->io_poll_timeout_ms = 10;
}
}

Why this exists:

  • It lets you do “override one knob” without having to update this file every time new knobs are added.
  • It makes 0 mean “use default” for most numeric config fields, which keeps the API ergonomic.
Section: loop lifecycle
// -----------------------------------------------------------------------------
// Loop lifecycle
// -----------------------------------------------------------------------------
jzx_loop_create()
jzx_loop* jzx_loop_create(const jzx_config* cfg) {
jzx_config local;
if (cfg) {
local = *cfg;
} else {
jzx_config_init(&local);
}
apply_defaults(&local);

jzx_loop* loop = (jzx_loop*)jzx_alloc(&local.allocator, sizeof(jzx_loop));
if (!loop) {
return NULL;
}
memset(loop, 0, sizeof(*loop));
loop->cfg = local;
loop->allocator = local.allocator;
loop->xev = jzx_xev_create();
if (!loop->xev) {
jzx_loop_destroy(loop);
return NULL;
}

if (jzx_actor_table_init(&loop->actors, local.max_actors, &loop->allocator) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_run_queue_init(&loop->run_queue, local.max_actors, &loop->allocator) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_async_queue_init(loop) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_timer_system_init(loop) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_io_init(loop, local.max_io_watchers) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
loop->running = 0;
loop->stop_requested = 0;
return loop;
}
jzx_loop_destroy()
void jzx_loop_destroy(jzx_loop* loop) {
if (!loop) {
return;
}
jzx_timer_system_shutdown(loop);
jzx_async_queue_destroy(loop);
if (loop->xev) {
jzx_xev_destroy(loop->xev);
loop->xev = NULL;
}
jzx_io_deinit(loop);
for (uint32_t i = 0; i < loop->actors.capacity; ++i) {
jzx_actor* actor = loop->actors.slots ? loop->actors.slots[i] : NULL;
if (actor) {
jzx_mailbox_deinit(&actor->mailbox, &loop->allocator);
jzx_free(&loop->allocator, actor);
loop->actors.slots[i] = NULL;
}
}
jzx_actor_table_deinit(&loop->actors, &loop->allocator);
jzx_run_queue_deinit(&loop->run_queue, &loop->allocator);
jzx_free(&loop->allocator, loop);
}
jzx_loop_run()
int jzx_loop_run(jzx_loop* loop) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
if (loop->running) {
return JZX_ERR_LOOP_CLOSED;
}
loop->running = 1;
int rc = JZX_OK;
while (!loop->stop_requested) {
jzx_async_drain(loop);
jzx_xev_run(loop->xev, 0);
uint32_t actors_processed = 0;
while (actors_processed < loop->cfg.max_actors_per_tick) {
jzx_actor* actor = jzx_run_queue_pop(&loop->run_queue);
if (!actor) {
break;
}
actor->in_run_queue = 0;
if (actor->status == JZX_ACTOR_STOPPING || actor->status == JZX_ACTOR_FAILED) {
jzx_teardown_actor(loop, actor);
continue;
}

uint32_t processed_msgs = 0;
while (processed_msgs < loop->cfg.max_msgs_per_actor) {
jzx_message msg;
if (jzx_mailbox_pop(&actor->mailbox, &msg) != 0) {
break;
}
jzx_context ctx = {
.state = actor->state,
.self = actor->id,
.loop = loop,
};
jzx_behavior_result result = actor->behavior(&ctx, &msg);
processed_msgs++;
if (result == JZX_BEHAVIOR_STOP) {
actor->status = JZX_ACTOR_STOPPING;
break;
} else if (result == JZX_BEHAVIOR_FAIL) {
actor->status = JZX_ACTOR_FAILED;
break;
}
}
if (actor->status == JZX_ACTOR_STOPPING || actor->status == JZX_ACTOR_FAILED) {
jzx_teardown_actor(loop, actor);
} else if (jzx_mailbox_has_items(&actor->mailbox)) {
jzx_schedule_actor(loop, actor);
}
actors_processed++;
}

if (loop->run_queue.count == 0) {
if (loop->actors.used == 0 && !jzx_async_has_pending(loop) &&
!jzx_timer_has_pending(loop) && loop->io_count == 0) {
for (uint32_t i = 0; i < 64; ++i) {
jzx_xev_run(loop->xev, 0);
}
break;
}
if (jzx_async_has_pending(loop)) {
continue;
}
jzx_xev_run(loop->xev, 1);
}
}
loop->running = 0;
loop->stop_requested = 0;
return rc;
}

This is the main event loop: it runs ready actors up to configured budgets and integrates I/O + timers + async sends.

jzx_loop_request_stop()
void jzx_loop_request_stop(jzx_loop* loop) {
if (!loop) {
return;
}
loop->stop_requested = 1;
jzx_wakeup_signal(loop);
if (loop->timer_mutex_initialized) {
pthread_mutex_lock(&loop->timer_mutex);
pthread_cond_broadcast(&loop->timer_cond);
pthread_mutex_unlock(&loop->timer_mutex);
}
}
jzx_loop_free()
void jzx_loop_free(jzx_loop* loop, void* ptr) {
if (!loop || !ptr) {
return;
}
jzx_free(&loop->allocator, ptr);
}
jzx_loop_set_observer()
void jzx_loop_set_observer(jzx_loop* loop, const jzx_observer* obs, void* ctx) {
if (!loop) {
return;
}
if (obs) {
loop->observer = *obs;
loop->observer_ctx = ctx;
} else {
memset(&loop->observer, 0, sizeof(loop->observer));
loop->observer_ctx = NULL;
}
}

Actor APIs (spawn/send/stop/fail)

Section: actor APIs
// -----------------------------------------------------------------------------
// Actor APIs
// -----------------------------------------------------------------------------
jzx_actor_create() (internal helper)
static jzx_actor* jzx_actor_create(jzx_loop* loop, const jzx_spawn_opts* opts) {
jzx_actor* actor = (jzx_actor*)jzx_alloc(&loop->allocator, sizeof(jzx_actor));
if (!actor) {
return NULL;
}
memset(actor, 0, sizeof(*actor));
actor->status = JZX_ACTOR_RUNNING;
actor->behavior = opts->behavior;
actor->state = opts->state;
actor->supervisor = opts->supervisor;
if (jzx_mailbox_init(&actor->mailbox,
opts->mailbox_cap ? opts->mailbox_cap : loop->cfg.default_mailbox_cap,
&loop->allocator) != JZX_OK) {
jzx_free(&loop->allocator, actor);
return NULL;
}
return actor;
}
jzx_spawn()
jzx_err jzx_spawn(jzx_loop* loop, const jzx_spawn_opts* opts, jzx_actor_id* out_id) {
if (!loop || !opts || !opts->behavior) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_create(loop, opts);
if (!actor) {
return JZX_ERR_NO_MEMORY;
}
jzx_err err = jzx_actor_table_insert(&loop->actors, actor, &loop->allocator, out_id);
if (err != JZX_OK) {
jzx_mailbox_deinit(&actor->mailbox, &loop->allocator);
jzx_free(&loop->allocator, actor);
return err;
}
jzx_obs_actor_start(loop, actor->id, opts->name);
return JZX_OK;
}
jzx_send_internal() (core enqueue path)
static jzx_err jzx_send_internal(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
uint32_t tag, jzx_actor_id sender) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_table_lookup(&loop->actors, target);
if (!actor) {
return JZX_ERR_NO_SUCH_ACTOR;
}
jzx_message msg = {
.data = data,
.len = len,
.tag = tag,
.sender = sender,
};
if (jzx_mailbox_push(&actor->mailbox, &msg) != 0) {
jzx_obs_mailbox_full(loop, target);
return JZX_ERR_MAILBOX_FULL;
}
jzx_schedule_actor(loop, actor);
return JZX_OK;
}
jzx_send()
jzx_err jzx_send(jzx_loop* loop, jzx_actor_id target, void* data, size_t len, uint32_t tag) {
return jzx_send_internal(loop, target, data, len, tag, 0);
}
jzx_send_async()
jzx_err jzx_send_async(jzx_loop* loop, jzx_actor_id target, void* data, size_t len, uint32_t tag) {
return jzx_async_enqueue(loop, target, data, len, tag, 0);
}
jzx_actor_stop()
jzx_err jzx_actor_stop(jzx_loop* loop, jzx_actor_id id) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_table_lookup(&loop->actors, id);
if (!actor) {
return JZX_ERR_NO_SUCH_ACTOR;
}
actor->status = JZX_ACTOR_STOPPING;
jzx_schedule_actor(loop, actor);
return JZX_OK;
}
jzx_actor_fail()
jzx_err jzx_actor_fail(jzx_loop* loop, jzx_actor_id id) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_table_lookup(&loop->actors, id);
if (!actor) {
return JZX_ERR_NO_SUCH_ACTOR;
}
actor->status = JZX_ACTOR_FAILED;
jzx_schedule_actor(loop, actor);
return JZX_OK;
}

Supervisor APIs (public ABI entry points)

Section: supervisor spawn
// -----------------------------------------------------------------------------
// Supervisor spawn
// -----------------------------------------------------------------------------
jzx_spawn_supervisor()
jzx_err jzx_spawn_supervisor(jzx_loop* loop, const jzx_supervisor_init* init, jzx_actor_id parent,
jzx_actor_id* out_id) {
if (!loop || !init || !init->children || init->child_count == 0) {
return JZX_ERR_INVALID_ARG;
}
jzx_supervisor_state* state = jzx_supervisor_state_create(init, &loop->allocator);
if (!state) {
return JZX_ERR_NO_MEMORY;
}
jzx_spawn_opts opts = {
.behavior = jzx_supervisor_behavior,
.state = state,
.supervisor = parent,
.mailbox_cap = 0,
.name = NULL,
};
jzx_actor_id sup_id = 0;
jzx_err err = jzx_spawn(loop, &opts, &sup_id);
if (err != JZX_OK) {
jzx_supervisor_state_destroy(state, &loop->allocator);
return err;
}
jzx_actor* sup_actor = jzx_actor_table_lookup(&loop->actors, sup_id);
if (!sup_actor) {
jzx_supervisor_state_destroy(state, &loop->allocator);
return JZX_ERR_UNKNOWN;
}
sup_actor->supervisor_state = state;

for (size_t i = 0; i < state->child_count; ++i) {
err = jzx_supervisor_spawn_child(loop, sup_id, &state->children[i]);
if (err != JZX_OK) {
(void)jzx_actor_fail(loop, sup_id);
return err;
}
}

if (out_id) {
*out_id = sup_id;
}
return JZX_OK;
}
jzx_supervisor_child_id()
jzx_err jzx_supervisor_child_id(jzx_loop* loop, jzx_actor_id supervisor, size_t index,
jzx_actor_id* out_id) {
if (!loop || !out_id) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* sup_actor = jzx_actor_table_lookup(&loop->actors, supervisor);
if (!sup_actor || !sup_actor->supervisor_state) {
return JZX_ERR_NO_SUCH_ACTOR;
}
if (index >= sup_actor->supervisor_state->child_count) {
return JZX_ERR_INVALID_ARG;
}
*out_id = sup_actor->supervisor_state->children[index].id;
return JZX_OK;
}

Timers and I/O APIs (public ABI entry points)

Section: timers & IO
// -----------------------------------------------------------------------------
// Timers & IO
// -----------------------------------------------------------------------------
jzx_send_after()
jzx_err jzx_send_after(jzx_loop* loop, jzx_actor_id target, uint32_t ms, void* data, size_t len,
uint32_t tag, jzx_timer_id* out_timer) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
if (!jzx_actor_table_lookup(&loop->actors, target)) {
return JZX_ERR_NO_SUCH_ACTOR;
}
jzx_timer_entry* entry = (jzx_timer_entry*)jzx_alloc(&loop->allocator, sizeof(jzx_timer_entry));
if (!entry) {
return JZX_ERR_NO_MEMORY;
}
entry->target = target;
entry->data = data;
entry->len = len;
entry->tag = tag;
entry->next = NULL;

pthread_mutex_lock(&loop->timer_mutex);
entry->id = loop->next_timer_id++;
entry->due_ms = jzx_now_ms() + (uint64_t)ms;
jzx_timer_insert_locked(loop, entry);
pthread_cond_broadcast(&loop->timer_cond);
pthread_mutex_unlock(&loop->timer_mutex);

if (out_timer) {
*out_timer = entry->id;
}
return JZX_OK;
}
jzx_cancel_timer()
jzx_err jzx_cancel_timer(jzx_loop* loop, jzx_timer_id timer) {
if (!loop || !loop->timer_mutex_initialized) {
return JZX_ERR_INVALID_ARG;
}
pthread_mutex_lock(&loop->timer_mutex);
jzx_timer_entry* prev = NULL;
jzx_timer_entry* cur = loop->timer_head;
while (cur) {
if (cur->id == timer) {
if (prev) {
prev->next = cur->next;
} else {
loop->timer_head = cur->next;
}
pthread_mutex_unlock(&loop->timer_mutex);
jzx_free(&loop->allocator, cur);
return JZX_OK;
}
prev = cur;
cur = cur->next;
}
pthread_mutex_unlock(&loop->timer_mutex);
return JZX_ERR_TIMER_INVALID;
}
jzx_watch_fd()
jzx_err jzx_watch_fd(jzx_loop* loop, int fd, jzx_actor_id owner, uint32_t interest) {
if (!loop || !loop->xev || fd < 0 || interest == 0) {
return JZX_ERR_INVALID_ARG;
}
if (!jzx_actor_table_lookup(&loop->actors, owner)) {
return JZX_ERR_NO_SUCH_ACTOR;
}
jzx_io_watch* existing = jzx_io_find(loop, fd, NULL);
if (existing) {
jzx_err err = jzx_xev_watch_fd(loop->xev, loop, fd, interest);
if (err != JZX_OK) {
return err;
}
existing->owner = owner;
existing->interest = interest;
return JZX_OK;
}
if (loop->io_count == loop->io_capacity) {
jzx_err err = jzx_io_reserve(loop, loop->io_capacity * 2);
if (err != JZX_OK) {
return err;
}
}
uint32_t idx = loop->io_count;
loop->io_watchers[idx] = (jzx_io_watch){
.fd = fd,
.owner = owner,
.interest = interest,
.active = 1,
};
loop->io_count = idx + 1;
jzx_err err = jzx_xev_watch_fd(loop->xev, loop, fd, interest);
if (err != JZX_OK) {
loop->io_count = idx;
memset(&loop->io_watchers[idx], 0, sizeof(loop->io_watchers[idx]));
return err;
}
return JZX_OK;
}
jzx_unwatch_fd()
jzx_err jzx_unwatch_fd(jzx_loop* loop, int fd) {
if (!loop || fd < 0) {
return JZX_ERR_INVALID_ARG;
}
uint32_t idx = 0;
jzx_io_watch* entry = jzx_io_find(loop, fd, &idx);
if (!entry) {
return JZX_ERR_IO_NOT_WATCHED;
}
jzx_io_remove_index(loop, idx);
return JZX_OK;
}

Appendix: full file (for grepping and context)

Show full src/jzx_runtime.c
src/jzx_runtime.c (full source)
#include "jzx_internal.h"

#include <stdlib.h>
#include <string.h>
#include <time.h>

// -----------------------------------------------------------------------------
// Utility helpers
// -----------------------------------------------------------------------------

static inline uint32_t jzx_id_index(jzx_actor_id id) {
return (uint32_t)(id & 0xffffffffu);
}

static inline uint32_t jzx_id_generation(jzx_actor_id id) {
return (uint32_t)(id >> 32u);
}

static inline jzx_actor_id jzx_make_id(uint32_t gen, uint32_t idx) {
return ((uint64_t)gen << 32u) | (uint64_t)idx;
}

static void* jzx_alloc(jzx_allocator* alloc, size_t size) {
return alloc->alloc ? alloc->alloc(alloc->ctx, size) : NULL;
}

static void jzx_free(jzx_allocator* alloc, void* ptr) {
if (alloc->free) {
alloc->free(alloc->ctx, ptr);
}
}

static jzx_supervisor_state* jzx_supervisor_state_create(const jzx_supervisor_init* init,
jzx_allocator* allocator) {
if (!init || init->child_count == 0 || !init->children) {
return NULL;
}
jzx_supervisor_state* state =
(jzx_supervisor_state*)jzx_alloc(allocator, sizeof(jzx_supervisor_state));
if (!state) {
return NULL;
}
memset(state, 0, sizeof(*state));
state->config = init->supervisor;
state->child_count = init->child_count;
size_t bytes = sizeof(jzx_child_state) * init->child_count;
state->children = (jzx_child_state*)jzx_alloc(allocator, bytes);
if (!state->children) {
jzx_free(allocator, state);
return NULL;
}
memset(state->children, 0, bytes);
for (size_t i = 0; i < init->child_count; ++i) {
state->children[i].spec = init->children[i];
state->children[i].id = 0;
state->children[i].restart_count = 0;
state->children[i].last_restart_ms = 0;
}
state->intensity_window_count = 0;
state->intensity_window_start_ms = 0;
return state;
}

static void jzx_supervisor_state_destroy(jzx_supervisor_state* state, jzx_allocator* allocator) {
if (!state)
return;
if (state->children) {
jzx_free(allocator, state->children);
}
jzx_free(allocator, state);
}

static int jzx_supervisor_allow_restart(jzx_supervisor_state* sup, uint64_t now_ms) {
if (!sup)
return 0;
if (sup->config.intensity == 0 || sup->config.period_ms == 0) {
return 1;
}
if (sup->intensity_window_start_ms == 0 ||
now_ms - sup->intensity_window_start_ms > sup->config.period_ms) {
sup->intensity_window_start_ms = now_ms;
sup->intensity_window_count = 0;
}
sup->intensity_window_count += 1;
if (sup->intensity_window_count > sup->config.intensity) {
return 0;
}
return 1;
}

static uint64_t jzx_now_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (uint64_t)ts.tv_sec * 1000ull + (uint64_t)ts.tv_nsec / 1000000ull;
}

static uint32_t jzx_sat_add32(uint32_t a, uint32_t b) {
uint64_t sum = (uint64_t)a + (uint64_t)b;
if (sum > UINT32_MAX) {
return UINT32_MAX;
}
return (uint32_t)sum;
}

static uint32_t jzx_sat_mul32(uint32_t a, uint32_t b) {
if (a == 0 || b == 0)
return 0;
uint64_t prod = (uint64_t)a * (uint64_t)b;
if (prod > UINT32_MAX) {
return UINT32_MAX;
}
return (uint32_t)prod;
}

static jzx_err jzx_send_internal(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
uint32_t tag, jzx_actor_id sender);

static void jzx_io_remove_actor(jzx_loop* loop, jzx_actor_id actor);

// -----------------------------------------------------------------------------
// Wakeup helpers
// -----------------------------------------------------------------------------

static void jzx_wakeup_signal(jzx_loop* loop) {
if (!loop || !loop->xev) {
return;
}
jzx_xev_wakeup(loop->xev);
}

// -----------------------------------------------------------------------------
// Observer helpers
// -----------------------------------------------------------------------------

static void jzx_obs_actor_start(jzx_loop* loop, jzx_actor_id id, const char* name) {
if (loop && loop->observer.on_actor_start) {
loop->observer.on_actor_start(loop->observer_ctx, id, name);
}
}

static void jzx_obs_actor_stop(jzx_loop* loop, jzx_actor_id id, jzx_exit_reason reason) {
if (loop && loop->observer.on_actor_stop) {
loop->observer.on_actor_stop(loop->observer_ctx, id, reason);
}
}

static void jzx_obs_actor_restart(jzx_loop* loop, jzx_actor_id supervisor, jzx_actor_id child,
uint32_t attempt) {
if (loop && loop->observer.on_actor_restart) {
loop->observer.on_actor_restart(loop->observer_ctx, supervisor, child, attempt);
}
}

static void jzx_obs_supervisor_escalate(jzx_loop* loop, jzx_actor_id supervisor) {
if (loop && loop->observer.on_supervisor_escalate) {
loop->observer.on_supervisor_escalate(loop->observer_ctx, supervisor);
}
}

static void jzx_obs_mailbox_full(jzx_loop* loop, jzx_actor_id target) {
if (loop && loop->observer.on_mailbox_full) {
loop->observer.on_mailbox_full(loop->observer_ctx, target);
}
}

// -----------------------------------------------------------------------------
// Mailbox implementation
// -----------------------------------------------------------------------------

static jzx_err jzx_mailbox_init(jzx_mailbox_impl* box, uint32_t capacity,
jzx_allocator* allocator) {
if (capacity == 0) {
capacity = 1;
}
size_t bytes = sizeof(jzx_message) * capacity;
jzx_message* buffer = (jzx_message*)jzx_alloc(allocator, bytes);
if (!buffer) {
return JZX_ERR_NO_MEMORY;
}
memset(buffer, 0, bytes);
box->buffer = buffer;
box->capacity = capacity;
box->head = 0;
box->tail = 0;
box->count = 0;
return JZX_OK;
}

static void jzx_mailbox_deinit(jzx_mailbox_impl* box, jzx_allocator* allocator) {
if (box->buffer) {
jzx_free(allocator, box->buffer);
}
memset(box, 0, sizeof(*box));
}

static int jzx_mailbox_push(jzx_mailbox_impl* box, const jzx_message* msg) {
if (box->count == box->capacity) {
return -1;
}
box->buffer[box->tail] = *msg;
box->tail = (box->tail + 1) % box->capacity;
box->count++;
return 0;
}

static int jzx_mailbox_pop(jzx_mailbox_impl* box, jzx_message* out) {
if (box->count == 0) {
return -1;
}
*out = box->buffer[box->head];
box->head = (box->head + 1) % box->capacity;
box->count--;
return 0;
}

static int jzx_mailbox_has_items(const jzx_mailbox_impl* box) {
return box->count > 0;
}

// -----------------------------------------------------------------------------
// Actor table implementation
// -----------------------------------------------------------------------------

static jzx_err jzx_actor_table_init(jzx_actor_table* table, uint32_t capacity,
jzx_allocator* allocator) {
memset(table, 0, sizeof(*table));
table->capacity = capacity;
size_t slot_bytes = sizeof(jzx_actor*) * capacity;
size_t gen_bytes = sizeof(uint32_t) * capacity;
size_t stack_bytes = sizeof(uint32_t) * capacity;

table->slots = (jzx_actor**)jzx_alloc(allocator, slot_bytes);
table->generations = (uint32_t*)jzx_alloc(allocator, gen_bytes);
table->free_stack = (uint32_t*)jzx_alloc(allocator, stack_bytes);
if (!table->slots || !table->generations || !table->free_stack) {
return JZX_ERR_NO_MEMORY;
}

memset(table->slots, 0, slot_bytes);
for (uint32_t i = 0; i < capacity; ++i) {
table->generations[i] = 1;
table->free_stack[i] = capacity - 1 - i;
}
table->free_top = capacity;
table->used = 0;
return JZX_OK;
}

static void jzx_actor_table_deinit(jzx_actor_table* table, jzx_allocator* allocator) {
if (!table) {
return;
}
if (table->slots) {
jzx_free(allocator, table->slots);
}
if (table->generations) {
jzx_free(allocator, table->generations);
}
if (table->free_stack) {
jzx_free(allocator, table->free_stack);
}
memset(table, 0, sizeof(*table));
}

static jzx_actor* jzx_actor_table_lookup(jzx_actor_table* table, jzx_actor_id id) {
uint32_t idx = jzx_id_index(id);
if (idx >= table->capacity) {
return NULL;
}
if (table->generations[idx] != jzx_id_generation(id)) {
return NULL;
}
return table->slots[idx];
}

static jzx_err jzx_actor_table_insert(jzx_actor_table* table, jzx_actor* actor,
jzx_allocator* allocator, jzx_actor_id* out_id) {
(void)allocator;
if (table->free_top == 0) {
return JZX_ERR_MAX_ACTORS;
}
uint32_t idx = table->free_stack[--table->free_top];
uint32_t gen = table->generations[idx];
actor->id = jzx_make_id(gen, idx);
table->slots[idx] = actor;
table->used++;
if (out_id) {
*out_id = actor->id;
}
return JZX_OK;
}

static void jzx_actor_table_remove(jzx_actor_table* table, jzx_actor* actor) {
if (!actor) {
return;
}
uint32_t idx = jzx_id_index(actor->id);
if (idx >= table->capacity) {
return;
}
if (table->slots[idx] != actor) {
return;
}
table->slots[idx] = NULL;
table->generations[idx] += 1u;
table->free_stack[table->free_top++] = idx;
if (table->used > 0) {
table->used--;
}
}

// -----------------------------------------------------------------------------
// Run queue implementation
// -----------------------------------------------------------------------------

static jzx_err jzx_run_queue_init(jzx_run_queue* rq, uint32_t capacity, jzx_allocator* allocator) {
memset(rq, 0, sizeof(*rq));
rq->capacity = capacity > 0 ? capacity : 1;
rq->entries = (jzx_actor**)jzx_alloc(allocator, sizeof(jzx_actor*) * rq->capacity);
if (!rq->entries) {
return JZX_ERR_NO_MEMORY;
}
memset(rq->entries, 0, sizeof(jzx_actor*) * rq->capacity);
return JZX_OK;
}

static void jzx_run_queue_deinit(jzx_run_queue* rq, jzx_allocator* allocator) {
if (rq->entries) {
jzx_free(allocator, rq->entries);
}
memset(rq, 0, sizeof(*rq));
}

static int jzx_run_queue_push(jzx_run_queue* rq, jzx_actor* actor) {
if (rq->count == rq->capacity) {
return -1;
}
rq->entries[rq->tail] = actor;
rq->tail = (rq->tail + 1) % rq->capacity;
rq->count++;
return 0;
}

static jzx_actor* jzx_run_queue_pop(jzx_run_queue* rq) {
if (rq->count == 0) {
return NULL;
}
jzx_actor* actor = rq->entries[rq->head];
rq->entries[rq->head] = NULL;
rq->head = (rq->head + 1) % rq->capacity;
rq->count--;
return actor;
}

static void jzx_schedule_actor(jzx_loop* loop, jzx_actor* actor) {
if (!actor || actor->in_run_queue) {
return;
}
if (jzx_run_queue_push(&loop->run_queue, actor) == 0) {
actor->in_run_queue = 1;
}
}

static void jzx_teardown_actor(jzx_loop* loop, jzx_actor* actor) {
if (!actor) {
return;
}
jzx_io_remove_actor(loop, actor->id);
jzx_exit_reason reason = JZX_EXIT_NORMAL;
if (actor->status == JZX_ACTOR_FAILED) {
reason = JZX_EXIT_FAIL;
}
jzx_obs_actor_stop(loop, actor->id, reason);
if (actor->supervisor) {
jzx_child_exit* ev = (jzx_child_exit*)jzx_alloc(&loop->allocator, sizeof(jzx_child_exit));
if (ev) {
ev->child = actor->id;
ev->status = actor->status;
jzx_err err = jzx_send_internal(loop, actor->supervisor, ev, sizeof(jzx_child_exit),
JZX_TAG_SYS_CHILD_EXIT, 0);
if (err != JZX_OK) {
jzx_free(&loop->allocator, ev);
}
}
}
jzx_mailbox_deinit(&actor->mailbox, &loop->allocator);
jzx_actor_table_remove(&loop->actors, actor);
if (actor->supervisor_state) {
jzx_supervisor_state_destroy(actor->supervisor_state, &loop->allocator);
}
jzx_free(&loop->allocator, actor);
}

// -----------------------------------------------------------------------------
// Supervisor helpers
// -----------------------------------------------------------------------------

static jzx_child_state* jzx_supervisor_find_child(jzx_supervisor_state* sup, jzx_actor_id id,
size_t* out_idx) {
if (!sup)
return NULL;
for (size_t i = 0; i < sup->child_count; ++i) {
if (sup->children[i].id == id) {
if (out_idx) {
*out_idx = i;
}
return &sup->children[i];
}
}
return NULL;
}

static jzx_err jzx_supervisor_spawn_child(jzx_loop* loop, jzx_actor_id supervisor_id,
jzx_child_state* child) {
jzx_spawn_opts opts = {
.behavior = child->spec.behavior,
.state = child->spec.state,
.supervisor = supervisor_id,
.mailbox_cap = child->spec.mailbox_cap,
.name = child->spec.name,
};
child->last_restart_ms = jzx_now_ms();
return jzx_spawn(loop, &opts, &child->id);
}

static void jzx_supervisor_stop_child(jzx_loop* loop, jzx_child_state* child) {
if (child->id != 0) {
(void)jzx_actor_stop(loop, child->id);
child->id = 0;
}
}

static void jzx_supervisor_schedule_restart(jzx_loop* loop, jzx_actor* sup_actor, size_t child_idx,
uint32_t delay_ms) {
jzx_supervisor_state* sup = sup_actor->supervisor_state;
if (!sup || child_idx >= sup->child_count)
return;
if (delay_ms == 0) {
(void)jzx_supervisor_spawn_child(loop, sup_actor->id, &sup->children[child_idx]);
return;
}
jzx_child_restart* payload =
(jzx_child_restart*)jzx_alloc(&loop->allocator, sizeof(jzx_child_restart));
if (!payload)
return;
payload->child_index = (uint32_t)child_idx;
jzx_err err = jzx_send_after(loop, sup_actor->id, delay_ms, payload, sizeof(jzx_child_restart),
JZX_TAG_SYS_CHILD_RESTART, NULL);
if (err != JZX_OK) {
jzx_free(&loop->allocator, payload);
}
}

static uint32_t jzx_supervisor_compute_delay(const jzx_supervisor_state* sup,
const jzx_child_state* child) {
if (!sup || !child)
return 0;
uint32_t base = child->spec.restart_delay_ms;
jzx_backoff_type strategy = child->spec.backoff;
if (strategy == JZX_BACKOFF_NONE) {
strategy = sup->config.backoff;
}
uint32_t step = sup->config.backoff_delay_ms;
switch (strategy) {
case JZX_BACKOFF_NONE:
return base;
case JZX_BACKOFF_CONSTANT: {
uint32_t extra = jzx_sat_mul32(step, child->restart_count);
return jzx_sat_add32(base, extra);
}
case JZX_BACKOFF_EXPONENTIAL: {
uint32_t factor = 1u;
uint32_t shifts = child->restart_count;
if (shifts >= 31) {
factor = UINT32_MAX;
} else {
factor = 1u << shifts;
}
uint32_t scaled_base = jzx_sat_mul32(base ? base : step, factor);
return scaled_base;
}
}
return base;
}

static void jzx_supervisor_restart_strategy(jzx_loop* loop, jzx_actor* supervisor_actor,
size_t failed_idx, jzx_actor_id failed_child_id,
jzx_supervisor_state* sup) {
uint32_t failed_delay = sup->children[failed_idx].spec.restart_delay_ms;
switch (sup->config.strategy) {
case JZX_SUP_ONE_FOR_ONE:
if (failed_child_id) {
uint32_t attempt = sup->children[failed_idx].restart_count + 1;
jzx_obs_actor_restart(loop, supervisor_actor->id, failed_child_id, attempt);
}
sup->children[failed_idx].restart_count += 1;
sup->children[failed_idx].last_restart_ms = jzx_now_ms();
failed_delay = jzx_supervisor_compute_delay(sup, &sup->children[failed_idx]);
jzx_supervisor_schedule_restart(loop, supervisor_actor, failed_idx, failed_delay);
break;
case JZX_SUP_ONE_FOR_ALL:
for (size_t i = 0; i < sup->child_count; ++i) {
jzx_actor_id child_id = sup->children[i].id;
if (i == failed_idx) {
child_id = failed_child_id;
}
if (child_id) {
uint32_t attempt = sup->children[i].restart_count + 1;
jzx_obs_actor_restart(loop, supervisor_actor->id, child_id, attempt);
}
}
for (size_t i = 0; i < sup->child_count; ++i) {
jzx_supervisor_stop_child(loop, &sup->children[i]);
}
for (size_t i = 0; i < sup->child_count; ++i) {
sup->children[i].restart_count += 1;
sup->children[i].last_restart_ms = jzx_now_ms();
uint32_t delay = jzx_supervisor_compute_delay(sup, &sup->children[i]);
jzx_supervisor_schedule_restart(loop, supervisor_actor, i, delay);
}
break;
case JZX_SUP_REST_FOR_ONE:
for (size_t i = failed_idx; i < sup->child_count; ++i) {
jzx_actor_id child_id = sup->children[i].id;
if (i == failed_idx) {
child_id = failed_child_id;
}
if (child_id) {
uint32_t attempt = sup->children[i].restart_count + 1;
jzx_obs_actor_restart(loop, supervisor_actor->id, child_id, attempt);
}
}
for (size_t i = failed_idx; i < sup->child_count; ++i) {
jzx_supervisor_stop_child(loop, &sup->children[i]);
}
for (size_t i = failed_idx; i < sup->child_count; ++i) {
sup->children[i].restart_count += 1;
sup->children[i].last_restart_ms = jzx_now_ms();
uint32_t delay = jzx_supervisor_compute_delay(sup, &sup->children[i]);
jzx_supervisor_schedule_restart(loop, supervisor_actor, i, delay);
}
break;
}
}

static jzx_behavior_result jzx_supervisor_behavior(jzx_context* ctx, const jzx_message* msg) {
jzx_actor* sup_actor = jzx_actor_table_lookup(&ctx->loop->actors, ctx->self);
if (!sup_actor || !sup_actor->supervisor_state) {
if (msg->data) {
jzx_free(&ctx->loop->allocator, msg->data);
}
return JZX_BEHAVIOR_OK;
}
jzx_supervisor_state* sup = sup_actor->supervisor_state;
if (msg->tag == JZX_TAG_SYS_CHILD_EXIT && msg->data) {
jzx_child_exit* ev = (jzx_child_exit*)msg->data;
jzx_actor_id failed_child_id = ev->child;
size_t idx = 0;
jzx_child_state* child = jzx_supervisor_find_child(sup, ev->child, &idx);
jzx_actor_status status = ev->status;
jzx_free(&ctx->loop->allocator, ev);
if (!child) {
return JZX_BEHAVIOR_OK;
}
child->id = 0;

int restart = 0;
if (child->spec.mode == JZX_CHILD_PERMANENT) {
restart = 1;
} else if (child->spec.mode == JZX_CHILD_TRANSIENT && status == JZX_ACTOR_FAILED) {
restart = 1;
}

if (!restart) {
return JZX_BEHAVIOR_OK;
}

uint64_t now = jzx_now_ms();
if (!jzx_supervisor_allow_restart(sup, now)) {
jzx_obs_supervisor_escalate(ctx->loop, ctx->self);
for (size_t i = 0; i < sup->child_count; ++i) {
jzx_supervisor_stop_child(ctx->loop, &sup->children[i]);
}
sup_actor->status = JZX_ACTOR_FAILED;
return JZX_BEHAVIOR_FAIL;
}

jzx_supervisor_restart_strategy(ctx->loop, sup_actor, idx, failed_child_id, sup);
return JZX_BEHAVIOR_OK;
}

if (msg->tag == JZX_TAG_SYS_CHILD_RESTART && msg->data) {
jzx_child_restart* ev = (jzx_child_restart*)msg->data;
uint32_t idx = ev->child_index;
jzx_free(&ctx->loop->allocator, ev);
if (idx < sup->child_count) {
(void)jzx_supervisor_spawn_child(ctx->loop, ctx->self, &sup->children[idx]);
}
return JZX_BEHAVIOR_OK;
}

if (msg->data) {
jzx_free(&ctx->loop->allocator, msg->data);
}
return JZX_BEHAVIOR_OK;
}

// -----------------------------------------------------------------------------
// Async queue
// -----------------------------------------------------------------------------

static jzx_err jzx_async_queue_init(jzx_loop* loop) {
if (pthread_mutex_init(&loop->async_mutex, NULL) != 0) {
return JZX_ERR_UNKNOWN;
}
loop->async_mutex_initialized = 1;
loop->async_head = NULL;
loop->async_tail = NULL;
return JZX_OK;
}

static void jzx_async_queue_destroy(jzx_loop* loop) {
if (!loop->async_mutex_initialized) {
return;
}
pthread_mutex_lock(&loop->async_mutex);
jzx_async_msg* head = loop->async_head;
loop->async_head = NULL;
loop->async_tail = NULL;
pthread_mutex_unlock(&loop->async_mutex);
pthread_mutex_destroy(&loop->async_mutex);
loop->async_mutex_initialized = 0;
while (head) {
jzx_async_msg* next = head->next;
jzx_free(&loop->allocator, head);
head = next;
}
}

static jzx_err jzx_async_enqueue(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
uint32_t tag, jzx_actor_id sender) {
if (!loop || !loop->async_mutex_initialized) {
return JZX_ERR_INVALID_ARG;
}
jzx_async_msg* msg = (jzx_async_msg*)jzx_alloc(&loop->allocator, sizeof(jzx_async_msg));
if (!msg) {
return JZX_ERR_NO_MEMORY;
}
msg->target = target;
msg->data = data;
msg->len = len;
msg->tag = tag;
msg->sender = sender;
msg->next = NULL;

pthread_mutex_lock(&loop->async_mutex);
if (!loop->async_head) {
loop->async_head = msg;
loop->async_tail = msg;
} else {
loop->async_tail->next = msg;
loop->async_tail = msg;
}
pthread_mutex_unlock(&loop->async_mutex);
jzx_wakeup_signal(loop);
return JZX_OK;
}

static jzx_async_msg* jzx_async_detach(jzx_loop* loop) {
if (!loop->async_mutex_initialized) {
return NULL;
}
pthread_mutex_lock(&loop->async_mutex);
jzx_async_msg* head = loop->async_head;
loop->async_head = NULL;
loop->async_tail = NULL;
pthread_mutex_unlock(&loop->async_mutex);
return head;
}

static void jzx_async_dispatch(jzx_loop* loop, jzx_async_msg* head) {
jzx_async_msg* msg = head;
while (msg) {
jzx_async_msg* next = msg->next;
(void)jzx_send_internal(loop, msg->target, msg->data, msg->len, msg->tag, msg->sender);
jzx_free(&loop->allocator, msg);
msg = next;
}
}

static void jzx_async_drain(jzx_loop* loop) {
jzx_async_msg* head = jzx_async_detach(loop);
if (head) {
jzx_async_dispatch(loop, head);
}
}

static int jzx_async_has_pending(jzx_loop* loop) {
if (!loop->async_mutex_initialized) {
return 0;
}
pthread_mutex_lock(&loop->async_mutex);
int has = loop->async_head != NULL;
pthread_mutex_unlock(&loop->async_mutex);
return has;
}

// -----------------------------------------------------------------------------
// Timer system
// -----------------------------------------------------------------------------

static void jzx_timer_insert_locked(jzx_loop* loop, jzx_timer_entry* entry) {
if (!loop->timer_head || entry->due_ms < loop->timer_head->due_ms) {
entry->next = loop->timer_head;
loop->timer_head = entry;
return;
}
jzx_timer_entry* cur = loop->timer_head;
while (cur->next && cur->next->due_ms <= entry->due_ms) {
cur = cur->next;
}
entry->next = cur->next;
cur->next = entry;
}

static void* jzx_timer_thread_main(void* arg) {
jzx_loop* loop = (jzx_loop*)arg;
pthread_mutex_lock(&loop->timer_mutex);
while (!loop->timer_stop) {
uint64_t now = jzx_now_ms();
jzx_timer_entry* head = loop->timer_head;
if (!head) {
pthread_cond_wait(&loop->timer_cond, &loop->timer_mutex);
continue;
}
if (head->due_ms > now) {
uint64_t wait_ms = head->due_ms - now;
#if defined(__APPLE__)
struct timespec rel;
rel.tv_sec = (time_t)(wait_ms / 1000ull);
rel.tv_nsec = (long)((wait_ms % 1000ull) * 1000000ull);
(void)pthread_cond_timedwait_relative_np(&loop->timer_cond, &loop->timer_mutex, &rel);
#else
struct timespec ts;
#if defined(__linux__)
clockid_t clock_id = loop->timer_cond_monotonic ? CLOCK_MONOTONIC : CLOCK_REALTIME;
clock_gettime(clock_id, &ts);
#else
clock_gettime(CLOCK_REALTIME, &ts);
#endif
ts.tv_sec += (time_t)(wait_ms / 1000ull);
ts.tv_nsec += (long)((wait_ms % 1000ull) * 1000000ull);
if (ts.tv_nsec >= 1000000000l) {
ts.tv_sec += 1;
ts.tv_nsec -= 1000000000l;
}
(void)pthread_cond_timedwait(&loop->timer_cond, &loop->timer_mutex, &ts);
#endif
continue;
}
loop->timer_head = head->next;
pthread_mutex_unlock(&loop->timer_mutex);
jzx_async_enqueue(loop, head->target, head->data, head->len, head->tag, 0);
jzx_free(&loop->allocator, head);
pthread_mutex_lock(&loop->timer_mutex);
}
pthread_mutex_unlock(&loop->timer_mutex);
return NULL;
}

static jzx_err jzx_timer_system_init(jzx_loop* loop) {
if (pthread_mutex_init(&loop->timer_mutex, NULL) != 0) {
return JZX_ERR_UNKNOWN;
}
loop->timer_cond_monotonic = 0;
#if defined(__linux__)
pthread_condattr_t attr;
pthread_condattr_t* attr_ptr = NULL;
if (pthread_condattr_init(&attr) == 0) {
attr_ptr = &attr;
if (pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0) {
loop->timer_cond_monotonic = 1;
}
}
int cond_rc = pthread_cond_init(&loop->timer_cond, attr_ptr);
if (attr_ptr) {
pthread_condattr_destroy(&attr);
}
if (cond_rc != 0) {
pthread_mutex_destroy(&loop->timer_mutex);
return JZX_ERR_UNKNOWN;
}
#else
if (pthread_cond_init(&loop->timer_cond, NULL) != 0) {
pthread_mutex_destroy(&loop->timer_mutex);
return JZX_ERR_UNKNOWN;
}
#endif
loop->timer_mutex_initialized = 1;
loop->timer_thread_running = 0;
loop->timer_stop = 0;
loop->timer_head = NULL;
loop->next_timer_id = 1;
if (pthread_create(&loop->timer_thread, NULL, jzx_timer_thread_main, loop) != 0) {
pthread_cond_destroy(&loop->timer_cond);
pthread_mutex_destroy(&loop->timer_mutex);
loop->timer_mutex_initialized = 0;
return JZX_ERR_UNKNOWN;
}
loop->timer_thread_running = 1;
return JZX_OK;
}

static void jzx_timer_system_shutdown(jzx_loop* loop) {
if (!loop->timer_mutex_initialized) {
return;
}
pthread_mutex_lock(&loop->timer_mutex);
loop->timer_stop = 1;
pthread_cond_broadcast(&loop->timer_cond);
pthread_mutex_unlock(&loop->timer_mutex);

if (loop->timer_thread_running) {
pthread_join(loop->timer_thread, NULL);
loop->timer_thread_running = 0;
}

pthread_mutex_lock(&loop->timer_mutex);
jzx_timer_entry* cur = loop->timer_head;
loop->timer_head = NULL;
pthread_mutex_unlock(&loop->timer_mutex);

while (cur) {
jzx_timer_entry* next = cur->next;
jzx_free(&loop->allocator, cur);
cur = next;
}

pthread_cond_destroy(&loop->timer_cond);
pthread_mutex_destroy(&loop->timer_mutex);
loop->timer_mutex_initialized = 0;
}

static int jzx_timer_has_pending(jzx_loop* loop) {
if (!loop->timer_mutex_initialized) {
return 0;
}
pthread_mutex_lock(&loop->timer_mutex);
int has = loop->timer_head != NULL;
pthread_mutex_unlock(&loop->timer_mutex);
return has;
}

// -----------------------------------------------------------------------------
// I O watchers
// -----------------------------------------------------------------------------

static jzx_err jzx_io_init(jzx_loop* loop, uint32_t capacity) {
loop->io_capacity = capacity ? capacity : 1;
loop->io_count = 0;
loop->io_watchers =
(jzx_io_watch*)jzx_alloc(&loop->allocator, sizeof(jzx_io_watch) * loop->io_capacity);
if (!loop->io_watchers) {
return JZX_ERR_NO_MEMORY;
}
memset(loop->io_watchers, 0, sizeof(jzx_io_watch) * loop->io_capacity);
return JZX_OK;
}

static void jzx_io_deinit(jzx_loop* loop) {
if (loop->io_watchers) {
jzx_free(&loop->allocator, loop->io_watchers);
loop->io_watchers = NULL;
}
loop->io_capacity = 0;
loop->io_count = 0;
}

static jzx_err jzx_io_reserve(jzx_loop* loop, uint32_t new_cap) {
jzx_io_watch* new_watchers =
(jzx_io_watch*)jzx_alloc(&loop->allocator, sizeof(jzx_io_watch) * new_cap);
if (!new_watchers) {
return JZX_ERR_NO_MEMORY;
}
memset(new_watchers, 0, sizeof(jzx_io_watch) * new_cap);
if (loop->io_watchers) {
memcpy(new_watchers, loop->io_watchers, sizeof(jzx_io_watch) * loop->io_count);
jzx_free(&loop->allocator, loop->io_watchers);
}
loop->io_watchers = new_watchers;
loop->io_capacity = new_cap;
return JZX_OK;
}

static jzx_io_watch* jzx_io_find(jzx_loop* loop, int fd, uint32_t* idx_out) {
for (uint32_t i = 0; i < loop->io_count; ++i) {
if (loop->io_watchers[i].fd == fd) {
if (idx_out) {
*idx_out = i;
}
return &loop->io_watchers[i];
}
}
return NULL;
}

static void jzx_io_remove_index(jzx_loop* loop, uint32_t idx) {
if (idx >= loop->io_count) {
return;
}
if (loop->xev) {
jzx_xev_unwatch_fd(loop->xev, loop->io_watchers[idx].fd);
}
uint32_t last = loop->io_count - 1;
if (idx != last) {
loop->io_watchers[idx] = loop->io_watchers[last];
}
loop->io_count--;
}

static void jzx_io_remove_actor(jzx_loop* loop, jzx_actor_id actor) {
for (uint32_t i = 0; i < loop->io_count;) {
if (loop->io_watchers[i].owner == actor) {
jzx_io_remove_index(loop, i);
continue;
}
++i;
}
}

uint8_t jzx_io_xev_notify(jzx_loop* loop, int fd, uint32_t readiness) {
if (!loop || !loop->running || fd < 0 || readiness == 0) {
return 0;
}

uint32_t idx = 0;
jzx_io_watch* watch = jzx_io_find(loop, fd, &idx);
if (!watch) {
return 0;
}
if (!jzx_actor_table_lookup(&loop->actors, watch->owner)) {
jzx_io_remove_index(loop, idx);
return 0;
}

jzx_io_event* ev = (jzx_io_event*)jzx_alloc(&loop->allocator, sizeof(jzx_io_event));
if (!ev) {
return 1;
}
ev->fd = fd;
ev->readiness = readiness;
jzx_err err =
jzx_send_internal(loop, watch->owner, ev, sizeof(jzx_io_event), JZX_TAG_SYS_IO, 0);
if (err != JZX_OK) {
jzx_free(&loop->allocator, ev);
}
return 1;
}

// -----------------------------------------------------------------------------
// Config helpers
// -----------------------------------------------------------------------------

static void* default_alloc(void* ctx, size_t size) {
(void)ctx;
return malloc(size);
}

static void default_free(void* ctx, void* ptr) {
(void)ctx;
free(ptr);
}

void jzx_config_init(jzx_config* cfg) {
if (!cfg) {
return;
}
memset(cfg, 0, sizeof(*cfg));
cfg->allocator.alloc = default_alloc;
cfg->allocator.free = default_free;
cfg->allocator.ctx = NULL;
cfg->max_actors = 1024;
cfg->default_mailbox_cap = 1024;
cfg->max_msgs_per_actor = 64;
cfg->max_actors_per_tick = 1024;
cfg->max_io_watchers = 1024;
cfg->io_poll_timeout_ms = 10;
}

static void apply_defaults(jzx_config* cfg) {
if (!cfg->allocator.alloc) {
cfg->allocator.alloc = default_alloc;
}
if (!cfg->allocator.free) {
cfg->allocator.free = default_free;
}
if (cfg->max_actors == 0) {
cfg->max_actors = 1024;
}
if (cfg->default_mailbox_cap == 0) {
cfg->default_mailbox_cap = 1024;
}
if (cfg->max_msgs_per_actor == 0) {
cfg->max_msgs_per_actor = 64;
}
if (cfg->max_actors_per_tick == 0) {
cfg->max_actors_per_tick = 1024;
}
if (cfg->max_io_watchers == 0) {
cfg->max_io_watchers = 1024;
}
if (cfg->io_poll_timeout_ms == 0) {
cfg->io_poll_timeout_ms = 10;
}
}

// -----------------------------------------------------------------------------
// Loop lifecycle
// -----------------------------------------------------------------------------

jzx_loop* jzx_loop_create(const jzx_config* cfg) {
jzx_config local;
if (cfg) {
local = *cfg;
} else {
jzx_config_init(&local);
}
apply_defaults(&local);

jzx_loop* loop = (jzx_loop*)jzx_alloc(&local.allocator, sizeof(jzx_loop));
if (!loop) {
return NULL;
}
memset(loop, 0, sizeof(*loop));
loop->cfg = local;
loop->allocator = local.allocator;
loop->xev = jzx_xev_create();
if (!loop->xev) {
jzx_loop_destroy(loop);
return NULL;
}

if (jzx_actor_table_init(&loop->actors, local.max_actors, &loop->allocator) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_run_queue_init(&loop->run_queue, local.max_actors, &loop->allocator) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_async_queue_init(loop) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_timer_system_init(loop) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_io_init(loop, local.max_io_watchers) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
loop->running = 0;
loop->stop_requested = 0;
return loop;
}

void jzx_loop_destroy(jzx_loop* loop) {
if (!loop) {
return;
}
jzx_timer_system_shutdown(loop);
jzx_async_queue_destroy(loop);
if (loop->xev) {
jzx_xev_destroy(loop->xev);
loop->xev = NULL;
}
jzx_io_deinit(loop);
for (uint32_t i = 0; i < loop->actors.capacity; ++i) {
jzx_actor* actor = loop->actors.slots ? loop->actors.slots[i] : NULL;
if (actor) {
jzx_mailbox_deinit(&actor->mailbox, &loop->allocator);
jzx_free(&loop->allocator, actor);
loop->actors.slots[i] = NULL;
}
}
jzx_actor_table_deinit(&loop->actors, &loop->allocator);
jzx_run_queue_deinit(&loop->run_queue, &loop->allocator);
jzx_free(&loop->allocator, loop);
}

int jzx_loop_run(jzx_loop* loop) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
if (loop->running) {
return JZX_ERR_LOOP_CLOSED;
}
loop->running = 1;
int rc = JZX_OK;
while (!loop->stop_requested) {
jzx_async_drain(loop);
jzx_xev_run(loop->xev, 0);
uint32_t actors_processed = 0;
while (actors_processed < loop->cfg.max_actors_per_tick) {
jzx_actor* actor = jzx_run_queue_pop(&loop->run_queue);
if (!actor) {
break;
}
actor->in_run_queue = 0;
if (actor->status == JZX_ACTOR_STOPPING || actor->status == JZX_ACTOR_FAILED) {
jzx_teardown_actor(loop, actor);
continue;
}

uint32_t processed_msgs = 0;
while (processed_msgs < loop->cfg.max_msgs_per_actor) {
jzx_message msg;
if (jzx_mailbox_pop(&actor->mailbox, &msg) != 0) {
break;
}
jzx_context ctx = {
.state = actor->state,
.self = actor->id,
.loop = loop,
};
jzx_behavior_result result = actor->behavior(&ctx, &msg);
processed_msgs++;
if (result == JZX_BEHAVIOR_STOP) {
actor->status = JZX_ACTOR_STOPPING;
break;
} else if (result == JZX_BEHAVIOR_FAIL) {
actor->status = JZX_ACTOR_FAILED;
break;
}
}
if (actor->status == JZX_ACTOR_STOPPING || actor->status == JZX_ACTOR_FAILED) {
jzx_teardown_actor(loop, actor);
} else if (jzx_mailbox_has_items(&actor->mailbox)) {
jzx_schedule_actor(loop, actor);
}
actors_processed++;
}

if (loop->run_queue.count == 0) {
if (loop->actors.used == 0 && !jzx_async_has_pending(loop) &&
!jzx_timer_has_pending(loop) && loop->io_count == 0) {
for (uint32_t i = 0; i < 64; ++i) {
jzx_xev_run(loop->xev, 0);
}
break;
}
if (jzx_async_has_pending(loop)) {
continue;
}
jzx_xev_run(loop->xev, 1);
}
}
loop->running = 0;
loop->stop_requested = 0;
return rc;
}

void jzx_loop_request_stop(jzx_loop* loop) {
if (!loop) {
return;
}
loop->stop_requested = 1;
jzx_wakeup_signal(loop);
if (loop->timer_mutex_initialized) {
pthread_mutex_lock(&loop->timer_mutex);
pthread_cond_broadcast(&loop->timer_cond);
pthread_mutex_unlock(&loop->timer_mutex);
}
}

void jzx_loop_free(jzx_loop* loop, void* ptr) {
if (!loop || !ptr) {
return;
}
jzx_free(&loop->allocator, ptr);
}

void jzx_loop_set_observer(jzx_loop* loop, const jzx_observer* obs, void* ctx) {
if (!loop) {
return;
}
if (obs) {
loop->observer = *obs;
loop->observer_ctx = ctx;
} else {
memset(&loop->observer, 0, sizeof(loop->observer));
loop->observer_ctx = NULL;
}
}

// -----------------------------------------------------------------------------
// Actor APIs
// -----------------------------------------------------------------------------

static jzx_actor* jzx_actor_create(jzx_loop* loop, const jzx_spawn_opts* opts) {
jzx_actor* actor = (jzx_actor*)jzx_alloc(&loop->allocator, sizeof(jzx_actor));
if (!actor) {
return NULL;
}
memset(actor, 0, sizeof(*actor));
actor->status = JZX_ACTOR_RUNNING;
actor->behavior = opts->behavior;
actor->state = opts->state;
actor->supervisor = opts->supervisor;
if (jzx_mailbox_init(&actor->mailbox,
opts->mailbox_cap ? opts->mailbox_cap : loop->cfg.default_mailbox_cap,
&loop->allocator) != JZX_OK) {
jzx_free(&loop->allocator, actor);
return NULL;
}
return actor;
}

jzx_err jzx_spawn(jzx_loop* loop, const jzx_spawn_opts* opts, jzx_actor_id* out_id) {
if (!loop || !opts || !opts->behavior) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_create(loop, opts);
if (!actor) {
return JZX_ERR_NO_MEMORY;
}
jzx_err err = jzx_actor_table_insert(&loop->actors, actor, &loop->allocator, out_id);
if (err != JZX_OK) {
jzx_mailbox_deinit(&actor->mailbox, &loop->allocator);
jzx_free(&loop->allocator, actor);
return err;
}
jzx_obs_actor_start(loop, actor->id, opts->name);
return JZX_OK;
}

static jzx_err jzx_send_internal(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
uint32_t tag, jzx_actor_id sender) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_table_lookup(&loop->actors, target);
if (!actor) {
return JZX_ERR_NO_SUCH_ACTOR;
}
jzx_message msg = {
.data = data,
.len = len,
.tag = tag,
.sender = sender,
};
if (jzx_mailbox_push(&actor->mailbox, &msg) != 0) {
jzx_obs_mailbox_full(loop, target);
return JZX_ERR_MAILBOX_FULL;
}
jzx_schedule_actor(loop, actor);
return JZX_OK;
}

jzx_err jzx_send(jzx_loop* loop, jzx_actor_id target, void* data, size_t len, uint32_t tag) {
return jzx_send_internal(loop, target, data, len, tag, 0);
}

jzx_err jzx_send_async(jzx_loop* loop, jzx_actor_id target, void* data, size_t len, uint32_t tag) {
return jzx_async_enqueue(loop, target, data, len, tag, 0);
}

jzx_err jzx_actor_stop(jzx_loop* loop, jzx_actor_id id) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_table_lookup(&loop->actors, id);
if (!actor) {
return JZX_ERR_NO_SUCH_ACTOR;
}
actor->status = JZX_ACTOR_STOPPING;
jzx_schedule_actor(loop, actor);
return JZX_OK;
}

jzx_err jzx_actor_fail(jzx_loop* loop, jzx_actor_id id) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_table_lookup(&loop->actors, id);
if (!actor) {
return JZX_ERR_NO_SUCH_ACTOR;
}
actor->status = JZX_ACTOR_FAILED;
jzx_schedule_actor(loop, actor);
return JZX_OK;
}

// -----------------------------------------------------------------------------
// Supervisor spawn
// -----------------------------------------------------------------------------

jzx_err jzx_spawn_supervisor(jzx_loop* loop, const jzx_supervisor_init* init, jzx_actor_id parent,
jzx_actor_id* out_id) {
if (!loop || !init || !init->children || init->child_count == 0) {
return JZX_ERR_INVALID_ARG;
}
jzx_supervisor_state* state = jzx_supervisor_state_create(init, &loop->allocator);
if (!state) {
return JZX_ERR_NO_MEMORY;
}
jzx_spawn_opts opts = {
.behavior = jzx_supervisor_behavior,
.state = state,
.supervisor = parent,
.mailbox_cap = 0,
.name = NULL,
};
jzx_actor_id sup_id = 0;
jzx_err err = jzx_spawn(loop, &opts, &sup_id);
if (err != JZX_OK) {
jzx_supervisor_state_destroy(state, &loop->allocator);
return err;
}
jzx_actor* sup_actor = jzx_actor_table_lookup(&loop->actors, sup_id);
if (!sup_actor) {
jzx_supervisor_state_destroy(state, &loop->allocator);
return JZX_ERR_UNKNOWN;
}
sup_actor->supervisor_state = state;

for (size_t i = 0; i < state->child_count; ++i) {
err = jzx_supervisor_spawn_child(loop, sup_id, &state->children[i]);
if (err != JZX_OK) {
(void)jzx_actor_fail(loop, sup_id);
return err;
}
}

if (out_id) {
*out_id = sup_id;
}
return JZX_OK;
}

jzx_err jzx_supervisor_child_id(jzx_loop* loop, jzx_actor_id supervisor, size_t index,
jzx_actor_id* out_id) {
if (!loop || !out_id) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* sup_actor = jzx_actor_table_lookup(&loop->actors, supervisor);
if (!sup_actor || !sup_actor->supervisor_state) {
return JZX_ERR_NO_SUCH_ACTOR;
}
if (index >= sup_actor->supervisor_state->child_count) {
return JZX_ERR_INVALID_ARG;
}
*out_id = sup_actor->supervisor_state->children[index].id;
return JZX_OK;
}

// -----------------------------------------------------------------------------
// Timers & IO
// -----------------------------------------------------------------------------

jzx_err jzx_send_after(jzx_loop* loop, jzx_actor_id target, uint32_t ms, void* data, size_t len,
uint32_t tag, jzx_timer_id* out_timer) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
if (!jzx_actor_table_lookup(&loop->actors, target)) {
return JZX_ERR_NO_SUCH_ACTOR;
}
jzx_timer_entry* entry = (jzx_timer_entry*)jzx_alloc(&loop->allocator, sizeof(jzx_timer_entry));
if (!entry) {
return JZX_ERR_NO_MEMORY;
}
entry->target = target;
entry->data = data;
entry->len = len;
entry->tag = tag;
entry->next = NULL;

pthread_mutex_lock(&loop->timer_mutex);
entry->id = loop->next_timer_id++;
entry->due_ms = jzx_now_ms() + (uint64_t)ms;
jzx_timer_insert_locked(loop, entry);
pthread_cond_broadcast(&loop->timer_cond);
pthread_mutex_unlock(&loop->timer_mutex);

if (out_timer) {
*out_timer = entry->id;
}
return JZX_OK;
}

jzx_err jzx_cancel_timer(jzx_loop* loop, jzx_timer_id timer) {
if (!loop || !loop->timer_mutex_initialized) {
return JZX_ERR_INVALID_ARG;
}
pthread_mutex_lock(&loop->timer_mutex);
jzx_timer_entry* prev = NULL;
jzx_timer_entry* cur = loop->timer_head;
while (cur) {
if (cur->id == timer) {
if (prev) {
prev->next = cur->next;
} else {
loop->timer_head = cur->next;
}
pthread_mutex_unlock(&loop->timer_mutex);
jzx_free(&loop->allocator, cur);
return JZX_OK;
}
prev = cur;
cur = cur->next;
}
pthread_mutex_unlock(&loop->timer_mutex);
return JZX_ERR_TIMER_INVALID;
}

jzx_err jzx_watch_fd(jzx_loop* loop, int fd, jzx_actor_id owner, uint32_t interest) {
if (!loop || !loop->xev || fd < 0 || interest == 0) {
return JZX_ERR_INVALID_ARG;
}
if (!jzx_actor_table_lookup(&loop->actors, owner)) {
return JZX_ERR_NO_SUCH_ACTOR;
}
jzx_io_watch* existing = jzx_io_find(loop, fd, NULL);
if (existing) {
jzx_err err = jzx_xev_watch_fd(loop->xev, loop, fd, interest);
if (err != JZX_OK) {
return err;
}
existing->owner = owner;
existing->interest = interest;
return JZX_OK;
}
if (loop->io_count == loop->io_capacity) {
jzx_err err = jzx_io_reserve(loop, loop->io_capacity * 2);
if (err != JZX_OK) {
return err;
}
}
uint32_t idx = loop->io_count;
loop->io_watchers[idx] = (jzx_io_watch){
.fd = fd,
.owner = owner,
.interest = interest,
.active = 1,
};
loop->io_count = idx + 1;
jzx_err err = jzx_xev_watch_fd(loop->xev, loop, fd, interest);
if (err != JZX_OK) {
loop->io_count = idx;
memset(&loop->io_watchers[idx], 0, sizeof(loop->io_watchers[idx]));
return err;
}
return JZX_OK;
}

jzx_err jzx_unwatch_fd(jzx_loop* loop, int fd) {
if (!loop || fd < 0) {
return JZX_ERR_INVALID_ARG;
}
uint32_t idx = 0;
jzx_io_watch* entry = jzx_io_find(loop, fd, &idx);
if (!entry) {
return JZX_ERR_IO_NOT_WATCHED;
}
jzx_io_remove_index(loop, idx);
return JZX_OK;
}