Zig example — examples/zig/echo_server.zig
This is a libxev-backed TCP echo server built on top of libjzx’s I/O watcher mechanism.
It demonstrates:
- watching a listening socket for readability
- accepting clients and spawning one actor per connection
- watching each client socket for read/write readiness
- a simple “read → write back” state machine with backpressure
- correct memory ownership when receiving runtime-allocated I/O events
Cross-links
- Run it: Quickstart
- I/O APIs: watch/unwatch (
include/jzx/jzx.h) - Backend bridge: libxev integration (
src/jzx_xev.zig) - Under the hood: Runtime core (
src/jzx_runtime.c)
High-level architecture
There are two actor types:
- Listener actor
- owns the listening socket
- watches it for
READreadiness - accepts as many connections as possible per readiness notification
- spawns a connection actor for each accepted socket
- Connection actor
- owns a client socket and a buffer
- watches for
READwhen it wants to receive data - watches for
WRITEwhen it has pending data to flush
The runtime delivers I/O readiness as normal messages:
msg.tag == JZX_TAG_SYS_IOmsg.datapoints to ajzx_io_eventallocated by the loop
That means the actor must:
- cast
msg.datato*c.jzx_io_event - free it using
c.jzx_loop_free(loop, msg.data)
Imports and state structs
const std = @import("std");
const jzx = @import("jzx");
const c = jzx.c;
const posix = std.posix;
const ListenerState = struct {
fd: posix.socket_t,
};
const ConnState = struct {
fd: posix.socket_t,
pending_len: usize = 0,
pending_off: usize = 0,
buf: [4096]u8 = undefined,
};
What’s here:
posix = std.posixgives access to low-level syscalls (socket, accept, read, write, fcntl).ListenerStateis the listener actor’s state:- a single field
fdfor the listening socket.
- a single field
ConnStateis per-connection state:fd: client socketbuf: fixed buffer for reads and writespending_len/pending_off: “how much data remains to be written”
The “pending” fields implement a simple, explicit backpressure model:
- If we couldn’t write everything immediately, we remember how much remains and wait for a
WRITEreadiness event.
Helper: set sockets non-blocking
Accepted sockets need to be non-blocking, because readiness-driven loops must treat WouldBlock as “try again later”.
fn setNonBlocking(fd: posix.fd_t) !void {
const raw = try posix.fcntl(fd, posix.F.GETFL, 0);
var flags: posix.O = @bitCast(@as(u32, @intCast(raw)));
flags.NONBLOCK = true;
_ = try posix.fcntl(fd, posix.F.SETFL, @as(usize, @intCast(@as(u32, @bitCast(flags)))));
}
What this does:
fcntl(GETFL)reads current file status flags.- It sets the
NONBLOCKbit. fcntl(SETFL)writes the updated flags back.
The conversions (@intCast, @bitCast) are Zig’s way of making the integer/bitfield transitions explicit.
Helper: set close-on-exec
This prevents leaking sockets into child processes if the program ever execs something.
fn setCloexec(fd: posix.fd_t) !void {
const flags = try posix.fcntl(fd, posix.F.GETFD, 0);
_ = try posix.fcntl(fd, posix.F.SETFD, flags | posix.FD_CLOEXEC);
}
Connection actor behavior
The connection actor handles two kinds of readiness:
READ: data available from clientWRITE: socket can accept more bytes (finish flushing pending output)
Decode runtime I/O messages
fn connBehavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const msg_ptr = @as(*const c.jzx_message, @ptrCast(msg));
if (msg_ptr.tag != c.JZX_TAG_SYS_IO or msg_ptr.data == null) {
return c.JZX_BEHAVIOR_OK;
}
const state = @as(*ConnState, @ptrCast(@alignCast(ctx_ptr.state.?)));
const ev = @as(*c.jzx_io_event, @ptrCast(@alignCast(msg_ptr.data.?)));
defer c.jzx_loop_free(ctx_ptr.loop.?, msg_ptr.data.?);
Key details:
- This actor ignores all non-I/O messages:
msg_ptr.tag != JZX_TAG_SYS_IO→ ignoremsg_ptr.data == null→ ignore (defensive)
ctx_ptr.state.?is the per-actor state pointer set during spawn.- It is cast to
*ConnState.
- It is cast to
msg_ptr.data.?is cast to*c.jzx_io_event.defer c.jzx_loop_free(...)is crucial:- The runtime allocates the
jzx_io_eventusing the loop’s allocator. - The actor must return it to that same allocator.
- The runtime allocates the
WRITE readiness: flush pending output
If there’s pending output, a WRITE readiness notification is our chance to flush more bytes.
if ((ev.readiness & c.JZX_IO_WRITE) != 0 and state.pending_off < state.pending_len) {
const slice = state.buf[state.pending_off..state.pending_len];
const wrote = posix.write(state.fd, slice) catch |err| switch (err) {
error.WouldBlock => 0,
else => {
posix.close(state.fd);
std.heap.c_allocator.destroy(state);
return c.JZX_BEHAVIOR_STOP;
},
};
state.pending_off += wrote;
if (state.pending_off >= state.pending_len) {
state.pending_len = 0;
state.pending_off = 0;
_ = c.jzx_watch_fd(ctx_ptr.loop.?, state.fd, ctx_ptr.self, c.JZX_IO_READ);
}
}
What’s happening:
slice = buf[pending_off..pending_len]selects the unsent portion.posix.writeattempts to write it:WouldBlockmeans “try again later” → wrote 0 bytes.- Any other error is treated as fatal:
- close fd
- free state
- stop actor
- After writing enough bytes to finish the pending range:
- reset pending counters
- change interest back to
READ(we’re ready to read again)
Why flip the watch interest:
- It avoids continuous
WRITEnotifications when we don’t have anything to write.
READ readiness: read, then echo back
The echo path reads into buf, then immediately attempts to write the same bytes back.
if ((ev.readiness & c.JZX_IO_READ) != 0 and state.pending_len == 0) {
const n = posix.read(state.fd, state.buf[0..]) catch |err| switch (err) {
error.WouldBlock => return c.JZX_BEHAVIOR_OK,
else => {
posix.close(state.fd);
std.heap.c_allocator.destroy(state);
return c.JZX_BEHAVIOR_STOP;
},
};
if (n == 0) {
posix.close(state.fd);
std.heap.c_allocator.destroy(state);
return c.JZX_BEHAVIOR_STOP;
}
state.pending_len = n;
state.pending_off = 0;
const wrote = posix.write(state.fd, state.buf[0..n]) catch |err| switch (err) {
error.WouldBlock => 0,
else => {
posix.close(state.fd);
std.heap.c_allocator.destroy(state);
return c.JZX_BEHAVIOR_STOP;
},
};
state.pending_off = wrote;
if (state.pending_off < state.pending_len) {
_ = c.jzx_watch_fd(ctx_ptr.loop.?, state.fd, ctx_ptr.self, c.JZX_IO_WRITE);
} else {
state.pending_len = 0;
state.pending_off = 0;
_ = c.jzx_watch_fd(ctx_ptr.loop.?, state.fd, ctx_ptr.self, c.JZX_IO_READ);
}
}
Important details:
- The read path only runs if
pending_len == 0.- Why: if we already have unsent output, we don’t want to read more and grow buffering.
- This is a simple “one buffer in flight” rule.
n == 0means EOF (client closed connection).- This actor closes fd, frees state, and stops.
- It tries to write immediately after reading:
- If the socket accepts all bytes, we go back to
READ. - If the socket accepts only some bytes (or wouldblock), we switch to
WRITEand wait for the next writable event.
- If the socket accepts all bytes, we go back to
Final cleanup invariants
if (state.pending_off >= state.pending_len) {
state.pending_len = 0;
state.pending_off = 0;
}
return c.JZX_BEHAVIOR_OK;
}
This ensures:
- if we “caught up” on writes, pending state resets to “no pending output”.
Listener actor behavior
The listener actor:
- receives
SYS_IOevents for the listening socket - loops
accept()until it would block (drain the accept queue) - spawns a connection actor for each accepted socket
- registers the new socket for
READevents owned by the connection actor
Decode runtime I/O messages
fn listenerBehavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const msg_ptr = @as(*const c.jzx_message, @ptrCast(msg));
if (msg_ptr.tag != c.JZX_TAG_SYS_IO or msg_ptr.data == null) {
return c.JZX_BEHAVIOR_OK;
}
const state = @as(*ListenerState, @ptrCast(@alignCast(ctx_ptr.state.?)));
const ev = @as(*c.jzx_io_event, @ptrCast(@alignCast(msg_ptr.data.?)));
defer c.jzx_loop_free(ctx_ptr.loop.?, msg_ptr.data.?);
if ((ev.readiness & c.JZX_IO_READ) == 0) {
return c.JZX_BEHAVIOR_OK;
}
Same ownership rule as the connection actor:
msg.datais runtime-allocatedjzx_io_event→ must be freed viajzx_loop_free.
Accept loop + spawn connection actor
while (true) {
const client_fd = posix.accept(state.fd, null, null, 0) catch |err| switch (err) {
error.WouldBlock => break,
else => {
std.debug.print("[listener] accept error: {s}\n", .{@errorName(err)});
break;
},
};
setNonBlocking(client_fd) catch {};
setCloexec(client_fd) catch {};
const conn_state = std.heap.c_allocator.create(ConnState) catch {
posix.close(client_fd);
continue;
};
conn_state.* = .{ .fd = client_fd };
var opts = c.jzx_spawn_opts{
.behavior = connBehavior,
.state = conn_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
const rc = c.jzx_spawn(ctx_ptr.loop.?, &opts, &actor_id);
if (rc != c.JZX_OK) {
std.heap.c_allocator.destroy(conn_state);
posix.close(client_fd);
continue;
}
const watch_rc = c.jzx_watch_fd(ctx_ptr.loop.?, client_fd, actor_id, c.JZX_IO_READ);
if (watch_rc != c.JZX_OK) {
posix.close(client_fd);
std.heap.c_allocator.destroy(conn_state);
_ = c.jzx_actor_stop(ctx_ptr.loop.?, actor_id);
continue;
}
std.debug.print("[listener] accepted fd={d} actor={d}\n", .{ client_fd, actor_id });
}
Key details:
- The accept loop drains the kernel accept queue:
- it keeps accepting until
WouldBlock. - This avoids “one accept per readiness event” under load.
- it keeps accepting until
ConnStateis allocated withstd.heap.c_allocator.- Unlike
jzx_io_event, this is user-owned memory; the connection actor frees it on stop.
- Unlike
- After spawning the connection actor, the listener registers the client fd with the runtime:
owner = actor_id→ I/O events for that fd are delivered to the connection actor.
Keep the listener alive
return c.JZX_BEHAVIOR_OK;
}
Why this matters:
- The listener should keep accepting new connections until the process exits or the loop is stopped.
JZX_BEHAVIOR_OKis the “stay alive” result; it tells the runtime to keep the actor runnable for future messages.
main(): bind, listen, spawn listener, watch fd
The main function:
- parses an optional port argument
- creates a listening socket
- spawns the listener actor
- registers the listening socket for
READreadiness
pub fn main() !void {
const argv = try std.process.argsAlloc(std.heap.c_allocator);
defer std.process.argsFree(std.heap.c_allocator, argv);
const port: u16 = if (argv.len >= 2) blk: {
break :blk std.fmt.parseUnsigned(u16, argv[1], 10) catch 5555;
} else 5555;
var loop = try jzx.Loop.create(null);
defer loop.deinit();
const addr = try std.net.Address.parseIp4("0.0.0.0", port);
const listen_fd = try posix.socket(addr.any.family, posix.SOCK.STREAM | posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC, 0);
defer posix.close(listen_fd);
var yes: c_int = 1;
try posix.setsockopt(listen_fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, std.mem.asBytes(&yes));
try posix.bind(listen_fd, &addr.any, addr.getOsSockLen());
try posix.listen(listen_fd, 128);
var listener_state = ListenerState{ .fd = listen_fd };
var listener_opts = c.jzx_spawn_opts{
.behavior = listenerBehavior,
.state = &listener_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = "listener",
};
var listener_id: c.jzx_actor_id = 0;
if (c.jzx_spawn(loop.ptr, &listener_opts, &listener_id) != c.JZX_OK) {
std.debug.print("failed to spawn listener actor\n", .{});
return;
}
try loop.watchFd(@intCast(listen_fd), listener_id, c.JZX_IO_READ);
std.debug.print("echo server listening on 0.0.0.0:{d}\n", .{port});
std.debug.print("connect with: nc 127.0.0.1 {d}\n", .{port});
try loop.run();
}
Key details:
- The listening socket is created with
NONBLOCKandCLOEXECalready. ListenerStatelives on the stack:- safe because
loop.run()doesn’t return until the program exits or loop stops.
- safe because
loop.watchFd(listen_fd, listener_id, READ)registers the listen fd with the runtime:- The runtime will deliver
SYS_IOmessages tolistener_idwhen clients connect.
- The runtime will deliver
Full listing (for reference)
const std = @import("std");
const jzx = @import("jzx");
const c = jzx.c;
const posix = std.posix;
const ListenerState = struct {
fd: posix.socket_t,
};
const ConnState = struct {
fd: posix.socket_t,
pending_len: usize = 0,
pending_off: usize = 0,
buf: [4096]u8 = undefined,
};
fn setNonBlocking(fd: posix.fd_t) !void {
const raw = try posix.fcntl(fd, posix.F.GETFL, 0);
var flags: posix.O = @bitCast(@as(u32, @intCast(raw)));
flags.NONBLOCK = true;
_ = try posix.fcntl(fd, posix.F.SETFL, @as(usize, @intCast(@as(u32, @bitCast(flags)))));
}
fn setCloexec(fd: posix.fd_t) !void {
const flags = try posix.fcntl(fd, posix.F.GETFD, 0);
_ = try posix.fcntl(fd, posix.F.SETFD, flags | posix.FD_CLOEXEC);
}
fn connBehavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const msg_ptr = @as(*const c.jzx_message, @ptrCast(msg));
if (msg_ptr.tag != c.JZX_TAG_SYS_IO or msg_ptr.data == null) {
return c.JZX_BEHAVIOR_OK;
}
const state = @as(*ConnState, @ptrCast(@alignCast(ctx_ptr.state.?)));
const ev = @as(*c.jzx_io_event, @ptrCast(@alignCast(msg_ptr.data.?)));
defer c.jzx_loop_free(ctx_ptr.loop.?, msg_ptr.data.?);
if ((ev.readiness & c.JZX_IO_WRITE) != 0 and state.pending_off < state.pending_len) {
const slice = state.buf[state.pending_off..state.pending_len];
const wrote = posix.write(state.fd, slice) catch |err| switch (err) {
error.WouldBlock => 0,
else => {
posix.close(state.fd);
std.heap.c_allocator.destroy(state);
return c.JZX_BEHAVIOR_STOP;
},
};
state.pending_off += wrote;
if (state.pending_off >= state.pending_len) {
state.pending_len = 0;
state.pending_off = 0;
_ = c.jzx_watch_fd(ctx_ptr.loop.?, state.fd, ctx_ptr.self, c.JZX_IO_READ);
}
}
if ((ev.readiness & c.JZX_IO_READ) != 0 and state.pending_len == 0) {
const n = posix.read(state.fd, state.buf[0..]) catch |err| switch (err) {
error.WouldBlock => return c.JZX_BEHAVIOR_OK,
else => {
posix.close(state.fd);
std.heap.c_allocator.destroy(state);
return c.JZX_BEHAVIOR_STOP;
},
};
if (n == 0) {
posix.close(state.fd);
std.heap.c_allocator.destroy(state);
return c.JZX_BEHAVIOR_STOP;
}
state.pending_len = n;
state.pending_off = 0;
const wrote = posix.write(state.fd, state.buf[0..n]) catch |err| switch (err) {
error.WouldBlock => 0,
else => {
posix.close(state.fd);
std.heap.c_allocator.destroy(state);
return c.JZX_BEHAVIOR_STOP;
},
};
state.pending_off = wrote;
if (state.pending_off < state.pending_len) {
_ = c.jzx_watch_fd(ctx_ptr.loop.?, state.fd, ctx_ptr.self, c.JZX_IO_WRITE);
} else {
state.pending_len = 0;
state.pending_off = 0;
_ = c.jzx_watch_fd(ctx_ptr.loop.?, state.fd, ctx_ptr.self, c.JZX_IO_READ);
}
}
if (state.pending_off >= state.pending_len) {
state.pending_len = 0;
state.pending_off = 0;
}
return c.JZX_BEHAVIOR_OK;
}
fn listenerBehavior(ctx: [*c]c.jzx_context, msg: [*c]const c.jzx_message) callconv(.c) c.jzx_behavior_result {
const ctx_ptr = @as(*c.jzx_context, @ptrCast(ctx));
const msg_ptr = @as(*const c.jzx_message, @ptrCast(msg));
if (msg_ptr.tag != c.JZX_TAG_SYS_IO or msg_ptr.data == null) {
return c.JZX_BEHAVIOR_OK;
}
const state = @as(*ListenerState, @ptrCast(@alignCast(ctx_ptr.state.?)));
const ev = @as(*c.jzx_io_event, @ptrCast(@alignCast(msg_ptr.data.?)));
defer c.jzx_loop_free(ctx_ptr.loop.?, msg_ptr.data.?);
if ((ev.readiness & c.JZX_IO_READ) == 0) {
return c.JZX_BEHAVIOR_OK;
}
while (true) {
const client_fd = posix.accept(state.fd, null, null, 0) catch |err| switch (err) {
error.WouldBlock => break,
else => {
std.debug.print("[listener] accept error: {s}\n", .{@errorName(err)});
break;
},
};
setNonBlocking(client_fd) catch {};
setCloexec(client_fd) catch {};
const conn_state = std.heap.c_allocator.create(ConnState) catch {
posix.close(client_fd);
continue;
};
conn_state.* = .{ .fd = client_fd };
var opts = c.jzx_spawn_opts{
.behavior = connBehavior,
.state = conn_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = null,
};
var actor_id: c.jzx_actor_id = 0;
const rc = c.jzx_spawn(ctx_ptr.loop.?, &opts, &actor_id);
if (rc != c.JZX_OK) {
std.heap.c_allocator.destroy(conn_state);
posix.close(client_fd);
continue;
}
const watch_rc = c.jzx_watch_fd(ctx_ptr.loop.?, client_fd, actor_id, c.JZX_IO_READ);
if (watch_rc != c.JZX_OK) {
posix.close(client_fd);
std.heap.c_allocator.destroy(conn_state);
_ = c.jzx_actor_stop(ctx_ptr.loop.?, actor_id);
continue;
}
std.debug.print("[listener] accepted fd={d} actor={d}\n", .{ client_fd, actor_id });
}
return c.JZX_BEHAVIOR_OK;
}
pub fn main() !void {
const argv = try std.process.argsAlloc(std.heap.c_allocator);
defer std.process.argsFree(std.heap.c_allocator, argv);
const port: u16 = if (argv.len >= 2) blk: {
break :blk std.fmt.parseUnsigned(u16, argv[1], 10) catch 5555;
} else 5555;
var loop = try jzx.Loop.create(null);
defer loop.deinit();
const addr = try std.net.Address.parseIp4("0.0.0.0", port);
const listen_fd = try posix.socket(addr.any.family, posix.SOCK.STREAM | posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC, 0);
defer posix.close(listen_fd);
var yes: c_int = 1;
try posix.setsockopt(listen_fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, std.mem.asBytes(&yes));
try posix.bind(listen_fd, &addr.any, addr.getOsSockLen());
try posix.listen(listen_fd, 128);
var listener_state = ListenerState{ .fd = listen_fd };
var listener_opts = c.jzx_spawn_opts{
.behavior = listenerBehavior,
.state = &listener_state,
.supervisor = 0,
.mailbox_cap = 0,
.name = "listener",
};
var listener_id: c.jzx_actor_id = 0;
if (c.jzx_spawn(loop.ptr, &listener_opts, &listener_id) != c.JZX_OK) {
std.debug.print("failed to spawn listener actor\n", .{});
return;
}
try loop.watchFd(@intCast(listen_fd), listener_id, c.JZX_IO_READ);
std.debug.print("echo server listening on 0.0.0.0:{d}\n", .{port});
std.debug.print("connect with: nc 127.0.0.1 {d}\n", .{port});
try loop.run();
}