Runtime core — src/jzx_runtime.c
This file is the primary implementation of the libjzx runtime: actors, mailboxes, scheduling, supervision, timers, async send, and I/O watcher integration.
Unlike a reference dump, this page is written in a “textbook” style: small snippets with the explanation immediately around them. The goal is to explain what each piece is, how it works, and why it exists.
Cross-links
- Start here: Source index
- API + config surface: C ABI (include/jzx/jzx.h), Configuration reference
- Internal structs: Runtime internals (src/jzx_internal.h)
- I/O backend: libxev integration (src/jzx_xev.zig)
- Semantics in practice: Integration tests, Stress tool
Includes and file-level structure
src/jzx_runtime.c#L1-L9#include "jzx_internal.h"
#include <stdlib.h>
#include <string.h>
#include <time.h>
// -----------------------------------------------------------------------------
// Utility helpers
// -----------------------------------------------------------------------------
- jzx_internal.h provides the concrete struct jzx_loop and internal helper types.
- stdlib.h / string.h: memory and string utilities used for allocations and memset.
- time.h: monotonic clock access for timers and intensity windows.
The rest of the file is organized with section headers like:
- “Mailbox implementation”
- “Actor table implementation”
- “Timer system”
Those section headers are meant to be a reading guide: each section is a subsystem with its own invariants.
Utility helpers
Actor id encoding helpers
These helpers implement the runtime’s actor-id scheme: a jzx_actor_id is a 64-bit value that encodes (generation, index).
src/jzx_runtime.c#L11-L13static inline uint32_t jzx_id_index(jzx_actor_id id) {
return (uint32_t)(id & 0xffffffffu);
}
- Extracts the low 32 bits (the table index).
- Why it exists: fast lookup into actors.slots[].
src/jzx_runtime.c#L15-L17static inline uint32_t jzx_id_generation(jzx_actor_id id) {
return (uint32_t)(id >> 32u);
}
- Extracts the high 32 bits (the generation).
- Why it exists: stale-id rejection when actor slots are reused.
src/jzx_runtime.c#L19-L21static inline jzx_actor_id jzx_make_id(uint32_t gen, uint32_t idx) {
return ((uint64_t)gen << 32u) | (uint64_t)idx;
}
- Packs (gen, idx) back into a 64-bit id.
- Why it exists: the runtime controls the id format while keeping the public type opaque.
Allocator wrappers
The runtime always allocates through jzx_allocator so users can supply custom allocation strategies.
src/jzx_runtime.c#L23-L25static void* jzx_alloc(jzx_allocator* alloc, size_t size) {
return alloc->alloc ? alloc->alloc(alloc->ctx, size) : NULL;
}
Notes:
- Calls alloc->alloc(ctx, size) when provided.
- Returns NULL if no allocator is configured (or allocation fails).
src/jzx_runtime.c#L27-L31static void jzx_free(jzx_allocator* alloc, void* ptr) {
if (alloc->free) {
alloc->free(alloc->ctx, ptr);
}
}
Notes:
- Calls the configured free function when present.
- Does nothing if free is null.
Why wrappers exist: they concentrate “allocator optionality” into one place so the rest of the code doesn’t have to repeat null checks.
Supervisor state allocation and restart intensity
The supervisor subsystem needs its own heap state (the configured child list and restart counters).
src/jzx_runtime.c#L33-L62static jzx_supervisor_state* jzx_supervisor_state_create(const jzx_supervisor_init* init,
jzx_allocator* allocator) {
if (!init || init->child_count == 0 || !init->children) {
return NULL;
}
jzx_supervisor_state* state =
(jzx_supervisor_state*)jzx_alloc(allocator, sizeof(jzx_supervisor_state));
if (!state) {
return NULL;
}
memset(state, 0, sizeof(*state));
state->config = init->supervisor;
state->child_count = init->child_count;
size_t bytes = sizeof(jzx_child_state) * init->child_count;
state->children = (jzx_child_state*)jzx_alloc(allocator, bytes);
if (!state->children) {
jzx_free(allocator, state);
return NULL;
}
memset(state->children, 0, bytes);
for (size_t i = 0; i < init->child_count; ++i) {
state->children[i].spec = init->children[i];
state->children[i].id = 0;
state->children[i].restart_count = 0;
state->children[i].last_restart_ms = 0;
}
state->intensity_window_count = 0;
state->intensity_window_start_ms = 0;
return state;
}
Key ideas:
- Validates init and its children array.
- Allocates jzx_supervisor_state and a contiguous children[] array.
- Copies the child specs into runtime-owned memory so the init can be stack-allocated by the caller.
Ownership note:
- The supervisor state owns children and must free it on teardown.
src/jzx_runtime.c#L64-L71static void jzx_supervisor_state_destroy(jzx_supervisor_state* state, jzx_allocator* allocator) {
if (!state)
return;
if (state->children) {
jzx_free(allocator, state->children);
}
jzx_free(allocator, state);
}
This is the ownership inverse of create: free the children array, then free the state.
src/jzx_runtime.c#L73-L89static int jzx_supervisor_allow_restart(jzx_supervisor_state* sup, uint64_t now_ms) {
if (!sup)
return 0;
if (sup->config.intensity == 0 || sup->config.period_ms == 0) {
return 1;
}
if (sup->intensity_window_start_ms == 0 ||
now_ms - sup->intensity_window_start_ms > sup->config.period_ms) {
sup->intensity_window_start_ms = now_ms;
sup->intensity_window_count = 0;
}
sup->intensity_window_count += 1;
if (sup->intensity_window_count > sup->config.intensity) {
return 0;
}
return 1;
}
This implements restart-intensity logic:
- If either intensity or period_ms is 0, limiting is disabled and restarts are always allowed.
- Otherwise, keep a time window that is restarted once more than period_ms has elapsed since it began, and count the restarts inside it.
- If restarts exceed intensity within period_ms, disallow (which triggers escalation elsewhere).
Why it exists: without intensity limits, a crash loop could saturate CPU and starve everything else.
Time and saturating math helpers
// src/jzx_runtime.c#L91-L95
/*
 * Returns the current CLOCK_MONOTONIC time in whole milliseconds.
 *
 * If clock_gettime() reports an error the function returns 0 instead of
 * reading an indeterminate timespec.
 */
static uint64_t jzx_now_ms(void) {
    struct timespec ts = {0};
    if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0) {
        return 0;
    }
    return (uint64_t)ts.tv_sec * 1000ull + (uint64_t)ts.tv_nsec / 1000000ull;
}
- Uses CLOCK_MONOTONIC so time deltas don’t go backwards if the system clock changes.
- Returns milliseconds for timer scheduling and intensity windows.
// src/jzx_runtime.c#L97-L103
/* Adds two 32-bit values, clamping the result to UINT32_MAX instead of wrapping. */
static uint32_t jzx_sat_add32(uint32_t a, uint32_t b) {
    const uint32_t headroom = UINT32_MAX - a;
    return (b > headroom) ? UINT32_MAX : (uint32_t)(a + b);
}
Saturating add prevents overflow when computing delays or budgets.
// src/jzx_runtime.c#L105-L113
/* Multiplies two 32-bit values, clamping the result to UINT32_MAX instead of wrapping. */
static uint32_t jzx_sat_mul32(uint32_t a, uint32_t b) {
    if (a == 0 || b == 0) {
        return 0;
    }
    /* a * b exceeds UINT32_MAX exactly when a is larger than UINT32_MAX / b. */
    if (a > UINT32_MAX / b) {
        return UINT32_MAX;
    }
    return (uint32_t)(a * b);
}
Saturating multiply is used for exponential backoff computations without wrapping.
Forward declarations (internal cross-links)
This file is written top-to-bottom, but a few subsystems need to reference helpers that are defined later.
Instead of reordering everything, jzx_runtime.c uses file-local forward declarations.
src/jzx_runtime.c#L115-L122static jzx_err jzx_send_internal(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
uint32_t tag, jzx_actor_id sender);
// ^ Forward declaration: jzx_teardown_actor() and jzx_async_dispatch() call this
//   before its definition appears in the file.
static void jzx_io_remove_actor(jzx_loop* loop, jzx_actor_id actor);
// ^ Forward declaration: jzx_teardown_actor() calls this; the definition lives in
//   the I/O watcher section further down.
// -----------------------------------------------------------------------------
// Wakeup helpers
// -----------------------------------------------------------------------------
Why these exist:
- jzx_send_internal(...): the “real send” primitive that all message paths funnel through.
  - It’s referenced by multiple subsystems (async send, timers, supervision system messages).
- jzx_io_remove_actor(...): used during teardown to ensure watchers don’t keep stale (fd → actor) references.
- The dashed section headers are purely for human readers, but they’re valuable in a file this large:
  - each header marks a coherent subsystem with its own invariants.
Wakeup helpers (event loop notification)
src/jzx_runtime.c#L124-L129static void jzx_wakeup_signal(jzx_loop* loop) {
if (!loop || !loop->xev) {
return;
}
jzx_xev_wakeup(loop->xev);
}
This is how the runtime wakes the backend event loop when something arrives from another thread (async send queue or timer wake).
Observer helpers (instrumentation)
src/jzx_runtime.c#L131-L133// -----------------------------------------------------------------------------
// Observer helpers
// -----------------------------------------------------------------------------
These helpers are “safe callouts”: they check for null hooks and then forward the event.
src/jzx_runtime.c#L135-L139static void jzx_obs_actor_start(jzx_loop* loop, jzx_actor_id id, const char* name) {
if (loop && loop->observer.on_actor_start) {
loop->observer.on_actor_start(loop->observer_ctx, id, name);
}
}
src/jzx_runtime.c#L141-L145static void jzx_obs_actor_stop(jzx_loop* loop, jzx_actor_id id, jzx_exit_reason reason) {
if (loop && loop->observer.on_actor_stop) {
loop->observer.on_actor_stop(loop->observer_ctx, id, reason);
}
}
src/jzx_runtime.c#L147-L152static void jzx_obs_actor_restart(jzx_loop* loop, jzx_actor_id supervisor, jzx_actor_id child,
uint32_t attempt) {
if (loop && loop->observer.on_actor_restart) {
loop->observer.on_actor_restart(loop->observer_ctx, supervisor, child, attempt);
}
}
src/jzx_runtime.c#L154-L158static void jzx_obs_supervisor_escalate(jzx_loop* loop, jzx_actor_id supervisor) {
if (loop && loop->observer.on_supervisor_escalate) {
loop->observer.on_supervisor_escalate(loop->observer_ctx, supervisor);
}
}
src/jzx_runtime.c#L160-L164static void jzx_obs_mailbox_full(jzx_loop* loop, jzx_actor_id target) {
if (loop && loop->observer.on_mailbox_full) {
loop->observer.on_mailbox_full(loop->observer_ctx, target);
}
}
Why these exist: to keep observer checks consistent and to keep the core logic readable.
Mailbox implementation (bounded per-actor queue)
src/jzx_runtime.c#L166-L168// -----------------------------------------------------------------------------
// Mailbox implementation
// -----------------------------------------------------------------------------
src/jzx_runtime.c#L170-L187static jzx_err jzx_mailbox_init(jzx_mailbox_impl* box, uint32_t capacity,
jzx_allocator* allocator) {
if (capacity == 0) {
capacity = 1;
}
size_t bytes = sizeof(jzx_message) * capacity;
jzx_message* buffer = (jzx_message*)jzx_alloc(allocator, bytes);
if (!buffer) {
return JZX_ERR_NO_MEMORY;
}
memset(buffer, 0, bytes);
box->buffer = buffer;
box->capacity = capacity;
box->head = 0;
box->tail = 0;
box->count = 0;
return JZX_OK;
}
Notes:
- Enforces capacity >= 1.
- Allocates a fixed-size ring buffer of jzx_message items.
- Initializes head/tail/count to empty.
src/jzx_runtime.c#L189-L194static void jzx_mailbox_deinit(jzx_mailbox_impl* box, jzx_allocator* allocator) {
if (box->buffer) {
jzx_free(allocator, box->buffer);
}
memset(box, 0, sizeof(*box));
}
src/jzx_runtime.c#L196-L204static int jzx_mailbox_push(jzx_mailbox_impl* box, const jzx_message* msg) {
if (box->count == box->capacity) {
return -1;
}
box->buffer[box->tail] = *msg;
box->tail = (box->tail + 1) % box->capacity;
box->count++;
return 0;
}
This is the backpressure point:
- If count == capacity, it returns -1 (caller maps to JZX_ERR_MAILBOX_FULL).
src/jzx_runtime.c#L206-L214static int jzx_mailbox_pop(jzx_mailbox_impl* box, jzx_message* out) {
if (box->count == 0) {
return -1;
}
*out = box->buffer[box->head];
box->head = (box->head + 1) % box->capacity;
box->count--;
return 0;
}
src/jzx_runtime.c#L216-L218static int jzx_mailbox_has_items(const jzx_mailbox_impl* box) {
return box->count > 0;
}
Actor table (id → actor pointer with generations)
src/jzx_runtime.c#L220-L222// -----------------------------------------------------------------------------
// Actor table implementation
// -----------------------------------------------------------------------------
src/jzx_runtime.c#L224-L247static jzx_err jzx_actor_table_init(jzx_actor_table* table, uint32_t capacity,
jzx_allocator* allocator) {
memset(table, 0, sizeof(*table));
table->capacity = capacity;
size_t slot_bytes = sizeof(jzx_actor*) * capacity;
size_t gen_bytes = sizeof(uint32_t) * capacity;
size_t stack_bytes = sizeof(uint32_t) * capacity;
table->slots = (jzx_actor**)jzx_alloc(allocator, slot_bytes);
table->generations = (uint32_t*)jzx_alloc(allocator, gen_bytes);
table->free_stack = (uint32_t*)jzx_alloc(allocator, stack_bytes);
if (!table->slots || !table->generations || !table->free_stack) {
return JZX_ERR_NO_MEMORY;
}
memset(table->slots, 0, slot_bytes);
for (uint32_t i = 0; i < capacity; ++i) {
table->generations[i] = 1;
table->free_stack[i] = capacity - 1 - i;
}
table->free_top = capacity;
table->used = 0;
return JZX_OK;
}
src/jzx_runtime.c#L249-L263static void jzx_actor_table_deinit(jzx_actor_table* table, jzx_allocator* allocator) {
if (!table) {
return;
}
if (table->slots) {
jzx_free(allocator, table->slots);
}
if (table->generations) {
jzx_free(allocator, table->generations);
}
if (table->free_stack) {
jzx_free(allocator, table->free_stack);
}
memset(table, 0, sizeof(*table));
}
src/jzx_runtime.c#L265-L274static jzx_actor* jzx_actor_table_lookup(jzx_actor_table* table, jzx_actor_id id) {
uint32_t idx = jzx_id_index(id);
if (idx >= table->capacity) {
return NULL;
}
if (table->generations[idx] != jzx_id_generation(id)) {
return NULL;
}
return table->slots[idx];
}
This function is the critical stale-id guard:
- It rejects ids whose generation doesn’t match the current slot generation.
src/jzx_runtime.c#L276-L291static jzx_err jzx_actor_table_insert(jzx_actor_table* table, jzx_actor* actor,
jzx_allocator* allocator, jzx_actor_id* out_id) {
(void)allocator;
if (table->free_top == 0) {
return JZX_ERR_MAX_ACTORS;
}
uint32_t idx = table->free_stack[--table->free_top];
uint32_t gen = table->generations[idx];
actor->id = jzx_make_id(gen, idx);
table->slots[idx] = actor;
table->used++;
if (out_id) {
*out_id = actor->id;
}
return JZX_OK;
}
src/jzx_runtime.c#L293-L310static void jzx_actor_table_remove(jzx_actor_table* table, jzx_actor* actor) {
if (!actor) {
return;
}
uint32_t idx = jzx_id_index(actor->id);
if (idx >= table->capacity) {
return;
}
if (table->slots[idx] != actor) {
return;
}
table->slots[idx] = NULL;
table->generations[idx] += 1u;
table->free_stack[table->free_top++] = idx;
if (table->used > 0) {
table->used--;
}
}
Run queue (who runs next)
src/jzx_runtime.c#L312-L314// -----------------------------------------------------------------------------
// Run queue implementation
// -----------------------------------------------------------------------------
src/jzx_runtime.c#L316-L325static jzx_err jzx_run_queue_init(jzx_run_queue* rq, uint32_t capacity, jzx_allocator* allocator) {
memset(rq, 0, sizeof(*rq));
rq->capacity = capacity > 0 ? capacity : 1;
rq->entries = (jzx_actor**)jzx_alloc(allocator, sizeof(jzx_actor*) * rq->capacity);
if (!rq->entries) {
return JZX_ERR_NO_MEMORY;
}
memset(rq->entries, 0, sizeof(jzx_actor*) * rq->capacity);
return JZX_OK;
}
src/jzx_runtime.c#L327-L332static void jzx_run_queue_deinit(jzx_run_queue* rq, jzx_allocator* allocator) {
if (rq->entries) {
jzx_free(allocator, rq->entries);
}
memset(rq, 0, sizeof(*rq));
}
src/jzx_runtime.c#L334-L342static int jzx_run_queue_push(jzx_run_queue* rq, jzx_actor* actor) {
if (rq->count == rq->capacity) {
return -1;
}
rq->entries[rq->tail] = actor;
rq->tail = (rq->tail + 1) % rq->capacity;
rq->count++;
return 0;
}
src/jzx_runtime.c#L344-L353static jzx_actor* jzx_run_queue_pop(jzx_run_queue* rq) {
if (rq->count == 0) {
return NULL;
}
jzx_actor* actor = rq->entries[rq->head];
rq->entries[rq->head] = NULL;
rq->head = (rq->head + 1) % rq->capacity;
rq->count--;
return actor;
}
src/jzx_runtime.c#L355-L362static void jzx_schedule_actor(jzx_loop* loop, jzx_actor* actor) {
if (!actor || actor->in_run_queue) {
return;
}
if (jzx_run_queue_push(&loop->run_queue, actor) == 0) {
actor->in_run_queue = 1;
}
}
This is the scheduler’s “make runnable” operation. It ensures a runnable actor is in the run queue exactly once.
Actor teardown (resource cleanup)
src/jzx_runtime.c#L364-L392static void jzx_teardown_actor(jzx_loop* loop, jzx_actor* actor) {
// Tears down a single actor and frees it. A NULL actor is ignored.
// Order of work: I/O watchers, observer callback, supervisor notification,
// mailbox, actor table slot, supervisor state, then the actor allocation itself.
if (!actor) {
return;
}
// Ask the I/O layer to drop the watchers registered for this actor id.
jzx_io_remove_actor(loop, actor->id);
// The reported exit reason is FAIL only when the actor's status is FAILED.
jzx_exit_reason reason = JZX_EXIT_NORMAL;
if (actor->status == JZX_ACTOR_FAILED) {
reason = JZX_EXIT_FAIL;
}
jzx_obs_actor_stop(loop, actor->id, reason);
// Supervised actors report their exit to the supervisor as a heap-allocated
// jzx_child_exit sent with tag JZX_TAG_SYS_CHILD_EXIT and sender 0.
// If the allocation fails the notification is silently skipped.
if (actor->supervisor) {
jzx_child_exit* ev = (jzx_child_exit*)jzx_alloc(&loop->allocator, sizeof(jzx_child_exit));
if (ev) {
ev->child = actor->id;
ev->status = actor->status;
jzx_err err = jzx_send_internal(loop, actor->supervisor, ev, sizeof(jzx_child_exit),
JZX_TAG_SYS_CHILD_EXIT, 0);
// On a failed send the payload is still ours, so release it here.
if (err != JZX_OK) {
jzx_free(&loop->allocator, ev);
}
}
}
// jzx_mailbox_deinit() frees only the ring buffer; payload pointers of messages
// still queued are not freed by this call.
jzx_mailbox_deinit(&actor->mailbox, &loop->allocator);
// Clearing the slot also bumps its generation, so the old id stops resolving.
jzx_actor_table_remove(&loop->actors, actor);
if (actor->supervisor_state) {
jzx_supervisor_state_destroy(actor->supervisor_state, &loop->allocator);
}
// NOTE(review): nothing here removes the actor from the run queue — confirm
// callers never tear down an actor whose pointer is still queued.
jzx_free(&loop->allocator, actor);
}
This function owns the “stop means free” contract:
- Deinitializes mailbox.
- Removes I/O watches owned by the actor.
- Frees supervision state if present.
- Removes the actor from the actor table and frees its allocation.
Supervision logic (restart strategies)
src/jzx_runtime.c#L394-L396// -----------------------------------------------------------------------------
// Supervisor helpers
// -----------------------------------------------------------------------------
src/jzx_runtime.c#L398-L411static jzx_child_state* jzx_supervisor_find_child(jzx_supervisor_state* sup, jzx_actor_id id,
size_t* out_idx) {
if (!sup)
return NULL;
for (size_t i = 0; i < sup->child_count; ++i) {
if (sup->children[i].id == id) {
if (out_idx) {
*out_idx = i;
}
return &sup->children[i];
}
}
return NULL;
}
src/jzx_runtime.c#L413-L424static jzx_err jzx_supervisor_spawn_child(jzx_loop* loop, jzx_actor_id supervisor_id,
jzx_child_state* child) {
jzx_spawn_opts opts = {
.behavior = child->spec.behavior,
.state = child->spec.state,
.supervisor = supervisor_id,
.mailbox_cap = child->spec.mailbox_cap,
.name = child->spec.name,
};
child->last_restart_ms = jzx_now_ms();
return jzx_spawn(loop, &opts, &child->id);
}
src/jzx_runtime.c#L426-L431static void jzx_supervisor_stop_child(jzx_loop* loop, jzx_child_state* child) {
if (child->id != 0) {
(void)jzx_actor_stop(loop, child->id);
child->id = 0;
}
}
src/jzx_runtime.c#L433-L452static void jzx_supervisor_schedule_restart(jzx_loop* loop, jzx_actor* sup_actor, size_t child_idx,
uint32_t delay_ms) {
jzx_supervisor_state* sup = sup_actor->supervisor_state;
if (!sup || child_idx >= sup->child_count)
return;
if (delay_ms == 0) {
(void)jzx_supervisor_spawn_child(loop, sup_actor->id, &sup->children[child_idx]);
return;
}
jzx_child_restart* payload =
(jzx_child_restart*)jzx_alloc(&loop->allocator, sizeof(jzx_child_restart));
if (!payload)
return;
payload->child_index = (uint32_t)child_idx;
jzx_err err = jzx_send_after(loop, sup_actor->id, delay_ms, payload, sizeof(jzx_child_restart),
JZX_TAG_SYS_CHILD_RESTART, NULL);
if (err != JZX_OK) {
jzx_free(&loop->allocator, payload);
}
}
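With a zero delay this respawns the child immediately. Otherwise it arms a timer via jzx_send_after that delivers a JZX_TAG_SYS_CHILD_RESTART message to the supervisor, which performs the actual restart when the message arrives.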
src/jzx_runtime.c#L454-L484static uint32_t jzx_supervisor_compute_delay(const jzx_supervisor_state* sup,
const jzx_child_state* child) {
if (!sup || !child)
return 0;
uint32_t base = child->spec.restart_delay_ms;
jzx_backoff_type strategy = child->spec.backoff;
if (strategy == JZX_BACKOFF_NONE) {
strategy = sup->config.backoff;
}
uint32_t step = sup->config.backoff_delay_ms;
switch (strategy) {
case JZX_BACKOFF_NONE:
return base;
case JZX_BACKOFF_CONSTANT: {
uint32_t extra = jzx_sat_mul32(step, child->restart_count);
return jzx_sat_add32(base, extra);
}
case JZX_BACKOFF_EXPONENTIAL: {
uint32_t factor = 1u;
uint32_t shifts = child->restart_count;
if (shifts >= 31) {
factor = UINT32_MAX;
} else {
factor = 1u << shifts;
}
uint32_t scaled_base = jzx_sat_mul32(base ? base : step, factor);
return scaled_base;
}
}
return base;
}
This computes the restart delay based on:
- restart attempt count
- configured backoff type
- saturating arithmetic to avoid overflow
src/jzx_runtime.c#L486-L544static void jzx_supervisor_restart_strategy(jzx_loop* loop, jzx_actor* supervisor_actor,
size_t failed_idx, jzx_actor_id failed_child_id,
jzx_supervisor_state* sup) {
uint32_t failed_delay = sup->children[failed_idx].spec.restart_delay_ms;
switch (sup->config.strategy) {
case JZX_SUP_ONE_FOR_ONE:
if (failed_child_id) {
uint32_t attempt = sup->children[failed_idx].restart_count + 1;
jzx_obs_actor_restart(loop, supervisor_actor->id, failed_child_id, attempt);
}
sup->children[failed_idx].restart_count += 1;
sup->children[failed_idx].last_restart_ms = jzx_now_ms();
failed_delay = jzx_supervisor_compute_delay(sup, &sup->children[failed_idx]);
jzx_supervisor_schedule_restart(loop, supervisor_actor, failed_idx, failed_delay);
break;
case JZX_SUP_ONE_FOR_ALL:
for (size_t i = 0; i < sup->child_count; ++i) {
jzx_actor_id child_id = sup->children[i].id;
if (i == failed_idx) {
child_id = failed_child_id;
}
if (child_id) {
uint32_t attempt = sup->children[i].restart_count + 1;
jzx_obs_actor_restart(loop, supervisor_actor->id, child_id, attempt);
}
}
for (size_t i = 0; i < sup->child_count; ++i) {
jzx_supervisor_stop_child(loop, &sup->children[i]);
}
for (size_t i = 0; i < sup->child_count; ++i) {
sup->children[i].restart_count += 1;
sup->children[i].last_restart_ms = jzx_now_ms();
uint32_t delay = jzx_supervisor_compute_delay(sup, &sup->children[i]);
jzx_supervisor_schedule_restart(loop, supervisor_actor, i, delay);
}
break;
case JZX_SUP_REST_FOR_ONE:
for (size_t i = failed_idx; i < sup->child_count; ++i) {
jzx_actor_id child_id = sup->children[i].id;
if (i == failed_idx) {
child_id = failed_child_id;
}
if (child_id) {
uint32_t attempt = sup->children[i].restart_count + 1;
jzx_obs_actor_restart(loop, supervisor_actor->id, child_id, attempt);
}
}
for (size_t i = failed_idx; i < sup->child_count; ++i) {
jzx_supervisor_stop_child(loop, &sup->children[i]);
}
for (size_t i = failed_idx; i < sup->child_count; ++i) {
sup->children[i].restart_count += 1;
sup->children[i].last_restart_ms = jzx_now_ms();
uint32_t delay = jzx_supervisor_compute_delay(sup, &sup->children[i]);
jzx_supervisor_schedule_restart(loop, supervisor_actor, i, delay);
}
break;
}
}
This applies the configured strategy (one-for-one, one-for-all, rest-for-one).
src/jzx_runtime.c#L546-L606static jzx_behavior_result jzx_supervisor_behavior(jzx_context* ctx, const jzx_message* msg) {
// Message handler for supervisor actors. It handles two system tags
// (JZX_TAG_SYS_CHILD_EXIT and JZX_TAG_SYS_CHILD_RESTART) and frees the payload
// of every message it receives with the loop allocator.
// Returns JZX_BEHAVIOR_FAIL only when the restart-intensity limit is exceeded.
jzx_actor* sup_actor = jzx_actor_table_lookup(&ctx->loop->actors, ctx->self);
// Without supervisor state there is nothing to manage: drop the payload.
if (!sup_actor || !sup_actor->supervisor_state) {
if (msg->data) {
jzx_free(&ctx->loop->allocator, msg->data);
}
return JZX_BEHAVIOR_OK;
}
jzx_supervisor_state* sup = sup_actor->supervisor_state;
if (msg->tag == JZX_TAG_SYS_CHILD_EXIT && msg->data) {
jzx_child_exit* ev = (jzx_child_exit*)msg->data;
// Copy everything needed out of the payload before it is freed below.
jzx_actor_id failed_child_id = ev->child;
size_t idx = 0;
jzx_child_state* child = jzx_supervisor_find_child(sup, ev->child, &idx);
jzx_actor_status status = ev->status;
jzx_free(&ctx->loop->allocator, ev);
// Exits from ids that match no child entry are ignored (this includes children
// whose id was already cleared by jzx_supervisor_stop_child()).
if (!child) {
return JZX_BEHAVIOR_OK;
}
child->id = 0;
// Restart policy: PERMANENT children always restart; TRANSIENT children restart
// only after a failure; every other mode is left stopped.
int restart = 0;
if (child->spec.mode == JZX_CHILD_PERMANENT) {
restart = 1;
} else if (child->spec.mode == JZX_CHILD_TRANSIENT && status == JZX_ACTOR_FAILED) {
restart = 1;
}
if (!restart) {
return JZX_BEHAVIOR_OK;
}
uint64_t now = jzx_now_ms();
// Over the intensity budget: notify the observer, stop all children, and fail
// the supervisor itself.
if (!jzx_supervisor_allow_restart(sup, now)) {
jzx_obs_supervisor_escalate(ctx->loop, ctx->self);
for (size_t i = 0; i < sup->child_count; ++i) {
jzx_supervisor_stop_child(ctx->loop, &sup->children[i]);
}
sup_actor->status = JZX_ACTOR_FAILED;
return JZX_BEHAVIOR_FAIL;
}
jzx_supervisor_restart_strategy(ctx->loop, sup_actor, idx, failed_child_id, sup);
return JZX_BEHAVIOR_OK;
}
// Delayed restart request: respawn the child named by the payload's index.
if (msg->tag == JZX_TAG_SYS_CHILD_RESTART && msg->data) {
jzx_child_restart* ev = (jzx_child_restart*)msg->data;
uint32_t idx = ev->child_index;
jzx_free(&ctx->loop->allocator, ev);
if (idx < sup->child_count) {
(void)jzx_supervisor_spawn_child(ctx->loop, ctx->self, &sup->children[idx]);
}
return JZX_BEHAVIOR_OK;
}
// Any other message: free its payload and carry on.
// NOTE(review): this assumes every payload sent to a supervisor was allocated
// with the loop allocator — confirm for user-originated messages.
if (msg->data) {
jzx_free(&ctx->loop->allocator, msg->data);
}
return JZX_BEHAVIOR_OK;
}
Supervisor actors are just actors with a special behavior: they react to system tags like “child exit” and “restart child”.
Async queue (jzx_send_async)
src/jzx_runtime.c#L608-L610// -----------------------------------------------------------------------------
// Async queue
// -----------------------------------------------------------------------------
src/jzx_runtime.c#L612-L620static jzx_err jzx_async_queue_init(jzx_loop* loop) {
if (pthread_mutex_init(&loop->async_mutex, NULL) != 0) {
return JZX_ERR_UNKNOWN;
}
loop->async_mutex_initialized = 1;
loop->async_head = NULL;
loop->async_tail = NULL;
return JZX_OK;
}
src/jzx_runtime.c#L622-L638static void jzx_async_queue_destroy(jzx_loop* loop) {
if (!loop->async_mutex_initialized) {
return;
}
pthread_mutex_lock(&loop->async_mutex);
jzx_async_msg* head = loop->async_head;
loop->async_head = NULL;
loop->async_tail = NULL;
pthread_mutex_unlock(&loop->async_mutex);
pthread_mutex_destroy(&loop->async_mutex);
loop->async_mutex_initialized = 0;
while (head) {
jzx_async_msg* next = head->next;
jzx_free(&loop->allocator, head);
head = next;
}
}
src/jzx_runtime.c#L640-L667static jzx_err jzx_async_enqueue(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
uint32_t tag, jzx_actor_id sender) {
if (!loop || !loop->async_mutex_initialized) {
return JZX_ERR_INVALID_ARG;
}
jzx_async_msg* msg = (jzx_async_msg*)jzx_alloc(&loop->allocator, sizeof(jzx_async_msg));
if (!msg) {
return JZX_ERR_NO_MEMORY;
}
msg->target = target;
msg->data = data;
msg->len = len;
msg->tag = tag;
msg->sender = sender;
msg->next = NULL;
pthread_mutex_lock(&loop->async_mutex);
if (!loop->async_head) {
loop->async_head = msg;
loop->async_tail = msg;
} else {
loop->async_tail->next = msg;
loop->async_tail = msg;
}
pthread_mutex_unlock(&loop->async_mutex);
jzx_wakeup_signal(loop);
return JZX_OK;
}
This is the cross-thread enqueue point.
Important: the runtime does not copy the payload; it stores the pointer.
src/jzx_runtime.c#L669-L679static jzx_async_msg* jzx_async_detach(jzx_loop* loop) {
if (!loop->async_mutex_initialized) {
return NULL;
}
pthread_mutex_lock(&loop->async_mutex);
jzx_async_msg* head = loop->async_head;
loop->async_head = NULL;
loop->async_tail = NULL;
pthread_mutex_unlock(&loop->async_mutex);
return head;
}
Detaches the list under the mutex so the loop thread can process without holding the lock.
src/jzx_runtime.c#L681-L689static void jzx_async_dispatch(jzx_loop* loop, jzx_async_msg* head) {
jzx_async_msg* msg = head;
while (msg) {
jzx_async_msg* next = msg->next;
(void)jzx_send_internal(loop, msg->target, msg->data, msg->len, msg->tag, msg->sender);
jzx_free(&loop->allocator, msg);
msg = next;
}
}
Delivers detached async messages by converting them into normal mailbox enqueues.
src/jzx_runtime.c#L691-L696static void jzx_async_drain(jzx_loop* loop) {
jzx_async_msg* head = jzx_async_detach(loop);
if (head) {
jzx_async_dispatch(loop, head);
}
}
src/jzx_runtime.c#L698-L706static int jzx_async_has_pending(jzx_loop* loop) {
if (!loop->async_mutex_initialized) {
return 0;
}
pthread_mutex_lock(&loop->async_mutex);
int has = loop->async_head != NULL;
pthread_mutex_unlock(&loop->async_mutex);
return has;
}
Timer system (timer thread + due list)
src/jzx_runtime.c#L708-L710// -----------------------------------------------------------------------------
// Timer system
// -----------------------------------------------------------------------------
src/jzx_runtime.c#L712-L724static void jzx_timer_insert_locked(jzx_loop* loop, jzx_timer_entry* entry) {
if (!loop->timer_head || entry->due_ms < loop->timer_head->due_ms) {
entry->next = loop->timer_head;
loop->timer_head = entry;
return;
}
jzx_timer_entry* cur = loop->timer_head;
while (cur->next && cur->next->due_ms <= entry->due_ms) {
cur = cur->next;
}
entry->next = cur->next;
cur->next = entry;
}
src/jzx_runtime.c#L726-L769static void* jzx_timer_thread_main(void* arg) {
// Body of the timer thread. `arg` is the owning jzx_loop.
// The thread holds timer_mutex for the whole loop except while it hands a due
// entry to the async queue. It exits once timer_stop is set and returns NULL.
jzx_loop* loop = (jzx_loop*)arg;
pthread_mutex_lock(&loop->timer_mutex);
while (!loop->timer_stop) {
uint64_t now = jzx_now_ms();
jzx_timer_entry* head = loop->timer_head;
// No timers armed: sleep until the condition variable is signalled.
if (!head) {
pthread_cond_wait(&loop->timer_cond, &loop->timer_mutex);
continue;
}
// Earliest timer is not due yet: sleep for the remaining time. The wait result
// is ignored because the loop re-reads the clock and the list head either way.
if (head->due_ms > now) {
uint64_t wait_ms = head->due_ms - now;
#if defined(__APPLE__)
// Apple: wait for a relative duration.
struct timespec rel;
rel.tv_sec = (time_t)(wait_ms / 1000ull);
rel.tv_nsec = (long)((wait_ms % 1000ull) * 1000000ull);
(void)pthread_cond_timedwait_relative_np(&loop->timer_cond, &loop->timer_mutex, &rel);
#else
// Elsewhere: build an absolute deadline on the clock the condvar was set up with.
struct timespec ts;
#if defined(__linux__)
clockid_t clock_id = loop->timer_cond_monotonic ? CLOCK_MONOTONIC : CLOCK_REALTIME;
clock_gettime(clock_id, &ts);
#else
clock_gettime(CLOCK_REALTIME, &ts);
#endif
ts.tv_sec += (time_t)(wait_ms / 1000ull);
ts.tv_nsec += (long)((wait_ms % 1000ull) * 1000000ull);
// Carry nanosecond overflow into seconds to keep tv_nsec below one second.
if (ts.tv_nsec >= 1000000000l) {
ts.tv_sec += 1;
ts.tv_nsec -= 1000000000l;
}
(void)pthread_cond_timedwait(&loop->timer_cond, &loop->timer_mutex, &ts);
#endif
continue;
}
// Head is due: unlink it, then release the lock while enqueuing.
loop->timer_head = head->next;
pthread_mutex_unlock(&loop->timer_mutex);
// Delivery goes through the async queue with sender 0.
// NOTE(review): the enqueue result is not checked; if it fails the payload
// pointer in `head->data` is dropped when the entry is freed below.
jzx_async_enqueue(loop, head->target, head->data, head->len, head->tag, 0);
jzx_free(&loop->allocator, head);
pthread_mutex_lock(&loop->timer_mutex);
}
pthread_mutex_unlock(&loop->timer_mutex);
return NULL;
}
This is the timer thread loop: wait until next due timer, then enqueue wakeups/deliveries.
src/jzx_runtime.c#L771-L812static jzx_err jzx_timer_system_init(jzx_loop* loop) {
// Sets up the timer mutex and condition variable, resets the timer state, and
// starts the timer thread. Returns JZX_ERR_UNKNOWN if any pthread call fails,
// destroying whatever was created before the failure.
if (pthread_mutex_init(&loop->timer_mutex, NULL) != 0) {
return JZX_ERR_UNKNOWN;
}
loop->timer_cond_monotonic = 0;
#if defined(__linux__)
// Linux: try to bind the condition variable to CLOCK_MONOTONIC. The flag records
// whether that worked so the timer thread computes deadlines on the same clock.
pthread_condattr_t attr;
pthread_condattr_t* attr_ptr = NULL;
if (pthread_condattr_init(&attr) == 0) {
attr_ptr = &attr;
if (pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0) {
loop->timer_cond_monotonic = 1;
}
}
// attr_ptr stays NULL (default attributes) if the attribute object could not be created.
int cond_rc = pthread_cond_init(&loop->timer_cond, attr_ptr);
if (attr_ptr) {
pthread_condattr_destroy(&attr);
}
if (cond_rc != 0) {
pthread_mutex_destroy(&loop->timer_mutex);
return JZX_ERR_UNKNOWN;
}
#else
if (pthread_cond_init(&loop->timer_cond, NULL) != 0) {
pthread_mutex_destroy(&loop->timer_mutex);
return JZX_ERR_UNKNOWN;
}
#endif
// All shared timer state is set before the thread is created.
loop->timer_mutex_initialized = 1;
loop->timer_thread_running = 0;
loop->timer_stop = 0;
loop->timer_head = NULL;
loop->next_timer_id = 1;
if (pthread_create(&loop->timer_thread, NULL, jzx_timer_thread_main, loop) != 0) {
// Thread creation failed: undo the condvar and mutex and clear the initialized flag.
pthread_cond_destroy(&loop->timer_cond);
pthread_mutex_destroy(&loop->timer_mutex);
loop->timer_mutex_initialized = 0;
return JZX_ERR_UNKNOWN;
}
loop->timer_thread_running = 1;
return JZX_OK;
}
src/jzx_runtime.c#L814-L842static void jzx_timer_system_shutdown(jzx_loop* loop) {
if (!loop->timer_mutex_initialized) {
return;
}
pthread_mutex_lock(&loop->timer_mutex);
loop->timer_stop = 1;
pthread_cond_broadcast(&loop->timer_cond);
pthread_mutex_unlock(&loop->timer_mutex);
if (loop->timer_thread_running) {
pthread_join(loop->timer_thread, NULL);
loop->timer_thread_running = 0;
}
pthread_mutex_lock(&loop->timer_mutex);
jzx_timer_entry* cur = loop->timer_head;
loop->timer_head = NULL;
pthread_mutex_unlock(&loop->timer_mutex);
while (cur) {
jzx_timer_entry* next = cur->next;
jzx_free(&loop->allocator, cur);
cur = next;
}
pthread_cond_destroy(&loop->timer_cond);
pthread_mutex_destroy(&loop->timer_mutex);
loop->timer_mutex_initialized = 0;
}
src/jzx_runtime.c#L844-L852static int jzx_timer_has_pending(jzx_loop* loop) {
if (!loop->timer_mutex_initialized) {
return 0;
}
pthread_mutex_lock(&loop->timer_mutex);
int has = loop->timer_head != NULL;
pthread_mutex_unlock(&loop->timer_mutex);
return has;
}
I/O watchers (fd table + backend registration)
src/jzx_runtime.c#L854-L856// -----------------------------------------------------------------------------
// I/O watchers
// -----------------------------------------------------------------------------
src/jzx_runtime.c#L858-L868static jzx_err jzx_io_init(jzx_loop* loop, uint32_t capacity) {
loop->io_capacity = capacity ? capacity : 1;
loop->io_count = 0;
loop->io_watchers =
(jzx_io_watch*)jzx_alloc(&loop->allocator, sizeof(jzx_io_watch) * loop->io_capacity);
if (!loop->io_watchers) {
return JZX_ERR_NO_MEMORY;
}
memset(loop->io_watchers, 0, sizeof(jzx_io_watch) * loop->io_capacity);
return JZX_OK;
}
src/jzx_runtime.c#L870-L877static void jzx_io_deinit(jzx_loop* loop) {
if (loop->io_watchers) {
jzx_free(&loop->allocator, loop->io_watchers);
loop->io_watchers = NULL;
}
loop->io_capacity = 0;
loop->io_count = 0;
}
src/jzx_runtime.c#L879-L893static jzx_err jzx_io_reserve(jzx_loop* loop, uint32_t new_cap) {
jzx_io_watch* new_watchers =
(jzx_io_watch*)jzx_alloc(&loop->allocator, sizeof(jzx_io_watch) * new_cap);
if (!new_watchers) {
return JZX_ERR_NO_MEMORY;
}
memset(new_watchers, 0, sizeof(jzx_io_watch) * new_cap);
if (loop->io_watchers) {
memcpy(new_watchers, loop->io_watchers, sizeof(jzx_io_watch) * loop->io_count);
jzx_free(&loop->allocator, loop->io_watchers);
}
loop->io_watchers = new_watchers;
loop->io_capacity = new_cap;
return JZX_OK;
}
src/jzx_runtime.c#L895-L905static jzx_io_watch* jzx_io_find(jzx_loop* loop, int fd, uint32_t* idx_out) {
for (uint32_t i = 0; i < loop->io_count; ++i) {
if (loop->io_watchers[i].fd == fd) {
if (idx_out) {
*idx_out = i;
}
return &loop->io_watchers[i];
}
}
return NULL;
}
src/jzx_runtime.c#L907-L919static void jzx_io_remove_index(jzx_loop* loop, uint32_t idx) {
if (idx >= loop->io_count) {
return;
}
if (loop->xev) {
jzx_xev_unwatch_fd(loop->xev, loop->io_watchers[idx].fd);
}
uint32_t last = loop->io_count - 1;
if (idx != last) {
loop->io_watchers[idx] = loop->io_watchers[last];
}
loop->io_count--;
}
src/jzx_runtime.c#L921-L929static void jzx_io_remove_actor(jzx_loop* loop, jzx_actor_id actor) {
for (uint32_t i = 0; i < loop->io_count;) {
if (loop->io_watchers[i].owner == actor) {
jzx_io_remove_index(loop, i);
continue;
}
++i;
}
}
libxev → runtime bridge (jzx_io_xev_notify)
The src/jzx_xev.zig layer needs a single, C-callable “upcall” into the runtime when an fd becomes ready.
That is what jzx_io_xev_notify is for.
src/jzx_runtime.c#L931-L958uint8_t jzx_io_xev_notify(jzx_loop* loop, int fd, uint32_t readiness) {
if (!loop || !loop->running || fd < 0 || readiness == 0) {
return 0;
}
uint32_t idx = 0;
jzx_io_watch* watch = jzx_io_find(loop, fd, &idx);
if (!watch) {
return 0;
}
if (!jzx_actor_table_lookup(&loop->actors, watch->owner)) {
jzx_io_remove_index(loop, idx);
return 0;
}
jzx_io_event* ev = (jzx_io_event*)jzx_alloc(&loop->allocator, sizeof(jzx_io_event));
if (!ev) {
return 1;
}
ev->fd = fd;
ev->readiness = readiness;
jzx_err err =
jzx_send_internal(loop, watch->owner, ev, sizeof(jzx_io_event), JZX_TAG_SYS_IO, 0);
if (err != JZX_OK) {
jzx_free(&loop->allocator, ev);
}
return 1;
}
Read it as a three-stage pipeline:
- Validate: if the loop isn’t running yet, or the event is invalid, return `0` (not delivered).
- Resolve: map `fd → watch → owner actor`.
  - If the owner no longer exists, the watch is removed (self-healing).
- Deliver: allocate a `jzx_io_event` and enqueue it as a system message (`JZX_TAG_SYS_IO`).
The return value is intentionally small and lossy:
- `0`: nothing was delivered (no watch, invalid state, stale owner).
- `1`: the event was “handled” from the backend’s perspective.
  - Even if the message could not be enqueued, the backend shouldn’t spin endlessly on the same readiness.
Config helpers and loop lifecycle (public ABI entry points)
Default allocator hooks
If the caller doesn’t provide a custom allocator, libjzx uses malloc/free.
src/jzx_runtime.c#L960-L972// -----------------------------------------------------------------------------
// Config helpers
// -----------------------------------------------------------------------------
/* Default allocation hook: forwards to malloc(); `ctx` is ignored. */
static void* default_alloc(void* ctx, size_t size) {
    void* block = malloc(size);
    (void)ctx;
    return block;
}
/* Default release hook: forwards to free(); `ctx` is ignored. */
static void default_free(void* ctx, void* ptr) {
    free(ptr);
    (void)ctx;
}
Why this is written this way:
- The allocator signature includes a `ctx` pointer so applications can route allocations to arenas/pools.
- The default implementation ignores `ctx` and forwards to the platform heap.
src/jzx_runtime.c#L974-L988void jzx_config_init(jzx_config* cfg) {
if (!cfg) {
return;
}
memset(cfg, 0, sizeof(*cfg));
cfg->allocator.alloc = default_alloc;
cfg->allocator.free = default_free;
cfg->allocator.ctx = NULL;
cfg->max_actors = 1024;
cfg->default_mailbox_cap = 1024;
cfg->max_msgs_per_actor = 64;
cfg->max_actors_per_tick = 1024;
cfg->max_io_watchers = 1024;
cfg->io_poll_timeout_ms = 10;
}
“Apply defaults” for partially-initialized configs
jzx_config_init gives you a fully populated config, but callers can also pass a partially-filled struct
(for example, override io_poll_timeout_ms while leaving everything else “default”).
apply_defaults is the guardrail that turns “some fields set to 0” into “reasonable default values”.
src/jzx_runtime.c#L990-L1015static void apply_defaults(jzx_config* cfg) {
if (!cfg->allocator.alloc) {
cfg->allocator.alloc = default_alloc;
}
if (!cfg->allocator.free) {
cfg->allocator.free = default_free;
}
if (cfg->max_actors == 0) {
cfg->max_actors = 1024;
}
if (cfg->default_mailbox_cap == 0) {
cfg->default_mailbox_cap = 1024;
}
if (cfg->max_msgs_per_actor == 0) {
cfg->max_msgs_per_actor = 64;
}
if (cfg->max_actors_per_tick == 0) {
cfg->max_actors_per_tick = 1024;
}
if (cfg->max_io_watchers == 0) {
cfg->max_io_watchers = 1024;
}
if (cfg->io_poll_timeout_ms == 0) {
cfg->io_poll_timeout_ms = 10;
}
}
Why this exists:
- It lets you do “override one knob” without having to update this file every time new knobs are added.
- It makes `0` mean “use default” for most numeric config fields, which keeps the API ergonomic.
src/jzx_runtime.c#L1017-L1019// -----------------------------------------------------------------------------
// Loop lifecycle
// -----------------------------------------------------------------------------
src/jzx_runtime.c#L1021-L1066jzx_loop* jzx_loop_create(const jzx_config* cfg) {
jzx_config local;
if (cfg) {
local = *cfg;
} else {
jzx_config_init(&local);
}
apply_defaults(&local);
jzx_loop* loop = (jzx_loop*)jzx_alloc(&local.allocator, sizeof(jzx_loop));
if (!loop) {
return NULL;
}
memset(loop, 0, sizeof(*loop));
loop->cfg = local;
loop->allocator = local.allocator;
loop->xev = jzx_xev_create();
if (!loop->xev) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_actor_table_init(&loop->actors, local.max_actors, &loop->allocator) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_run_queue_init(&loop->run_queue, local.max_actors, &loop->allocator) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_async_queue_init(loop) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_timer_system_init(loop) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
if (jzx_io_init(loop, local.max_io_watchers) != JZX_OK) {
jzx_loop_destroy(loop);
return NULL;
}
loop->running = 0;
loop->stop_requested = 0;
return loop;
}
src/jzx_runtime.c#L1068-L1090void jzx_loop_destroy(jzx_loop* loop) {
if (!loop) {
return;
}
jzx_timer_system_shutdown(loop);
jzx_async_queue_destroy(loop);
if (loop->xev) {
jzx_xev_destroy(loop->xev);
loop->xev = NULL;
}
jzx_io_deinit(loop);
for (uint32_t i = 0; i < loop->actors.capacity; ++i) {
jzx_actor* actor = loop->actors.slots ? loop->actors.slots[i] : NULL;
if (actor) {
jzx_mailbox_deinit(&actor->mailbox, &loop->allocator);
jzx_free(&loop->allocator, actor);
loop->actors.slots[i] = NULL;
}
}
jzx_actor_table_deinit(&loop->actors, &loop->allocator);
jzx_run_queue_deinit(&loop->run_queue, &loop->allocator);
jzx_free(&loop->allocator, loop);
}
src/jzx_runtime.c#L1092-L1162int jzx_loop_run(jzx_loop* loop) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
if (loop->running) {
return JZX_ERR_LOOP_CLOSED;
}
loop->running = 1;
int rc = JZX_OK;
while (!loop->stop_requested) {
jzx_async_drain(loop);
jzx_xev_run(loop->xev, 0);
uint32_t actors_processed = 0;
while (actors_processed < loop->cfg.max_actors_per_tick) {
jzx_actor* actor = jzx_run_queue_pop(&loop->run_queue);
if (!actor) {
break;
}
actor->in_run_queue = 0;
if (actor->status == JZX_ACTOR_STOPPING || actor->status == JZX_ACTOR_FAILED) {
jzx_teardown_actor(loop, actor);
continue;
}
uint32_t processed_msgs = 0;
while (processed_msgs < loop->cfg.max_msgs_per_actor) {
jzx_message msg;
if (jzx_mailbox_pop(&actor->mailbox, &msg) != 0) {
break;
}
jzx_context ctx = {
.state = actor->state,
.self = actor->id,
.loop = loop,
};
jzx_behavior_result result = actor->behavior(&ctx, &msg);
processed_msgs++;
if (result == JZX_BEHAVIOR_STOP) {
actor->status = JZX_ACTOR_STOPPING;
break;
} else if (result == JZX_BEHAVIOR_FAIL) {
actor->status = JZX_ACTOR_FAILED;
break;
}
}
if (actor->status == JZX_ACTOR_STOPPING || actor->status == JZX_ACTOR_FAILED) {
jzx_teardown_actor(loop, actor);
} else if (jzx_mailbox_has_items(&actor->mailbox)) {
jzx_schedule_actor(loop, actor);
}
actors_processed++;
}
if (loop->run_queue.count == 0) {
if (loop->actors.used == 0 && !jzx_async_has_pending(loop) &&
!jzx_timer_has_pending(loop) && loop->io_count == 0) {
for (uint32_t i = 0; i < 64; ++i) {
jzx_xev_run(loop->xev, 0);
}
break;
}
if (jzx_async_has_pending(loop)) {
continue;
}
jzx_xev_run(loop->xev, 1);
}
}
loop->running = 0;
loop->stop_requested = 0;
return rc;
}
This is the main event loop: it runs ready actors up to configured budgets and integrates I/O + timers + async sends.
src/jzx_runtime.c#L1164-L1175void jzx_loop_request_stop(jzx_loop* loop) {
if (!loop) {
return;
}
loop->stop_requested = 1;
jzx_wakeup_signal(loop);
if (loop->timer_mutex_initialized) {
pthread_mutex_lock(&loop->timer_mutex);
pthread_cond_broadcast(&loop->timer_cond);
pthread_mutex_unlock(&loop->timer_mutex);
}
}
src/jzx_runtime.c#L1177-L1182void jzx_loop_free(jzx_loop* loop, void* ptr) {
if (!loop || !ptr) {
return;
}
jzx_free(&loop->allocator, ptr);
}
src/jzx_runtime.c#L1184-L1195void jzx_loop_set_observer(jzx_loop* loop, const jzx_observer* obs, void* ctx) {
if (!loop) {
return;
}
if (obs) {
loop->observer = *obs;
loop->observer_ctx = ctx;
} else {
memset(&loop->observer, 0, sizeof(loop->observer));
loop->observer_ctx = NULL;
}
}
Actor APIs (spawn/send/stop/fail)
src/jzx_runtime.c#L1197-L1199// -----------------------------------------------------------------------------
// Actor APIs
// -----------------------------------------------------------------------------
src/jzx_runtime.c#L1201-L1218static jzx_actor* jzx_actor_create(jzx_loop* loop, const jzx_spawn_opts* opts) {
jzx_actor* actor = (jzx_actor*)jzx_alloc(&loop->allocator, sizeof(jzx_actor));
if (!actor) {
return NULL;
}
memset(actor, 0, sizeof(*actor));
actor->status = JZX_ACTOR_RUNNING;
actor->behavior = opts->behavior;
actor->state = opts->state;
actor->supervisor = opts->supervisor;
if (jzx_mailbox_init(&actor->mailbox,
opts->mailbox_cap ? opts->mailbox_cap : loop->cfg.default_mailbox_cap,
&loop->allocator) != JZX_OK) {
jzx_free(&loop->allocator, actor);
return NULL;
}
return actor;
}
src/jzx_runtime.c#L1220-L1236jzx_err jzx_spawn(jzx_loop* loop, const jzx_spawn_opts* opts, jzx_actor_id* out_id) {
if (!loop || !opts || !opts->behavior) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_create(loop, opts);
if (!actor) {
return JZX_ERR_NO_MEMORY;
}
jzx_err err = jzx_actor_table_insert(&loop->actors, actor, &loop->allocator, out_id);
if (err != JZX_OK) {
jzx_mailbox_deinit(&actor->mailbox, &loop->allocator);
jzx_free(&loop->allocator, actor);
return err;
}
jzx_obs_actor_start(loop, actor->id, opts->name);
return JZX_OK;
}
src/jzx_runtime.c#L1238-L1259static jzx_err jzx_send_internal(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
uint32_t tag, jzx_actor_id sender) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_table_lookup(&loop->actors, target);
if (!actor) {
return JZX_ERR_NO_SUCH_ACTOR;
}
jzx_message msg = {
.data = data,
.len = len,
.tag = tag,
.sender = sender,
};
if (jzx_mailbox_push(&actor->mailbox, &msg) != 0) {
jzx_obs_mailbox_full(loop, target);
return JZX_ERR_MAILBOX_FULL;
}
jzx_schedule_actor(loop, actor);
return JZX_OK;
}
src/jzx_runtime.c#L1261-L1263jzx_err jzx_send(jzx_loop* loop, jzx_actor_id target, void* data, size_t len, uint32_t tag) {
return jzx_send_internal(loop, target, data, len, tag, 0);
}
src/jzx_runtime.c#L1265-L1267jzx_err jzx_send_async(jzx_loop* loop, jzx_actor_id target, void* data, size_t len, uint32_t tag) {
return jzx_async_enqueue(loop, target, data, len, tag, 0);
}
src/jzx_runtime.c#L1269-L1280jzx_err jzx_actor_stop(jzx_loop* loop, jzx_actor_id id) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_table_lookup(&loop->actors, id);
if (!actor) {
return JZX_ERR_NO_SUCH_ACTOR;
}
actor->status = JZX_ACTOR_STOPPING;
jzx_schedule_actor(loop, actor);
return JZX_OK;
}
src/jzx_runtime.c#L1282-L1293jzx_err jzx_actor_fail(jzx_loop* loop, jzx_actor_id id) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* actor = jzx_actor_table_lookup(&loop->actors, id);
if (!actor) {
return JZX_ERR_NO_SUCH_ACTOR;
}
actor->status = JZX_ACTOR_FAILED;
jzx_schedule_actor(loop, actor);
return JZX_OK;
}
Supervisor APIs (public ABI entry points)
src/jzx_runtime.c#L1295-L1297// -----------------------------------------------------------------------------
// Supervisor spawn
// -----------------------------------------------------------------------------
src/jzx_runtime.c#L1299-L1340jzx_err jzx_spawn_supervisor(jzx_loop* loop, const jzx_supervisor_init* init, jzx_actor_id parent,
jzx_actor_id* out_id) {
if (!loop || !init || !init->children || init->child_count == 0) {
return JZX_ERR_INVALID_ARG;
}
jzx_supervisor_state* state = jzx_supervisor_state_create(init, &loop->allocator);
if (!state) {
return JZX_ERR_NO_MEMORY;
}
jzx_spawn_opts opts = {
.behavior = jzx_supervisor_behavior,
.state = state,
.supervisor = parent,
.mailbox_cap = 0,
.name = NULL,
};
jzx_actor_id sup_id = 0;
jzx_err err = jzx_spawn(loop, &opts, &sup_id);
if (err != JZX_OK) {
jzx_supervisor_state_destroy(state, &loop->allocator);
return err;
}
jzx_actor* sup_actor = jzx_actor_table_lookup(&loop->actors, sup_id);
if (!sup_actor) {
jzx_supervisor_state_destroy(state, &loop->allocator);
return JZX_ERR_UNKNOWN;
}
sup_actor->supervisor_state = state;
for (size_t i = 0; i < state->child_count; ++i) {
err = jzx_supervisor_spawn_child(loop, sup_id, &state->children[i]);
if (err != JZX_OK) {
(void)jzx_actor_fail(loop, sup_id);
return err;
}
}
if (out_id) {
*out_id = sup_id;
}
return JZX_OK;
}
src/jzx_runtime.c#L1342-L1356jzx_err jzx_supervisor_child_id(jzx_loop* loop, jzx_actor_id supervisor, size_t index,
jzx_actor_id* out_id) {
if (!loop || !out_id) {
return JZX_ERR_INVALID_ARG;
}
jzx_actor* sup_actor = jzx_actor_table_lookup(&loop->actors, supervisor);
if (!sup_actor || !sup_actor->supervisor_state) {
return JZX_ERR_NO_SUCH_ACTOR;
}
if (index >= sup_actor->supervisor_state->child_count) {
return JZX_ERR_INVALID_ARG;
}
*out_id = sup_actor->supervisor_state->children[index].id;
return JZX_OK;
}
Timers and I/O APIs (public ABI entry points)
src/jzx_runtime.c#L1358-L1360// -----------------------------------------------------------------------------
// Timers & IO
// -----------------------------------------------------------------------------
src/jzx_runtime.c#L1362-L1391jzx_err jzx_send_after(jzx_loop* loop, jzx_actor_id target, uint32_t ms, void* data, size_t len,
uint32_t tag, jzx_timer_id* out_timer) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
if (!jzx_actor_table_lookup(&loop->actors, target)) {
return JZX_ERR_NO_SUCH_ACTOR;
}
jzx_timer_entry* entry = (jzx_timer_entry*)jzx_alloc(&loop->allocator, sizeof(jzx_timer_entry));
if (!entry) {
return JZX_ERR_NO_MEMORY;
}
entry->target = target;
entry->data = data;
entry->len = len;
entry->tag = tag;
entry->next = NULL;
pthread_mutex_lock(&loop->timer_mutex);
entry->id = loop->next_timer_id++;
entry->due_ms = jzx_now_ms() + (uint64_t)ms;
jzx_timer_insert_locked(loop, entry);
pthread_cond_broadcast(&loop->timer_cond);
pthread_mutex_unlock(&loop->timer_mutex);
if (out_timer) {
*out_timer = entry->id;
}
return JZX_OK;
}
src/jzx_runtime.c#L1393-L1416jzx_err jzx_cancel_timer(jzx_loop* loop, jzx_timer_id timer) {
if (!loop || !loop->timer_mutex_initialized) {
return JZX_ERR_INVALID_ARG;
}
pthread_mutex_lock(&loop->timer_mutex);
jzx_timer_entry* prev = NULL;
jzx_timer_entry* cur = loop->timer_head;
while (cur) {
if (cur->id == timer) {
if (prev) {
prev->next = cur->next;
} else {
loop->timer_head = cur->next;
}
pthread_mutex_unlock(&loop->timer_mutex);
jzx_free(&loop->allocator, cur);
return JZX_OK;
}
prev = cur;
cur = cur->next;
}
pthread_mutex_unlock(&loop->timer_mutex);
return JZX_ERR_TIMER_INVALID;
}
src/jzx_runtime.c#L1418-L1456jzx_err jzx_watch_fd(jzx_loop* loop, int fd, jzx_actor_id owner, uint32_t interest) {
if (!loop || !loop->xev || fd < 0 || interest == 0) {
return JZX_ERR_INVALID_ARG;
}
if (!jzx_actor_table_lookup(&loop->actors, owner)) {
return JZX_ERR_NO_SUCH_ACTOR;
}
jzx_io_watch* existing = jzx_io_find(loop, fd, NULL);
if (existing) {
jzx_err err = jzx_xev_watch_fd(loop->xev, loop, fd, interest);
if (err != JZX_OK) {
return err;
}
existing->owner = owner;
existing->interest = interest;
return JZX_OK;
}
if (loop->io_count == loop->io_capacity) {
jzx_err err = jzx_io_reserve(loop, loop->io_capacity * 2);
if (err != JZX_OK) {
return err;
}
}
uint32_t idx = loop->io_count;
loop->io_watchers[idx] = (jzx_io_watch){
.fd = fd,
.owner = owner,
.interest = interest,
.active = 1,
};
loop->io_count = idx + 1;
jzx_err err = jzx_xev_watch_fd(loop->xev, loop, fd, interest);
if (err != JZX_OK) {
loop->io_count = idx;
memset(&loop->io_watchers[idx], 0, sizeof(loop->io_watchers[idx]));
return err;
}
return JZX_OK;
}
src/jzx_runtime.c#L1458-L1469jzx_err jzx_unwatch_fd(jzx_loop* loop, int fd) {
if (!loop || fd < 0) {
return JZX_ERR_INVALID_ARG;
}
uint32_t idx = 0;
jzx_io_watch* entry = jzx_io_find(loop, fd, &idx);
if (!entry) {
return JZX_ERR_IO_NOT_WATCHED;
}
jzx_io_remove_index(loop, idx);
return JZX_OK;
}
Appendix: full file (for grepping and context)
Show full src/jzx_runtime.c
src/jzx_runtime.c#L1-L1469#include "jzx_internal.h"
#include <stdlib.h>
#include <string.h>
#include <time.h>
// -----------------------------------------------------------------------------
// Utility helpers
// -----------------------------------------------------------------------------
/* Table index of an actor id: its low 32 bits. */
static inline uint32_t jzx_id_index(jzx_actor_id id) {
    const jzx_actor_id low_word = id & 0xffffffffu;
    return (uint32_t)low_word;
}
/* Generation of an actor id: its high 32 bits. */
static inline uint32_t jzx_id_generation(jzx_actor_id id) {
    const jzx_actor_id high_word = id >> 32u;
    return (uint32_t)high_word;
}
/* Pack (generation, index) into an id: generation in the high word, index in the low word. */
static inline jzx_actor_id jzx_make_id(uint32_t gen, uint32_t idx) {
    const uint64_t high_word = (uint64_t)gen << 32u;
    const uint64_t low_word = (uint64_t)idx;
    return high_word | low_word;
}
/* Allocate `size` bytes through the allocator's hook; NULL when no hook is set. */
static void* jzx_alloc(jzx_allocator* alloc, size_t size) {
    if (!alloc->alloc) {
        return NULL;
    }
    return alloc->alloc(alloc->ctx, size);
}
/* Release `ptr` through the allocator's hook; does nothing when no hook is set. */
static void jzx_free(jzx_allocator* alloc, void* ptr) {
    if (!alloc->free) {
        return;
    }
    alloc->free(alloc->ctx, ptr);
}
/*
 * Build the supervisor state for `init`: copies the supervisor config and one
 * child record per child spec, all with zeroed ids and restart counters.
 * Returns NULL for a NULL/empty init or when an allocation fails.
 */
static jzx_supervisor_state* jzx_supervisor_state_create(const jzx_supervisor_init* init,
                                                         jzx_allocator* allocator) {
    if (init == NULL || init->child_count == 0 || init->children == NULL) {
        return NULL;
    }

    jzx_supervisor_state* fresh =
        (jzx_supervisor_state*)jzx_alloc(allocator, sizeof(jzx_supervisor_state));
    if (fresh == NULL) {
        return NULL;
    }
    memset(fresh, 0, sizeof *fresh);
    fresh->config = init->supervisor;
    fresh->child_count = init->child_count;

    const size_t child_bytes = sizeof(jzx_child_state) * init->child_count;
    fresh->children = (jzx_child_state*)jzx_alloc(allocator, child_bytes);
    if (fresh->children == NULL) {
        jzx_free(allocator, fresh);
        return NULL;
    }
    memset(fresh->children, 0, child_bytes);

    size_t pos = 0;
    while (pos < init->child_count) {
        jzx_child_state* record = &fresh->children[pos];
        record->spec = init->children[pos];
        record->id = 0;
        record->restart_count = 0;
        record->last_restart_ms = 0;
        ++pos;
    }

    fresh->intensity_window_start_ms = 0;
    fresh->intensity_window_count = 0;
    return fresh;
}
/* Free the child array (when present) and then the state itself. NULL is ignored. */
static void jzx_supervisor_state_destroy(jzx_supervisor_state* state, jzx_allocator* allocator) {
    if (state != NULL) {
        if (state->children != NULL) {
            jzx_free(allocator, state->children);
        }
        jzx_free(allocator, state);
    }
}
/*
 * Restart-intensity gate. Counts this restart against the current window and
 * returns 1 when it is allowed, 0 when the window's intensity limit is
 * exceeded (or `sup` is NULL). An intensity or period of 0 disables the
 * limit. A window start of 0 is treated as "no window yet".
 */
static int jzx_supervisor_allow_restart(jzx_supervisor_state* sup, uint64_t now_ms) {
    if (sup == NULL) {
        return 0;
    }
    if (sup->config.intensity == 0 || sup->config.period_ms == 0) {
        return 1;
    }

    /* Open a fresh window when none is active or the current one has expired. */
    const int need_new_window =
        sup->intensity_window_start_ms == 0 ||
        now_ms - sup->intensity_window_start_ms > sup->config.period_ms;
    if (need_new_window) {
        sup->intensity_window_count = 0;
        sup->intensity_window_start_ms = now_ms;
    }

    sup->intensity_window_count += 1;
    return (sup->intensity_window_count > sup->config.intensity) ? 0 : 1;
}
/*
 * Current CLOCK_MONOTONIC time in milliseconds.
 * `ts` is zero-initialized so that a failing clock_gettime() yields 0
 * instead of reading an indeterminate value.
 */
static uint64_t jzx_now_ms(void) {
    struct timespec ts = {0};
    if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0) {
        return 0;
    }
    return (uint64_t)ts.tv_sec * 1000ull + (uint64_t)ts.tv_nsec / 1000000ull;
}
/* Saturating 32-bit add: a + b, clamped to UINT32_MAX instead of wrapping. */
static uint32_t jzx_sat_add32(uint32_t a, uint32_t b) {
    /* The sum overflows exactly when b exceeds the headroom left above a. */
    const uint32_t headroom = UINT32_MAX - a;
    return (b > headroom) ? UINT32_MAX : (uint32_t)(a + b);
}
/* Saturating 32-bit multiply: a * b, clamped to UINT32_MAX instead of wrapping. */
static uint32_t jzx_sat_mul32(uint32_t a, uint32_t b) {
    if (a == 0 || b == 0) {
        return 0;
    }
    /* With b != 0, the product overflows exactly when a > UINT32_MAX / b. */
    if (a > UINT32_MAX / b) {
        return UINT32_MAX;
    }
    return (uint32_t)(a * b);
}
static jzx_err jzx_send_internal(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
uint32_t tag, jzx_actor_id sender);
static void jzx_io_remove_actor(jzx_loop* loop, jzx_actor_id actor);
// -----------------------------------------------------------------------------
// Wakeup helpers
// -----------------------------------------------------------------------------
/* Signal the xev backend's wakeup handle; a NULL loop or missing backend is a no-op. */
static void jzx_wakeup_signal(jzx_loop* loop) {
    if (loop != NULL && loop->xev != NULL) {
        jzx_xev_wakeup(loop->xev);
    }
}
// -----------------------------------------------------------------------------
// Observer helpers
// -----------------------------------------------------------------------------
/* Each helper forwards to the matching observer callback when the loop and callback exist. */

/* Report that actor `id` (optionally named) was started. */
static void jzx_obs_actor_start(jzx_loop* loop, jzx_actor_id id, const char* name) {
    if (loop == NULL || loop->observer.on_actor_start == NULL) {
        return;
    }
    loop->observer.on_actor_start(loop->observer_ctx, id, name);
}
/* Report that actor `id` stopped for `reason`. */
static void jzx_obs_actor_stop(jzx_loop* loop, jzx_actor_id id, jzx_exit_reason reason) {
    if (loop == NULL || loop->observer.on_actor_stop == NULL) {
        return;
    }
    loop->observer.on_actor_stop(loop->observer_ctx, id, reason);
}
/* Report that `supervisor` is restarting `child` (restart attempt number `attempt`). */
static void jzx_obs_actor_restart(jzx_loop* loop, jzx_actor_id supervisor, jzx_actor_id child,
                                  uint32_t attempt) {
    if (loop == NULL || loop->observer.on_actor_restart == NULL) {
        return;
    }
    loop->observer.on_actor_restart(loop->observer_ctx, supervisor, child, attempt);
}
/* Report that `supervisor` escalated. */
static void jzx_obs_supervisor_escalate(jzx_loop* loop, jzx_actor_id supervisor) {
    if (loop == NULL || loop->observer.on_supervisor_escalate == NULL) {
        return;
    }
    loop->observer.on_supervisor_escalate(loop->observer_ctx, supervisor);
}
/* Report that a send to `target` was rejected because its mailbox was full. */
static void jzx_obs_mailbox_full(jzx_loop* loop, jzx_actor_id target) {
    if (loop == NULL || loop->observer.on_mailbox_full == NULL) {
        return;
    }
    loop->observer.on_mailbox_full(loop->observer_ctx, target);
}
// -----------------------------------------------------------------------------
// Mailbox implementation
// -----------------------------------------------------------------------------
/*
 * Set up an empty ring-buffer mailbox with room for `capacity` messages
 * (a capacity of 0 is bumped to 1). The buffer is zeroed.
 * Returns JZX_ERR_NO_MEMORY when the buffer cannot be allocated; `box` is
 * left untouched in that case.
 */
static jzx_err jzx_mailbox_init(jzx_mailbox_impl* box, uint32_t capacity,
                                jzx_allocator* allocator) {
    const uint32_t slots = (capacity == 0) ? 1u : capacity;
    const size_t ring_bytes = sizeof(jzx_message) * slots;

    jzx_message* ring = (jzx_message*)jzx_alloc(allocator, ring_bytes);
    if (ring == NULL) {
        return JZX_ERR_NO_MEMORY;
    }
    memset(ring, 0, ring_bytes);

    box->buffer = ring;
    box->capacity = slots;
    box->count = 0;
    box->head = 0;
    box->tail = 0;
    return JZX_OK;
}
/*
 * Free the mailbox buffer (when present) and zero the mailbox struct.
 * Message payloads still queued in the buffer are not released here.
 */
static void jzx_mailbox_deinit(jzx_mailbox_impl* box, jzx_allocator* allocator) {
    jzx_message* ring = box->buffer;
    if (ring != NULL) {
        jzx_free(allocator, ring);
    }
    memset(box, 0, sizeof *box);
}
/* Append a copy of `*msg` at the tail. Returns 0 on success, -1 when the mailbox is full. */
static int jzx_mailbox_push(jzx_mailbox_impl* box, const jzx_message* msg) {
    if (box->count == box->capacity) {
        return -1;
    }
    jzx_message* slot = &box->buffer[box->tail];
    *slot = *msg;
    box->count += 1;
    box->tail = (box->tail + 1) % box->capacity;
    return 0;
}
/* Pop the oldest message into `*out`. Returns 0 on success, -1 when the mailbox is empty. */
static int jzx_mailbox_pop(jzx_mailbox_impl* box, jzx_message* out) {
    if (box->count == 0) {
        return -1;
    }
    const jzx_message* slot = &box->buffer[box->head];
    *out = *slot;
    box->count -= 1;
    box->head = (box->head + 1) % box->capacity;
    return 0;
}
/* Non-zero when at least one message is queued. */
static int jzx_mailbox_has_items(const jzx_mailbox_impl* box) {
    return (box->count > 0) ? 1 : 0;
}
// -----------------------------------------------------------------------------
// Actor table implementation
// -----------------------------------------------------------------------------
/*
 * Initialize an actor table with `capacity` slots: all slots empty, every
 * generation starting at 1, and the free stack ordered so that index 0 is
 * handed out first.
 *
 * Returns JZX_ERR_NO_MEMORY when any of the three arrays cannot be allocated.
 * On failure the arrays that were obtained are released and the table is
 * left fully zeroed (capacity 0, NULL arrays). This matters because
 * jzx_loop_destroy walks `slots[0..capacity)`; a table left with a non-zero
 * capacity and an uninitialized `slots` array would make it dereference
 * garbage pointers.
 */
static jzx_err jzx_actor_table_init(jzx_actor_table* table, uint32_t capacity,
                                    jzx_allocator* allocator) {
    memset(table, 0, sizeof(*table));
    size_t slot_bytes = sizeof(jzx_actor*) * capacity;
    size_t gen_bytes = sizeof(uint32_t) * capacity;
    size_t stack_bytes = sizeof(uint32_t) * capacity;
    jzx_actor** slots = (jzx_actor**)jzx_alloc(allocator, slot_bytes);
    uint32_t* generations = (uint32_t*)jzx_alloc(allocator, gen_bytes);
    uint32_t* free_stack = (uint32_t*)jzx_alloc(allocator, stack_bytes);
    if (!slots || !generations || !free_stack) {
        /* Undo the partial allocation; nothing has been published to `table` yet. */
        if (slots) {
            jzx_free(allocator, slots);
        }
        if (generations) {
            jzx_free(allocator, generations);
        }
        if (free_stack) {
            jzx_free(allocator, free_stack);
        }
        return JZX_ERR_NO_MEMORY;
    }
    memset(slots, 0, slot_bytes);
    for (uint32_t i = 0; i < capacity; ++i) {
        generations[i] = 1;
        /* Descending order: popping from the top yields indices 0, 1, 2, ... */
        free_stack[i] = capacity - 1 - i;
    }
    table->slots = slots;
    table->generations = generations;
    table->free_stack = free_stack;
    table->capacity = capacity;
    table->free_top = capacity;
    table->used = 0;
    return JZX_OK;
}
/*
 * Free the table's three arrays (those that are allocated) and zero the
 * table. Actors still referenced by the slots are not freed here.
 * A NULL table is ignored.
 */
static void jzx_actor_table_deinit(jzx_actor_table* table, jzx_allocator* allocator) {
    if (table != NULL) {
        if (table->slots != NULL) {
            jzx_free(allocator, table->slots);
        }
        if (table->generations != NULL) {
            jzx_free(allocator, table->generations);
        }
        if (table->free_stack != NULL) {
            jzx_free(allocator, table->free_stack);
        }
        memset(table, 0, sizeof *table);
    }
}
/*
 * Resolve an actor id. Returns the actor stored at the id's index when the
 * index is in range and the id's generation matches the slot's current
 * generation; NULL otherwise (including when the slot is empty).
 */
static jzx_actor* jzx_actor_table_lookup(jzx_actor_table* table, jzx_actor_id id) {
    const uint32_t slot = jzx_id_index(id);
    if (slot < table->capacity && table->generations[slot] == jzx_id_generation(id)) {
        return table->slots[slot];
    }
    return NULL;
}
/*
 * Register `actor` in a free slot and assign its id from the slot's current
 * generation and index. The id is also written to `*out_id` when non-NULL.
 * Returns JZX_ERR_MAX_ACTORS when no slot is free. `allocator` is unused.
 */
static jzx_err jzx_actor_table_insert(jzx_actor_table* table, jzx_actor* actor,
                                      jzx_allocator* allocator, jzx_actor_id* out_id) {
    (void)allocator;
    if (table->free_top == 0) {
        return JZX_ERR_MAX_ACTORS;
    }

    table->free_top -= 1;
    const uint32_t slot = table->free_stack[table->free_top];
    actor->id = jzx_make_id(table->generations[slot], slot);
    table->slots[slot] = actor;
    table->used += 1;

    if (out_id != NULL) {
        *out_id = actor->id;
    }
    return JZX_OK;
}
/*
 * Unregister `actor`: clear its slot, bump the slot's generation so the old
 * id stops resolving, and return the index to the free stack.
 * Does nothing when `actor` is NULL, its index is out of range, or the slot
 * holds a different actor. The actor itself is not freed.
 */
static void jzx_actor_table_remove(jzx_actor_table* table, jzx_actor* actor) {
    if (actor == NULL) {
        return;
    }
    const uint32_t slot = jzx_id_index(actor->id);
    if (slot >= table->capacity || table->slots[slot] != actor) {
        return;
    }

    table->slots[slot] = NULL;
    table->generations[slot] += 1u;
    table->free_stack[table->free_top] = slot;
    table->free_top += 1;
    if (table->used > 0) {
        table->used -= 1;
    }
}
// -----------------------------------------------------------------------------
// Run queue implementation
// -----------------------------------------------------------------------------
/*
 * Initialize an empty run queue holding up to `capacity` actor pointers
 * (a capacity of 0 is bumped to 1). Returns JZX_ERR_NO_MEMORY when the
 * entry array cannot be allocated.
 */
static jzx_err jzx_run_queue_init(jzx_run_queue* rq, uint32_t capacity, jzx_allocator* allocator) {
    memset(rq, 0, sizeof *rq);
    rq->capacity = (capacity == 0) ? 1 : capacity;

    const size_t entry_bytes = sizeof(jzx_actor*) * rq->capacity;
    rq->entries = (jzx_actor**)jzx_alloc(allocator, entry_bytes);
    if (rq->entries == NULL) {
        return JZX_ERR_NO_MEMORY;
    }
    memset(rq->entries, 0, entry_bytes);
    return JZX_OK;
}
/* Free the entry array (when present) and zero the queue. Queued actors are not freed. */
static void jzx_run_queue_deinit(jzx_run_queue* rq, jzx_allocator* allocator) {
    jzx_actor** entries = rq->entries;
    if (entries != NULL) {
        jzx_free(allocator, entries);
    }
    memset(rq, 0, sizeof *rq);
}
/* Enqueue `actor` at the tail. Returns 0 on success, -1 when the queue is full. */
static int jzx_run_queue_push(jzx_run_queue* rq, jzx_actor* actor) {
    if (rq->count == rq->capacity) {
        return -1;
    }
    rq->entries[rq->tail] = actor;
    rq->count += 1;
    rq->tail = (rq->tail + 1) % rq->capacity;
    return 0;
}
/* Dequeue the oldest actor and clear its slot. Returns NULL when the queue is empty. */
static jzx_actor* jzx_run_queue_pop(jzx_run_queue* rq) {
    if (rq->count == 0) {
        return NULL;
    }
    jzx_actor** slot = &rq->entries[rq->head];
    jzx_actor* front = *slot;
    *slot = NULL;
    rq->count -= 1;
    rq->head = (rq->head + 1) % rq->capacity;
    return front;
}
/*
 * Put `actor` on the run queue unless it is NULL or already queued.
 * The in_run_queue flag is only set when the push succeeds; a full queue
 * leaves the actor unscheduled without reporting an error.
 */
static void jzx_schedule_actor(jzx_loop* loop, jzx_actor* actor) {
    if (actor == NULL) {
        return;
    }
    if (actor->in_run_queue) {
        return;
    }
    if (jzx_run_queue_push(&loop->run_queue, actor) != 0) {
        return;
    }
    actor->in_run_queue = 1;
}
/*
 * Destroy an actor: drop its I/O watches, notify the observer, send a
 * JZX_TAG_SYS_CHILD_EXIT message to its supervisor (when it has one and the
 * notice can be allocated), then free its mailbox, table slot, supervisor
 * state (if any) and the actor struct. NULL is ignored.
 * The exit reason is JZX_EXIT_FAIL for a FAILED actor, JZX_EXIT_NORMAL otherwise.
 */
static void jzx_teardown_actor(jzx_loop* loop, jzx_actor* actor) {
    if (actor == NULL) {
        return;
    }

    jzx_io_remove_actor(loop, actor->id);

    const jzx_exit_reason reason =
        (actor->status == JZX_ACTOR_FAILED) ? JZX_EXIT_FAIL : JZX_EXIT_NORMAL;
    jzx_obs_actor_stop(loop, actor->id, reason);

    if (actor->supervisor) {
        jzx_child_exit* notice =
            (jzx_child_exit*)jzx_alloc(&loop->allocator, sizeof(jzx_child_exit));
        if (notice != NULL) {
            notice->child = actor->id;
            notice->status = actor->status;
            /* The notice stays ours when the send is rejected, so release it. */
            if (jzx_send_internal(loop, actor->supervisor, notice, sizeof(jzx_child_exit),
                                  JZX_TAG_SYS_CHILD_EXIT, 0) != JZX_OK) {
                jzx_free(&loop->allocator, notice);
            }
        }
    }

    jzx_mailbox_deinit(&actor->mailbox, &loop->allocator);
    jzx_actor_table_remove(&loop->actors, actor);
    if (actor->supervisor_state != NULL) {
        jzx_supervisor_state_destroy(actor->supervisor_state, &loop->allocator);
    }
    jzx_free(&loop->allocator, actor);
}
// -----------------------------------------------------------------------------
// Supervisor helpers
// -----------------------------------------------------------------------------
/*
 * Find the child record whose current actor id equals `id`.
 * Returns the record (storing its index in `*out_idx` when non-NULL), or
 * NULL when `sup` is NULL or no child has that id.
 */
static jzx_child_state* jzx_supervisor_find_child(jzx_supervisor_state* sup, jzx_actor_id id,
                                                  size_t* out_idx) {
    if (sup == NULL) {
        return NULL;
    }
    size_t pos = 0;
    while (pos < sup->child_count && sup->children[pos].id != id) {
        ++pos;
    }
    if (pos == sup->child_count) {
        return NULL;
    }
    if (out_idx != NULL) {
        *out_idx = pos;
    }
    return &sup->children[pos];
}
/*
 * Spawn the actor described by `child->spec` under `supervisor_id`.
 * Stamps child->last_restart_ms with the current time before spawning and
 * lets jzx_spawn store the new actor id in child->id. Returns jzx_spawn's result.
 */
static jzx_err jzx_supervisor_spawn_child(jzx_loop* loop, jzx_actor_id supervisor_id,
                                          jzx_child_state* child) {
    const jzx_spawn_opts child_opts = {
        .behavior = child->spec.behavior,
        .state = child->spec.state,
        .supervisor = supervisor_id,
        .mailbox_cap = child->spec.mailbox_cap,
        .name = child->spec.name,
    };
    child->last_restart_ms = jzx_now_ms();
    jzx_err rc = jzx_spawn(loop, &child_opts, &child->id);
    return rc;
}
/*
 * Request a stop for the child's current actor (result ignored) and clear
 * the recorded id. A child with id 0 has no running actor and is skipped.
 */
static void jzx_supervisor_stop_child(jzx_loop* loop, jzx_child_state* child) {
    if (child->id == 0) {
        return;
    }
    (void)jzx_actor_stop(loop, child->id);
    child->id = 0;
}
/*
 * Restart the child at `child_idx`: immediately when `delay_ms` is 0,
 * otherwise by arming a timer that sends JZX_TAG_SYS_CHILD_RESTART (with the
 * child index as payload) to the supervisor after `delay_ms`.
 * Failures (spawn error, allocation failure, timer error) are not reported
 * to the caller; the payload is freed when the timer cannot be armed.
 */
static void jzx_supervisor_schedule_restart(jzx_loop* loop, jzx_actor* sup_actor, size_t child_idx,
                                            uint32_t delay_ms) {
    jzx_supervisor_state* sup = sup_actor->supervisor_state;
    if (sup == NULL || child_idx >= sup->child_count) {
        return;
    }

    if (delay_ms == 0) {
        (void)jzx_supervisor_spawn_child(loop, sup_actor->id, &sup->children[child_idx]);
        return;
    }

    jzx_child_restart* request =
        (jzx_child_restart*)jzx_alloc(&loop->allocator, sizeof(jzx_child_restart));
    if (request == NULL) {
        return;
    }
    request->child_index = (uint32_t)child_idx;

    if (jzx_send_after(loop, sup_actor->id, delay_ms, request, sizeof(jzx_child_restart),
                       JZX_TAG_SYS_CHILD_RESTART, NULL) != JZX_OK) {
        jzx_free(&loop->allocator, request);
    }
}
/*
 * Compute the restart delay (ms) for `child`.
 * The child's own backoff type is used unless it is JZX_BACKOFF_NONE, in
 * which case the supervisor's backoff type applies.
 *   NONE:        the child's restart_delay_ms.
 *   CONSTANT:    restart_delay_ms + backoff_delay_ms * restart_count (saturating).
 *   EXPONENTIAL: (restart_delay_ms, or backoff_delay_ms when that is 0)
 *                * 2^restart_count, saturating; counts of 31 or more use a
 *                factor of UINT32_MAX.
 * Returns 0 when either argument is NULL.
 */
static uint32_t jzx_supervisor_compute_delay(const jzx_supervisor_state* sup,
                                             const jzx_child_state* child) {
    if (sup == NULL || child == NULL) {
        return 0;
    }

    const uint32_t base = child->spec.restart_delay_ms;
    const uint32_t step = sup->config.backoff_delay_ms;
    jzx_backoff_type strategy = child->spec.backoff;
    if (strategy == JZX_BACKOFF_NONE) {
        strategy = sup->config.backoff;
    }

    if (strategy == JZX_BACKOFF_CONSTANT) {
        return jzx_sat_add32(base, jzx_sat_mul32(step, child->restart_count));
    }
    if (strategy == JZX_BACKOFF_EXPONENTIAL) {
        const uint32_t shifts = child->restart_count;
        const uint32_t factor = (shifts >= 31) ? UINT32_MAX : (1u << shifts);
        const uint32_t seed = base ? base : step;
        return jzx_sat_mul32(seed, factor);
    }
    /* JZX_BACKOFF_NONE and any unrecognized value fall back to the plain delay. */
    return base;
}
// Group restart used by the ONE_FOR_ALL and REST_FOR_ONE strategies: handles
// every child in [first_idx, child_count) in three passes.
//   1. Report a restart to the observer for each child that has an id (the
//      failed child's slot uses `failed_child_id` instead of its stored id).
//   2. Stop every child in the range.
//   3. Bump restart_count, stamp last_restart_ms, compute the delay and
//      schedule the restart for every child in the range.
static void jzx_supervisor_restart_from(jzx_loop* loop, jzx_actor* supervisor_actor,
                                        jzx_supervisor_state* sup, size_t first_idx,
                                        size_t failed_idx, jzx_actor_id failed_child_id) {
    for (size_t i = first_idx; i < sup->child_count; ++i) {
        const jzx_actor_id reported_id =
            (i == failed_idx) ? failed_child_id : sup->children[i].id;
        if (reported_id) {
            uint32_t attempt = sup->children[i].restart_count + 1;
            jzx_obs_actor_restart(loop, supervisor_actor->id, reported_id, attempt);
        }
    }

    for (size_t i = first_idx; i < sup->child_count; ++i) {
        jzx_supervisor_stop_child(loop, &sup->children[i]);
    }

    for (size_t i = first_idx; i < sup->child_count; ++i) {
        jzx_child_state* entry = &sup->children[i];
        entry->restart_count += 1;
        entry->last_restart_ms = jzx_now_ms();
        const uint32_t delay = jzx_supervisor_compute_delay(sup, entry);
        jzx_supervisor_schedule_restart(loop, supervisor_actor, i, delay);
    }
}

// Apply the supervisor's configured restart strategy after the child at
// `failed_idx` (whose former id is `failed_child_id`) has exited.
//   - ONE_FOR_ONE:  only the failed child is reported, counted and rescheduled.
//   - ONE_FOR_ALL:  every child is reported, stopped and rescheduled.
//   - REST_FOR_ONE: the failed child and every child after it are reported,
//                   stopped and rescheduled.
// Any other strategy value is a no-op.
static void jzx_supervisor_restart_strategy(jzx_loop* loop, jzx_actor* supervisor_actor,
                                            size_t failed_idx, jzx_actor_id failed_child_id,
                                            jzx_supervisor_state* sup) {
    switch (sup->config.strategy) {
    case JZX_SUP_ONE_FOR_ONE: {
        jzx_child_state* failed = &sup->children[failed_idx];
        if (failed_child_id) {
            uint32_t attempt = failed->restart_count + 1;
            jzx_obs_actor_restart(loop, supervisor_actor->id, failed_child_id, attempt);
        }
        failed->restart_count += 1;
        failed->last_restart_ms = jzx_now_ms();
        const uint32_t delay = jzx_supervisor_compute_delay(sup, failed);
        jzx_supervisor_schedule_restart(loop, supervisor_actor, failed_idx, delay);
        break;
    }
    case JZX_SUP_ONE_FOR_ALL:
        jzx_supervisor_restart_from(loop, supervisor_actor, sup, 0, failed_idx,
                                    failed_child_id);
        break;
    case JZX_SUP_REST_FOR_ONE:
        jzx_supervisor_restart_from(loop, supervisor_actor, sup, failed_idx, failed_idx,
                                    failed_child_id);
        break;
    }
}
// Behavior function installed for supervisor actors.
//
// Handles two system messages (both only when they carry a payload):
//   - JZX_TAG_SYS_CHILD_EXIT: looks up the exited child, clears its id, decides
//     whether its mode calls for a restart (PERMANENT always, TRANSIENT only
//     when the exit status is JZX_ACTOR_FAILED), and either applies the restart
//     strategy or - when jzx_supervisor_allow_restart() refuses - notifies the
//     observer, stops all children, marks itself failed and returns
//     JZX_BEHAVIOR_FAIL.
//   - JZX_TAG_SYS_CHILD_RESTART: spawns the child at the carried index if it is
//     in range.
// The payload of every message is released with the loop allocator. Any other
// message, or any message arriving when the actor has no supervisor state, is
// discarded and JZX_BEHAVIOR_OK is returned.
static jzx_behavior_result jzx_supervisor_behavior(jzx_context* ctx, const jzx_message* msg) {
    jzx_actor* self_actor = jzx_actor_table_lookup(&ctx->loop->actors, ctx->self);
    jzx_supervisor_state* sup = self_actor ? self_actor->supervisor_state : NULL;

    if (sup != NULL && msg->data != NULL && msg->tag == JZX_TAG_SYS_CHILD_EXIT) {
        jzx_child_exit* exit_ev = (jzx_child_exit*)msg->data;
        const jzx_actor_id exited_id = exit_ev->child;
        size_t child_idx = 0;
        jzx_child_state* exited = jzx_supervisor_find_child(sup, exit_ev->child, &child_idx);
        const jzx_actor_status exit_status = exit_ev->status;
        jzx_free(&ctx->loop->allocator, exit_ev);

        if (exited == NULL) {
            return JZX_BEHAVIOR_OK;
        }
        exited->id = 0;

        const int wants_restart =
            exited->spec.mode == JZX_CHILD_PERMANENT ||
            (exited->spec.mode == JZX_CHILD_TRANSIENT && exit_status == JZX_ACTOR_FAILED);
        if (!wants_restart) {
            return JZX_BEHAVIOR_OK;
        }

        const uint64_t now_ms = jzx_now_ms();
        if (jzx_supervisor_allow_restart(sup, now_ms)) {
            jzx_supervisor_restart_strategy(ctx->loop, self_actor, child_idx, exited_id, sup);
            return JZX_BEHAVIOR_OK;
        }

        // Restart budget exhausted: escalate and take every child down.
        jzx_obs_supervisor_escalate(ctx->loop, ctx->self);
        for (size_t i = 0; i < sup->child_count; ++i) {
            jzx_supervisor_stop_child(ctx->loop, &sup->children[i]);
        }
        self_actor->status = JZX_ACTOR_FAILED;
        return JZX_BEHAVIOR_FAIL;
    }

    if (sup != NULL && msg->data != NULL && msg->tag == JZX_TAG_SYS_CHILD_RESTART) {
        jzx_child_restart* restart_ev = (jzx_child_restart*)msg->data;
        const uint32_t child_idx = restart_ev->child_index;
        jzx_free(&ctx->loop->allocator, restart_ev);
        if (child_idx < sup->child_count) {
            (void)jzx_supervisor_spawn_child(ctx->loop, ctx->self, &sup->children[child_idx]);
        }
        return JZX_BEHAVIOR_OK;
    }

    // Unhandled message (or no supervisor state): just drop the payload.
    if (msg->data != NULL) {
        jzx_free(&ctx->loop->allocator, msg->data);
    }
    return JZX_BEHAVIOR_OK;
}
// -----------------------------------------------------------------------------
// Async queue
// -----------------------------------------------------------------------------
// Initialise the async queue: create `async_mutex`, start with an empty list and
// record that the mutex exists. Returns JZX_ERR_UNKNOWN if the mutex cannot be
// created, JZX_OK otherwise.
static jzx_err jzx_async_queue_init(jzx_loop* loop) {
    const int rc = pthread_mutex_init(&loop->async_mutex, NULL);
    if (rc != 0) {
        return JZX_ERR_UNKNOWN;
    }
    loop->async_mutex_initialized = 1;
    loop->async_tail = NULL;
    loop->async_head = NULL;
    return JZX_OK;
}
// Tear down the async queue. No-op if the mutex was never initialised.
//
// The pending list is detached under the mutex, the mutex is destroyed, and
// then every queued node is released. Only the queue nodes themselves are
// freed here; the `data` pointers they carry are not touched.
static void jzx_async_queue_destroy(jzx_loop* loop) {
    if (!loop->async_mutex_initialized) {
        return;
    }

    pthread_mutex_lock(&loop->async_mutex);
    jzx_async_msg* pending = loop->async_head;
    loop->async_head = NULL;
    loop->async_tail = NULL;
    pthread_mutex_unlock(&loop->async_mutex);

    pthread_mutex_destroy(&loop->async_mutex);
    loop->async_mutex_initialized = 0;

    for (jzx_async_msg* node = pending; node != NULL;) {
        jzx_async_msg* following = node->next;
        jzx_free(&loop->allocator, node);
        node = following;
    }
}
// Append a message to the loop's async queue and wake the loop.
//
// Returns JZX_ERR_INVALID_ARG when `loop` is NULL or the queue mutex is not
// initialised, JZX_ERR_NO_MEMORY when the queue node cannot be allocated, and
// JZX_OK once the node has been linked at the tail (under `async_mutex`) and
// jzx_wakeup_signal() has been called.
static jzx_err jzx_async_enqueue(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
                                 uint32_t tag, jzx_actor_id sender) {
    if (loop == NULL || !loop->async_mutex_initialized) {
        return JZX_ERR_INVALID_ARG;
    }

    jzx_async_msg* node = (jzx_async_msg*)jzx_alloc(&loop->allocator, sizeof(jzx_async_msg));
    if (node == NULL) {
        return JZX_ERR_NO_MEMORY;
    }
    node->next = NULL;
    node->target = target;
    node->sender = sender;
    node->tag = tag;
    node->data = data;
    node->len = len;

    pthread_mutex_lock(&loop->async_mutex);
    if (loop->async_head != NULL) {
        loop->async_tail->next = node;
    } else {
        loop->async_head = node;
    }
    loop->async_tail = node;
    pthread_mutex_unlock(&loop->async_mutex);

    jzx_wakeup_signal(loop);
    return JZX_OK;
}
// Take the whole pending async list in one step. Under `async_mutex` the
// head/tail are reset to empty and the former head is returned (NULL when the
// queue was empty or the mutex was never initialised).
static jzx_async_msg* jzx_async_detach(jzx_loop* loop) {
    jzx_async_msg* detached = NULL;
    if (loop->async_mutex_initialized) {
        pthread_mutex_lock(&loop->async_mutex);
        detached = loop->async_head;
        loop->async_head = NULL;
        loop->async_tail = NULL;
        pthread_mutex_unlock(&loop->async_mutex);
    }
    return detached;
}
// Deliver each node of a detached async list through jzx_send_internal() and
// free the node afterwards. Delivery errors are ignored.
static void jzx_async_dispatch(jzx_loop* loop, jzx_async_msg* head) {
    for (jzx_async_msg* node = head; node != NULL;) {
        // Read the link first: the node is freed before the next iteration.
        jzx_async_msg* following = node->next;
        (void)jzx_send_internal(loop, node->target, node->data, node->len, node->tag,
                                node->sender);
        jzx_free(&loop->allocator, node);
        node = following;
    }
}
// Detach everything currently queued on the async queue and deliver it.
static void jzx_async_drain(jzx_loop* loop) {
    jzx_async_msg* batch = jzx_async_detach(loop);
    if (batch == NULL) {
        return;
    }
    jzx_async_dispatch(loop, batch);
}
// Report (1/0) whether the async queue currently holds at least one node. The
// check is made under `async_mutex`; an uninitialised queue reports 0.
static int jzx_async_has_pending(jzx_loop* loop) {
    int pending = 0;
    if (loop->async_mutex_initialized) {
        pthread_mutex_lock(&loop->async_mutex);
        pending = (loop->async_head != NULL);
        pthread_mutex_unlock(&loop->async_mutex);
    }
    return pending;
}
// -----------------------------------------------------------------------------
// Timer system
// -----------------------------------------------------------------------------
// Insert `entry` into the timer list, which is kept in ascending `due_ms`
// order. An entry is placed after any existing entries with the same due time.
// The `_locked` suffix reflects that the visible caller holds `timer_mutex`
// while calling this.
static void jzx_timer_insert_locked(jzx_loop* loop, jzx_timer_entry* entry) {
    // Walk the chain of `next` links until reaching the first entry that is
    // due strictly later than the new one (or the end of the list).
    jzx_timer_entry** link = &loop->timer_head;
    while (*link != NULL && (*link)->due_ms <= entry->due_ms) {
        link = &(*link)->next;
    }
    entry->next = *link;
    *link = entry;
}
// Entry point of the timer thread (created with the loop as its argument).
//
// While holding `timer_mutex` it repeatedly looks at the head of the timer list:
//   - empty list: block on `timer_cond` until signalled;
//   - head not yet due: timed wait on `timer_cond` for the remaining time;
//   - head due: unlink it, release the mutex, forward the payload through
//     jzx_async_enqueue() with sender 0, free the timer entry, re-take the mutex.
// The loop ends once `loop->timer_stop` is set. Always returns NULL.
static void* jzx_timer_thread_main(void* arg) {
jzx_loop* loop = (jzx_loop*)arg;
pthread_mutex_lock(&loop->timer_mutex);
while (!loop->timer_stop) {
uint64_t now = jzx_now_ms();
jzx_timer_entry* head = loop->timer_head;
if (!head) {
// Nothing scheduled: sleep until a timer is added or shutdown is requested.
pthread_cond_wait(&loop->timer_cond, &loop->timer_mutex);
continue;
}
if (head->due_ms > now) {
uint64_t wait_ms = head->due_ms - now;
#if defined(__APPLE__)
// Apple: wait using a relative timeout.
struct timespec rel;
rel.tv_sec = (time_t)(wait_ms / 1000ull);
rel.tv_nsec = (long)((wait_ms % 1000ull) * 1000000ull);
(void)pthread_cond_timedwait_relative_np(&loop->timer_cond, &loop->timer_mutex, &rel);
#else
// Elsewhere: build an absolute deadline = current clock reading + wait_ms.
struct timespec ts;
#if defined(__linux__)
// Use the same clock the condition variable was configured with at init.
clockid_t clock_id = loop->timer_cond_monotonic ? CLOCK_MONOTONIC : CLOCK_REALTIME;
clock_gettime(clock_id, &ts);
#else
clock_gettime(CLOCK_REALTIME, &ts);
#endif
ts.tv_sec += (time_t)(wait_ms / 1000ull);
ts.tv_nsec += (long)((wait_ms % 1000ull) * 1000000ull);
// Both addends are below one second, so a single carry normalises tv_nsec.
if (ts.tv_nsec >= 1000000000l) {
ts.tv_sec += 1;
ts.tv_nsec -= 1000000000l;
}
(void)pthread_cond_timedwait(&loop->timer_cond, &loop->timer_mutex, &ts);
#endif
// Whatever ended the wait, re-read the stop flag and the list head.
continue;
}
// Head is due: unlink it and do the hand-off without holding timer_mutex.
loop->timer_head = head->next;
pthread_mutex_unlock(&loop->timer_mutex);
// NOTE(review): the enqueue result is ignored; if it fails, head->data is not
// released here — confirm who owns the payload in that case.
jzx_async_enqueue(loop, head->target, head->data, head->len, head->tag, 0);
jzx_free(&loop->allocator, head);
pthread_mutex_lock(&loop->timer_mutex);
}
pthread_mutex_unlock(&loop->timer_mutex);
return NULL;
}
// Bring up the timer subsystem: mutex, condition variable, empty timer list and
// the timer thread.
//
// On Linux the condition variable is created with a condattr; if switching that
// attr to CLOCK_MONOTONIC succeeds, `timer_cond_monotonic` is set to 1.
// Returns JZX_ERR_UNKNOWN if the mutex, condition variable or thread cannot be
// created (anything created so far is destroyed again), JZX_OK otherwise.
static jzx_err jzx_timer_system_init(jzx_loop* loop) {
    if (pthread_mutex_init(&loop->timer_mutex, NULL) != 0) {
        return JZX_ERR_UNKNOWN;
    }
    loop->timer_cond_monotonic = 0;

    int cond_status;
#if defined(__linux__)
    pthread_condattr_t cond_attr;
    int have_attr = 0;
    if (pthread_condattr_init(&cond_attr) == 0) {
        have_attr = 1;
        if (pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC) == 0) {
            loop->timer_cond_monotonic = 1;
        }
    }
    cond_status = pthread_cond_init(&loop->timer_cond, have_attr ? &cond_attr : NULL);
    if (have_attr) {
        pthread_condattr_destroy(&cond_attr);
    }
#else
    cond_status = pthread_cond_init(&loop->timer_cond, NULL);
#endif
    if (cond_status != 0) {
        pthread_mutex_destroy(&loop->timer_mutex);
        return JZX_ERR_UNKNOWN;
    }

    loop->timer_mutex_initialized = 1;
    loop->timer_thread_running = 0;
    loop->timer_stop = 0;
    loop->timer_head = NULL;
    loop->next_timer_id = 1;

    const int thread_status =
        pthread_create(&loop->timer_thread, NULL, jzx_timer_thread_main, loop);
    if (thread_status != 0) {
        pthread_cond_destroy(&loop->timer_cond);
        pthread_mutex_destroy(&loop->timer_mutex);
        loop->timer_mutex_initialized = 0;
        return JZX_ERR_UNKNOWN;
    }
    loop->timer_thread_running = 1;
    return JZX_OK;
}
// Stop the timer subsystem. No-op if it was never initialised.
//
// Sets `timer_stop` and wakes the timer thread, joins it if it was running,
// frees every timer entry still in the list, then destroys the condition
// variable and mutex. Only the timer entries are freed; the `data` pointers
// they carry are not touched here.
static void jzx_timer_system_shutdown(jzx_loop* loop) {
    if (!loop->timer_mutex_initialized) {
        return;
    }

    pthread_mutex_lock(&loop->timer_mutex);
    loop->timer_stop = 1;
    pthread_cond_broadcast(&loop->timer_cond);
    pthread_mutex_unlock(&loop->timer_mutex);

    if (loop->timer_thread_running) {
        pthread_join(loop->timer_thread, NULL);
        loop->timer_thread_running = 0;
    }

    pthread_mutex_lock(&loop->timer_mutex);
    jzx_timer_entry* leftover = loop->timer_head;
    loop->timer_head = NULL;
    pthread_mutex_unlock(&loop->timer_mutex);

    for (jzx_timer_entry* entry = leftover; entry != NULL;) {
        jzx_timer_entry* following = entry->next;
        jzx_free(&loop->allocator, entry);
        entry = following;
    }

    pthread_cond_destroy(&loop->timer_cond);
    pthread_mutex_destroy(&loop->timer_mutex);
    loop->timer_mutex_initialized = 0;
}
// Report (1/0) whether at least one timer is still scheduled. The list head is
// read under `timer_mutex`; an uninitialised timer system reports 0.
static int jzx_timer_has_pending(jzx_loop* loop) {
    int pending = 0;
    if (loop->timer_mutex_initialized) {
        pthread_mutex_lock(&loop->timer_mutex);
        pending = (loop->timer_head != NULL);
        pthread_mutex_unlock(&loop->timer_mutex);
    }
    return pending;
}
// -----------------------------------------------------------------------------
// I/O watchers
// -----------------------------------------------------------------------------
// Allocate and zero the I/O watcher array. A requested capacity of 0 is bumped
// to 1. Returns JZX_ERR_NO_MEMORY if the array cannot be allocated.
static jzx_err jzx_io_init(jzx_loop* loop, uint32_t capacity) {
    loop->io_count = 0;
    loop->io_capacity = (capacity != 0) ? capacity : 1;

    const size_t table_bytes = sizeof(jzx_io_watch) * loop->io_capacity;
    loop->io_watchers = (jzx_io_watch*)jzx_alloc(&loop->allocator, table_bytes);
    if (loop->io_watchers == NULL) {
        return JZX_ERR_NO_MEMORY;
    }
    memset(loop->io_watchers, 0, table_bytes);
    return JZX_OK;
}
// Release the I/O watcher array (if any) and reset the capacity and count to 0.
static void jzx_io_deinit(jzx_loop* loop) {
    jzx_io_watch* table = loop->io_watchers;
    if (table != NULL) {
        jzx_free(&loop->allocator, table);
        loop->io_watchers = NULL;
    }
    loop->io_count = 0;
    loop->io_capacity = 0;
}
// Replace the I/O watcher array with a zeroed one of `new_cap` entries,
// copying over the `io_count` entries currently in use.
//
// Returns:
//   JZX_ERR_INVALID_ARG  `new_cap` is 0 or smaller than the number of entries
//                        in use (copying them would overrun the new array;
//                        this also catches a doubled capacity that wrapped);
//   JZX_ERR_NO_MEMORY    the byte size would overflow size_t or the allocation
//                        failed (the existing array is left untouched);
//   JZX_OK               the array was replaced and `io_capacity` updated.
static jzx_err jzx_io_reserve(jzx_loop* loop, uint32_t new_cap) {
    if (new_cap == 0 || new_cap < loop->io_count) {
        return JZX_ERR_INVALID_ARG;
    }
    if ((size_t)new_cap > SIZE_MAX / sizeof(jzx_io_watch)) {
        return JZX_ERR_NO_MEMORY;
    }

    const size_t new_bytes = sizeof(jzx_io_watch) * new_cap;
    jzx_io_watch* new_watchers = (jzx_io_watch*)jzx_alloc(&loop->allocator, new_bytes);
    if (!new_watchers) {
        return JZX_ERR_NO_MEMORY;
    }
    memset(new_watchers, 0, new_bytes);

    if (loop->io_watchers) {
        memcpy(new_watchers, loop->io_watchers, sizeof(jzx_io_watch) * loop->io_count);
        jzx_free(&loop->allocator, loop->io_watchers);
    }
    loop->io_watchers = new_watchers;
    loop->io_capacity = new_cap;
    return JZX_OK;
}
// Linear search of the in-use watcher entries for `fd`. Returns the matching
// entry (and stores its index in `*idx_out` when that pointer is non-NULL), or
// NULL if the fd is not being watched.
static jzx_io_watch* jzx_io_find(jzx_loop* loop, int fd, uint32_t* idx_out) {
    for (uint32_t slot = 0; slot < loop->io_count; ++slot) {
        jzx_io_watch* candidate = &loop->io_watchers[slot];
        if (candidate->fd != fd) {
            continue;
        }
        if (idx_out != NULL) {
            *idx_out = slot;
        }
        return candidate;
    }
    return NULL;
}
// Remove the watcher at `idx` (ignored when out of range). The fd is first
// unregistered from the xev backend if one exists; the slot is then filled
// with the last in-use entry, so entry order is not preserved.
static void jzx_io_remove_index(jzx_loop* loop, uint32_t idx) {
    if (idx >= loop->io_count) {
        return;
    }
    if (loop->xev != NULL) {
        jzx_xev_unwatch_fd(loop->xev, loop->io_watchers[idx].fd);
    }

    const uint32_t tail = loop->io_count - 1;
    if (tail != idx) {
        loop->io_watchers[idx] = loop->io_watchers[tail];
    }
    loop->io_count = tail;
}
// Remove every watcher owned by `actor`.
static void jzx_io_remove_actor(jzx_loop* loop, jzx_actor_id actor) {
    uint32_t slot = 0;
    while (slot < loop->io_count) {
        if (loop->io_watchers[slot].owner != actor) {
            ++slot;
            continue;
        }
        // Removal moves the last entry into this slot, so re-check the same
        // index instead of advancing.
        jzx_io_remove_index(loop, slot);
    }
}
// Readiness notification for a watched fd: turns it into a JZX_TAG_SYS_IO
// message for the watcher's owner.
//
// Returns 0 when the arguments are unusable (NULL loop, loop not running,
// negative fd, zero readiness), when the fd has no watcher, or when the owner
// no longer exists (the stale watcher is removed in that case). Returns 1
// otherwise - including when the event could not be allocated or delivered.
// NOTE(review): the return value presumably tells the backend whether to keep
// watching the fd; confirm against the xev integration.
uint8_t jzx_io_xev_notify(jzx_loop* loop, int fd, uint32_t readiness) {
    if (loop == NULL || !loop->running) {
        return 0;
    }
    if (fd < 0 || readiness == 0) {
        return 0;
    }

    uint32_t slot = 0;
    jzx_io_watch* watch = jzx_io_find(loop, fd, &slot);
    if (watch == NULL) {
        return 0;
    }
    if (jzx_actor_table_lookup(&loop->actors, watch->owner) == NULL) {
        jzx_io_remove_index(loop, slot);
        return 0;
    }

    jzx_io_event* event = (jzx_io_event*)jzx_alloc(&loop->allocator, sizeof(jzx_io_event));
    if (event != NULL) {
        event->fd = fd;
        event->readiness = readiness;
        const jzx_err delivered = jzx_send_internal(loop, watch->owner, event,
                                                    sizeof(jzx_io_event), JZX_TAG_SYS_IO, 0);
        if (delivered != JZX_OK) {
            jzx_free(&loop->allocator, event);
        }
    }
    return 1;
}
// -----------------------------------------------------------------------------
// Config helpers
// -----------------------------------------------------------------------------
// Default allocator hook: forwards to malloc(). `ctx` is unused.
static void* default_alloc(void* ctx, size_t size) {
    (void)ctx;
    void* block = malloc(size);
    return block;
}
// Default deallocator hook: forwards to free(). `ctx` is unused.
static void default_free(void* ctx, void* ptr) {
    free(ptr);
    (void)ctx;
}
// Fill `cfg` with the library defaults: zero the whole struct, install the
// malloc/free-backed allocator, and set the default limits (1024 actors,
// 1024-slot mailboxes, 64 messages per actor turn, 1024 actors per tick,
// 1024 I/O watchers, 10 ms I/O poll timeout). A NULL `cfg` is ignored.
void jzx_config_init(jzx_config* cfg) {
    if (cfg == NULL) {
        return;
    }
    memset(cfg, 0, sizeof(*cfg));

    // Allocator: plain malloc/free with no context.
    cfg->allocator.ctx = NULL;
    cfg->allocator.alloc = default_alloc;
    cfg->allocator.free = default_free;

    // Capacity limits.
    cfg->max_actors = 1024;
    cfg->max_io_watchers = 1024;
    cfg->default_mailbox_cap = 1024;

    // Scheduling budget and polling.
    cfg->max_actors_per_tick = 1024;
    cfg->max_msgs_per_actor = 64;
    cfg->io_poll_timeout_ms = 10;
}
// Replace every unset (NULL / zero) field of `cfg` with its library default.
// Fields that already hold a non-zero value are left alone.
static void apply_defaults(jzx_config* cfg) {
    enum {
        DEFAULT_MAX_ACTORS = 1024,
        DEFAULT_MAILBOX_CAP = 1024,
        DEFAULT_MSGS_PER_ACTOR = 64,
        DEFAULT_ACTORS_PER_TICK = 1024,
        DEFAULT_IO_WATCHERS = 1024,
        DEFAULT_IO_POLL_TIMEOUT_MS = 10
    };

    if (cfg->allocator.alloc == NULL) {
        cfg->allocator.alloc = default_alloc;
    }
    if (cfg->allocator.free == NULL) {
        cfg->allocator.free = default_free;
    }

    if (!cfg->max_actors) {
        cfg->max_actors = DEFAULT_MAX_ACTORS;
    }
    if (!cfg->default_mailbox_cap) {
        cfg->default_mailbox_cap = DEFAULT_MAILBOX_CAP;
    }
    if (!cfg->max_msgs_per_actor) {
        cfg->max_msgs_per_actor = DEFAULT_MSGS_PER_ACTOR;
    }
    if (!cfg->max_actors_per_tick) {
        cfg->max_actors_per_tick = DEFAULT_ACTORS_PER_TICK;
    }
    if (!cfg->max_io_watchers) {
        cfg->max_io_watchers = DEFAULT_IO_WATCHERS;
    }
    if (!cfg->io_poll_timeout_ms) {
        cfg->io_poll_timeout_ms = DEFAULT_IO_POLL_TIMEOUT_MS;
    }
}
// -----------------------------------------------------------------------------
// Loop lifecycle
// -----------------------------------------------------------------------------
// Create a loop from `cfg` (or from the library defaults when `cfg` is NULL).
//
// Unset config fields are filled in by apply_defaults(). The loop struct is
// allocated with the configured allocator and zeroed, then the subsystems are
// brought up in order: xev backend, actor table, run queue, async queue, timer
// system, I/O watcher table. If any step fails, the partially built loop is
// handed to jzx_loop_destroy() and NULL is returned.
jzx_loop* jzx_loop_create(const jzx_config* cfg) {
    jzx_config effective;
    if (cfg != NULL) {
        effective = *cfg;
    } else {
        jzx_config_init(&effective);
    }
    apply_defaults(&effective);

    jzx_loop* loop = (jzx_loop*)jzx_alloc(&effective.allocator, sizeof(jzx_loop));
    if (loop == NULL) {
        return NULL;
    }
    memset(loop, 0, sizeof(*loop));
    loop->cfg = effective;
    loop->allocator = effective.allocator;

    loop->xev = jzx_xev_create();
    if (loop->xev == NULL) {
        goto fail;
    }
    if (jzx_actor_table_init(&loop->actors, effective.max_actors, &loop->allocator) != JZX_OK) {
        goto fail;
    }
    if (jzx_run_queue_init(&loop->run_queue, effective.max_actors, &loop->allocator) != JZX_OK) {
        goto fail;
    }
    if (jzx_async_queue_init(loop) != JZX_OK) {
        goto fail;
    }
    if (jzx_timer_system_init(loop) != JZX_OK) {
        goto fail;
    }
    if (jzx_io_init(loop, effective.max_io_watchers) != JZX_OK) {
        goto fail;
    }

    loop->running = 0;
    loop->stop_requested = 0;
    return loop;

fail:
    jzx_loop_destroy(loop);
    return NULL;
}
// Destroy a loop and release everything it owns. A NULL `loop` is ignored.
//
// Order: stop the timer system, tear down the async queue, destroy the xev
// backend, free the I/O watcher table, free every actor still in the actor
// table, free the table and run queue, and finally free the loop itself.
//
// For each remaining actor this releases its supervisor state (if any) before
// its mailbox and the actor struct; without that step a supervisor that is
// still alive at destroy time would leak its bookkeeping.
void jzx_loop_destroy(jzx_loop* loop) {
    if (!loop) {
        return;
    }
    jzx_timer_system_shutdown(loop);
    jzx_async_queue_destroy(loop);
    if (loop->xev) {
        jzx_xev_destroy(loop->xev);
        loop->xev = NULL;
    }
    jzx_io_deinit(loop);
    for (uint32_t i = 0; i < loop->actors.capacity; ++i) {
        jzx_actor* actor = loop->actors.slots ? loop->actors.slots[i] : NULL;
        if (!actor) {
            continue;
        }
        if (actor->supervisor_state) {
            jzx_supervisor_state_destroy(actor->supervisor_state, &loop->allocator);
            actor->supervisor_state = NULL;
        }
        jzx_mailbox_deinit(&actor->mailbox, &loop->allocator);
        jzx_free(&loop->allocator, actor);
        loop->actors.slots[i] = NULL;
    }
    jzx_actor_table_deinit(&loop->actors, &loop->allocator);
    jzx_run_queue_deinit(&loop->run_queue, &loop->allocator);
    jzx_free(&loop->allocator, loop);
}
// Run the scheduler until a stop is requested or the loop has nothing left to do.
//
// Returns JZX_ERR_INVALID_ARG for a NULL loop and JZX_ERR_LOOP_CLOSED if the
// loop is already marked running; otherwise returns JZX_OK (`rc` is never
// changed after initialisation). `running` and `stop_requested` are both
// cleared on the way out.
int jzx_loop_run(jzx_loop* loop) {
if (!loop) {
return JZX_ERR_INVALID_ARG;
}
if (loop->running) {
return JZX_ERR_LOOP_CLOSED;
}
loop->running = 1;
int rc = JZX_OK;
while (!loop->stop_requested) {
// Move messages queued on the async queue into actor mailboxes.
jzx_async_drain(loop);
// NOTE(review): the second argument presumably selects non-blocking (0) vs
// blocking (1) polling — confirm against jzx_xev_run.
jzx_xev_run(loop->xev, 0);
uint32_t actors_processed = 0;
// Run at most max_actors_per_tick actors from the run queue this tick.
while (actors_processed < loop->cfg.max_actors_per_tick) {
jzx_actor* actor = jzx_run_queue_pop(&loop->run_queue);
if (!actor) {
break;
}
actor->in_run_queue = 0;
// Actors already flagged as stopping/failed are torn down without running
// their behavior; this path does not count toward actors_processed.
if (actor->status == JZX_ACTOR_STOPPING || actor->status == JZX_ACTOR_FAILED) {
jzx_teardown_actor(loop, actor);
continue;
}
uint32_t processed_msgs = 0;
// Deliver at most max_msgs_per_actor messages to this actor per turn.
while (processed_msgs < loop->cfg.max_msgs_per_actor) {
jzx_message msg;
if (jzx_mailbox_pop(&actor->mailbox, &msg) != 0) {
break;
}
jzx_context ctx = {
.state = actor->state,
.self = actor->id,
.loop = loop,
};
jzx_behavior_result result = actor->behavior(&ctx, &msg);
processed_msgs++;
// A STOP or FAIL result ends the turn and marks the actor for teardown.
if (result == JZX_BEHAVIOR_STOP) {
actor->status = JZX_ACTOR_STOPPING;
break;
} else if (result == JZX_BEHAVIOR_FAIL) {
actor->status = JZX_ACTOR_FAILED;
break;
}
}
// Tear down if the actor ended up stopping/failed; otherwise requeue it when
// its mailbox still has messages.
if (actor->status == JZX_ACTOR_STOPPING || actor->status == JZX_ACTOR_FAILED) {
jzx_teardown_actor(loop, actor);
} else if (jzx_mailbox_has_items(&actor->mailbox)) {
jzx_schedule_actor(loop, actor);
}
actors_processed++;
}
if (loop->run_queue.count == 0) {
// Fully idle: no actors, no async messages, no timers and no I/O watchers.
if (loop->actors.used == 0 && !jzx_async_has_pending(loop) &&
!jzx_timer_has_pending(loop) && loop->io_count == 0) {
// NOTE(review): the purpose of these 64 extra backend passes is not visible
// here — presumably they flush pending backend work before exit; confirm.
for (uint32_t i = 0; i < 64; ++i) {
jzx_xev_run(loop->xev, 0);
}
break;
}
// Async messages arrived meanwhile: loop again instead of polling.
if (jzx_async_has_pending(loop)) {
continue;
}
jzx_xev_run(loop->xev, 1);
}
}
loop->running = 0;
loop->stop_requested = 0;
return rc;
}
// Ask the loop to stop: set `stop_requested`, signal the loop wakeup, and - if
// the timer system is initialised - broadcast on `timer_cond` under
// `timer_mutex`. A NULL `loop` is ignored.
void jzx_loop_request_stop(jzx_loop* loop) {
    if (loop == NULL) {
        return;
    }
    loop->stop_requested = 1;
    jzx_wakeup_signal(loop);

    if (!loop->timer_mutex_initialized) {
        return;
    }
    pthread_mutex_lock(&loop->timer_mutex);
    pthread_cond_broadcast(&loop->timer_cond);
    pthread_mutex_unlock(&loop->timer_mutex);
}
// Release `ptr` through the loop's allocator. Does nothing when either
// argument is NULL.
void jzx_loop_free(jzx_loop* loop, void* ptr) {
    if (loop != NULL && ptr != NULL) {
        jzx_free(&loop->allocator, ptr);
    }
}
// Install or clear the loop's observer. With a non-NULL `obs` the struct is
// copied into the loop and `ctx` stored alongside it; with a NULL `obs` the
// stored observer is zeroed and the context reset to NULL. A NULL `loop` is
// ignored.
void jzx_loop_set_observer(jzx_loop* loop, const jzx_observer* obs, void* ctx) {
    if (loop == NULL) {
        return;
    }
    if (obs == NULL) {
        memset(&loop->observer, 0, sizeof(loop->observer));
        loop->observer_ctx = NULL;
        return;
    }
    loop->observer = *obs;
    loop->observer_ctx = ctx;
}
// -----------------------------------------------------------------------------
// Actor APIs
// -----------------------------------------------------------------------------
// Allocate and initialise an actor from `opts`: zeroed struct, status RUNNING,
// behavior/state/supervisor copied from the options, and a mailbox sized by
// `opts->mailbox_cap` (or the loop's default capacity when that is 0).
// Returns NULL if the actor or its mailbox cannot be allocated.
static jzx_actor* jzx_actor_create(jzx_loop* loop, const jzx_spawn_opts* opts) {
    jzx_actor* fresh = (jzx_actor*)jzx_alloc(&loop->allocator, sizeof(jzx_actor));
    if (fresh == NULL) {
        return NULL;
    }
    memset(fresh, 0, sizeof(*fresh));

    fresh->status = JZX_ACTOR_RUNNING;
    fresh->supervisor = opts->supervisor;
    fresh->behavior = opts->behavior;
    fresh->state = opts->state;

    if (jzx_mailbox_init(&fresh->mailbox,
                         opts->mailbox_cap ? opts->mailbox_cap : loop->cfg.default_mailbox_cap,
                         &loop->allocator) == JZX_OK) {
        return fresh;
    }
    jzx_free(&loop->allocator, fresh);
    return NULL;
}
// Create an actor from `opts` and register it in the loop's actor table.
//
// Returns JZX_ERR_INVALID_ARG if `loop`, `opts` or `opts->behavior` is NULL,
// JZX_ERR_NO_MEMORY if the actor cannot be created, or the error from
// jzx_actor_table_insert() (the actor is released again in that case). On
// success the observer is told about the new actor and JZX_OK is returned;
// `out_id` is passed through to jzx_actor_table_insert().
jzx_err jzx_spawn(jzx_loop* loop, const jzx_spawn_opts* opts, jzx_actor_id* out_id) {
    if (loop == NULL || opts == NULL || opts->behavior == NULL) {
        return JZX_ERR_INVALID_ARG;
    }

    jzx_actor* fresh = jzx_actor_create(loop, opts);
    if (fresh == NULL) {
        return JZX_ERR_NO_MEMORY;
    }

    const jzx_err inserted = jzx_actor_table_insert(&loop->actors, fresh, &loop->allocator, out_id);
    if (inserted == JZX_OK) {
        jzx_obs_actor_start(loop, fresh->id, opts->name);
        return JZX_OK;
    }

    jzx_mailbox_deinit(&fresh->mailbox, &loop->allocator);
    jzx_free(&loop->allocator, fresh);
    return inserted;
}
// Push a message into `target`'s mailbox and schedule the actor.
//
// Returns JZX_ERR_INVALID_ARG for a NULL loop, JZX_ERR_NO_SUCH_ACTOR when the
// target is not in the actor table, and JZX_ERR_MAILBOX_FULL (after notifying
// the observer) when the mailbox rejects the message.
static jzx_err jzx_send_internal(jzx_loop* loop, jzx_actor_id target, void* data, size_t len,
                                 uint32_t tag, jzx_actor_id sender) {
    if (loop == NULL) {
        return JZX_ERR_INVALID_ARG;
    }

    jzx_actor* recipient = jzx_actor_table_lookup(&loop->actors, target);
    if (recipient == NULL) {
        return JZX_ERR_NO_SUCH_ACTOR;
    }

    jzx_message envelope = {0};
    envelope.data = data;
    envelope.len = len;
    envelope.tag = tag;
    envelope.sender = sender;

    if (jzx_mailbox_push(&recipient->mailbox, &envelope) == 0) {
        jzx_schedule_actor(loop, recipient);
        return JZX_OK;
    }
    jzx_obs_mailbox_full(loop, target);
    return JZX_ERR_MAILBOX_FULL;
}
// Send a message directly via jzx_send_internal(), with sender id 0.
jzx_err jzx_send(jzx_loop* loop, jzx_actor_id target, void* data, size_t len, uint32_t tag) {
    const jzx_actor_id no_sender = 0;
    return jzx_send_internal(loop, target, data, len, tag, no_sender);
}
// Queue a message on the loop's async queue via jzx_async_enqueue(), with
// sender id 0; the loop delivers it later when it drains that queue.
jzx_err jzx_send_async(jzx_loop* loop, jzx_actor_id target, void* data, size_t len, uint32_t tag) {
    const jzx_actor_id no_sender = 0;
    return jzx_async_enqueue(loop, target, data, len, tag, no_sender);
}
// Mark actor `id` as JZX_ACTOR_STOPPING and schedule it so the loop picks it
// up. Returns JZX_ERR_INVALID_ARG for a NULL loop and JZX_ERR_NO_SUCH_ACTOR
// when the id is not in the actor table.
jzx_err jzx_actor_stop(jzx_loop* loop, jzx_actor_id id) {
    if (loop == NULL) {
        return JZX_ERR_INVALID_ARG;
    }
    jzx_actor* victim = jzx_actor_table_lookup(&loop->actors, id);
    if (victim == NULL) {
        return JZX_ERR_NO_SUCH_ACTOR;
    }
    victim->status = JZX_ACTOR_STOPPING;
    jzx_schedule_actor(loop, victim);
    return JZX_OK;
}
// Mark actor `id` as JZX_ACTOR_FAILED and schedule it so the loop picks it up.
// Returns JZX_ERR_INVALID_ARG for a NULL loop and JZX_ERR_NO_SUCH_ACTOR when
// the id is not in the actor table.
jzx_err jzx_actor_fail(jzx_loop* loop, jzx_actor_id id) {
    if (loop == NULL) {
        return JZX_ERR_INVALID_ARG;
    }
    jzx_actor* victim = jzx_actor_table_lookup(&loop->actors, id);
    if (victim == NULL) {
        return JZX_ERR_NO_SUCH_ACTOR;
    }
    victim->status = JZX_ACTOR_FAILED;
    jzx_schedule_actor(loop, victim);
    return JZX_OK;
}
// -----------------------------------------------------------------------------
// Supervisor spawn
// -----------------------------------------------------------------------------
// Spawn a supervisor actor (child of `parent`) and all of its configured
// children.
//
// Returns JZX_ERR_INVALID_ARG when `loop`, `init` or `init->children` is NULL
// or `init->child_count` is 0; JZX_ERR_NO_MEMORY when the supervisor state
// cannot be created; the jzx_spawn() error when the supervisor actor cannot be
// spawned; JZX_ERR_UNKNOWN when the freshly spawned actor cannot be looked up.
// If spawning any child fails, the supervisor is marked failed via
// jzx_actor_fail() and that child's error is returned; `*out_id` is only
// written on full success.
jzx_err jzx_spawn_supervisor(jzx_loop* loop, const jzx_supervisor_init* init, jzx_actor_id parent,
                             jzx_actor_id* out_id) {
    if (loop == NULL || init == NULL || init->children == NULL || init->child_count == 0) {
        return JZX_ERR_INVALID_ARG;
    }

    jzx_supervisor_state* sup_state = jzx_supervisor_state_create(init, &loop->allocator);
    if (sup_state == NULL) {
        return JZX_ERR_NO_MEMORY;
    }

    // Mailbox capacity 0 and a NULL name are left at their zero values.
    jzx_spawn_opts sup_opts = {0};
    sup_opts.behavior = jzx_supervisor_behavior;
    sup_opts.state = sup_state;
    sup_opts.supervisor = parent;

    jzx_actor_id new_id = 0;
    jzx_err status = jzx_spawn(loop, &sup_opts, &new_id);
    if (status != JZX_OK) {
        jzx_supervisor_state_destroy(sup_state, &loop->allocator);
        return status;
    }

    jzx_actor* sup_actor = jzx_actor_table_lookup(&loop->actors, new_id);
    if (sup_actor == NULL) {
        jzx_supervisor_state_destroy(sup_state, &loop->allocator);
        return JZX_ERR_UNKNOWN;
    }
    sup_actor->supervisor_state = sup_state;

    for (size_t i = 0; i < sup_state->child_count; ++i) {
        status = jzx_supervisor_spawn_child(loop, new_id, &sup_state->children[i]);
        if (status != JZX_OK) {
            (void)jzx_actor_fail(loop, new_id);
            return status;
        }
    }

    if (out_id != NULL) {
        *out_id = new_id;
    }
    return JZX_OK;
}
// Look up the current actor id stored for the supervisor's child at `index`.
//
// Returns JZX_ERR_INVALID_ARG when `loop` or `out_id` is NULL or `index` is out
// of range, and JZX_ERR_NO_SUCH_ACTOR when `supervisor` is not in the actor
// table or has no supervisor state. The stored id may be 0 (the supervisor
// code clears it when a child exits or is stopped).
jzx_err jzx_supervisor_child_id(jzx_loop* loop, jzx_actor_id supervisor, size_t index,
                                jzx_actor_id* out_id) {
    if (loop == NULL || out_id == NULL) {
        return JZX_ERR_INVALID_ARG;
    }

    jzx_actor* sup_actor = jzx_actor_table_lookup(&loop->actors, supervisor);
    jzx_supervisor_state* sup = (sup_actor != NULL) ? sup_actor->supervisor_state : NULL;
    if (sup == NULL) {
        return JZX_ERR_NO_SUCH_ACTOR;
    }
    if (index >= sup->child_count) {
        return JZX_ERR_INVALID_ARG;
    }

    *out_id = sup->children[index].id;
    return JZX_OK;
}
// -----------------------------------------------------------------------------
// Timers & IO
// -----------------------------------------------------------------------------
// Schedule a message to be delivered to `target` after `ms` milliseconds.
//
// A timer entry is inserted into the due-time-ordered timer list under
// `timer_mutex`, and the timer thread is woken so it can re-evaluate its
// deadline. When the timer fires, the timer thread forwards the message through
// the async queue.
//
// Returns:
//   JZX_ERR_INVALID_ARG    `loop` is NULL or the timer system is not initialised;
//   JZX_ERR_NO_SUCH_ACTOR  `target` is not in the actor table;
//   JZX_ERR_NO_MEMORY      the timer entry could not be allocated;
//   JZX_OK                 the timer was scheduled; its id is stored in
//                          `*out_timer` when that pointer is non-NULL.
jzx_err jzx_send_after(jzx_loop* loop, jzx_actor_id target, uint32_t ms, void* data, size_t len,
                       uint32_t tag, jzx_timer_id* out_timer) {
    // Same precondition as jzx_cancel_timer(): never touch an uninitialised mutex.
    if (!loop || !loop->timer_mutex_initialized) {
        return JZX_ERR_INVALID_ARG;
    }
    if (!jzx_actor_table_lookup(&loop->actors, target)) {
        return JZX_ERR_NO_SUCH_ACTOR;
    }
    jzx_timer_entry* entry = (jzx_timer_entry*)jzx_alloc(&loop->allocator, sizeof(jzx_timer_entry));
    if (!entry) {
        return JZX_ERR_NO_MEMORY;
    }
    entry->target = target;
    entry->data = data;
    entry->len = len;
    entry->tag = tag;
    entry->next = NULL;

    pthread_mutex_lock(&loop->timer_mutex);
    entry->id = loop->next_timer_id++;
    // Copy the id out while the lock is held: once the mutex is released the
    // timer thread may fire the timer and free `entry` at any moment, so the
    // entry must not be dereferenced after the unlock.
    const jzx_timer_id timer_id = entry->id;
    entry->due_ms = jzx_now_ms() + (uint64_t)ms;
    jzx_timer_insert_locked(loop, entry);
    pthread_cond_broadcast(&loop->timer_cond);
    pthread_mutex_unlock(&loop->timer_mutex);

    if (out_timer) {
        *out_timer = timer_id;
    }
    return JZX_OK;
}
// Cancel a pending timer by id.
//
// Under `timer_mutex` the entry with a matching id is unlinked from the timer
// list; it is freed after the mutex is released. Only the timer entry is
// freed; the `data` pointer it carries is not touched here.
// Returns JZX_ERR_INVALID_ARG when `loop` is NULL or the timer system is not
// initialised, and JZX_ERR_TIMER_INVALID when no pending timer has that id.
jzx_err jzx_cancel_timer(jzx_loop* loop, jzx_timer_id timer) {
    if (loop == NULL || !loop->timer_mutex_initialized) {
        return JZX_ERR_INVALID_ARG;
    }

    pthread_mutex_lock(&loop->timer_mutex);
    // `link` always points at the pointer that refers to the current entry,
    // so unlinking needs no special case for the list head.
    jzx_timer_entry** link = &loop->timer_head;
    while (*link != NULL && (*link)->id != timer) {
        link = &(*link)->next;
    }
    jzx_timer_entry* cancelled = *link;
    if (cancelled != NULL) {
        *link = cancelled->next;
    }
    pthread_mutex_unlock(&loop->timer_mutex);

    if (cancelled == NULL) {
        return JZX_ERR_TIMER_INVALID;
    }
    jzx_free(&loop->allocator, cancelled);
    return JZX_OK;
}
// Start (or update) watching `fd` on behalf of actor `owner`.
//
// Returns JZX_ERR_INVALID_ARG when `loop` or its xev backend is NULL, `fd` is
// negative or `interest` is 0, and JZX_ERR_NO_SUCH_ACTOR when `owner` is not
// in the actor table.
// If the fd is already watched, the backend registration is refreshed and - on
// success only - the entry's owner and interest are replaced. Otherwise a new
// entry is appended (doubling the table when it is full) and registered with
// the backend; if registration fails the new entry is rolled back and the
// backend's error is returned.
jzx_err jzx_watch_fd(jzx_loop* loop, int fd, jzx_actor_id owner, uint32_t interest) {
    if (loop == NULL || loop->xev == NULL || fd < 0 || interest == 0) {
        return JZX_ERR_INVALID_ARG;
    }
    if (jzx_actor_table_lookup(&loop->actors, owner) == NULL) {
        return JZX_ERR_NO_SUCH_ACTOR;
    }

    jzx_io_watch* current = jzx_io_find(loop, fd, NULL);
    if (current != NULL) {
        const jzx_err rearmed = jzx_xev_watch_fd(loop->xev, loop, fd, interest);
        if (rearmed == JZX_OK) {
            current->owner = owner;
            current->interest = interest;
        }
        return rearmed;
    }

    if (loop->io_count == loop->io_capacity) {
        const jzx_err grown = jzx_io_reserve(loop, loop->io_capacity * 2);
        if (grown != JZX_OK) {
            return grown;
        }
    }

    const uint32_t slot = loop->io_count;
    loop->io_watchers[slot] = (jzx_io_watch){
        .fd = fd,
        .owner = owner,
        .interest = interest,
        .active = 1,
    };
    loop->io_count = slot + 1;

    const jzx_err armed = jzx_xev_watch_fd(loop->xev, loop, fd, interest);
    if (armed != JZX_OK) {
        // Roll the append back so the table never holds an unregistered fd.
        loop->io_count = slot;
        memset(&loop->io_watchers[slot], 0, sizeof(loop->io_watchers[slot]));
    }
    return armed;
}
// Stop watching `fd`. Returns JZX_ERR_INVALID_ARG for a NULL loop or negative
// fd, and JZX_ERR_IO_NOT_WATCHED when the fd has no watcher entry.
jzx_err jzx_unwatch_fd(jzx_loop* loop, int fd) {
    if (loop == NULL || fd < 0) {
        return JZX_ERR_INVALID_ARG;
    }

    uint32_t slot = 0;
    if (jzx_io_find(loop, fd, &slot) == NULL) {
        return JZX_ERR_IO_NOT_WATCHED;
    }
    jzx_io_remove_index(loop, slot);
    return JZX_OK;
}