update to latest zig (0.12.0-dev.3480+9dac8db2d)

This commit is contained in:
Ali Chraghi
2024-03-28 14:45:40 +03:30
parent e93090cb89
commit ef8a4a75eb
8 changed files with 468 additions and 466 deletions

View File

@ -1,7 +1,7 @@
.{ .{
.name = "iofthetiger", .name = "iofthetiger",
.version = "0.1.0", .version = "0.1.0",
.minimum_zig_version = "0.12.0-dev.2334+aef1da163", .minimum_zig_version = "0.12.0-dev.3480+9dac8db2d",
.paths = .{ .paths = .{
"build.zig", "build.zig",
"build.zig.zon", "build.zig.zon",

View File

@ -1,5 +1,6 @@
const std = @import("std"); const std = @import("std");
const os = std.os; const os = std.os;
const posix = std.posix;
const mem = std.mem; const mem = std.mem;
const assert = std.debug.assert; const assert = std.debug.assert;
const log = std.log.scoped(.io); const log = std.log.scoped(.io);
@ -12,7 +13,7 @@ const sector_size = 4096;
const direct_io = true; const direct_io = true;
pub const IO = struct { pub const IO = struct {
kq: os.fd_t, kq: posix.fd_t,
time: Time = .{}, time: Time = .{},
io_inflight: usize = 0, io_inflight: usize = 0,
timeouts: FIFO(Completion) = .{ .name = "io_timeouts" }, timeouts: FIFO(Completion) = .{ .name = "io_timeouts" },
@ -23,14 +24,14 @@ pub const IO = struct {
_ = entries; _ = entries;
_ = flags; _ = flags;
const kq = try os.kqueue(); const kq = try posix.kqueue();
assert(kq > -1); assert(kq > -1);
return IO{ .kq = kq }; return IO{ .kq = kq };
} }
pub fn deinit(self: *IO) void { pub fn deinit(self: *IO) void {
assert(self.kq > -1); assert(self.kq > -1);
os.close(self.kq); posix.close(self.kq);
self.kq = -1; self.kq = -1;
} }
@ -76,7 +77,7 @@ pub const IO = struct {
fn flush(self: *IO, wait_for_completions: bool) !void { fn flush(self: *IO, wait_for_completions: bool) !void {
var io_pending = self.io_pending.peek(); var io_pending = self.io_pending.peek();
var events: [256]os.Kevent = undefined; var events: [256]posix.Kevent = undefined;
// Check timeouts and fill events with completions in io_pending // Check timeouts and fill events with completions in io_pending
// (they will be submitted through kevent). // (they will be submitted through kevent).
@ -87,7 +88,7 @@ pub const IO = struct {
// Only call kevent() if we need to submit io events or if we need to wait for completions. // Only call kevent() if we need to submit io events or if we need to wait for completions.
if (change_events > 0 or self.completed.empty()) { if (change_events > 0 or self.completed.empty()) {
// Zero timeouts for kevent() implies a non-blocking poll // Zero timeouts for kevent() implies a non-blocking poll
var ts = std.mem.zeroes(os.timespec); var ts = std.mem.zeroes(posix.timespec);
// We need to wait (not poll) on kevent if there's nothing to submit or complete. // We need to wait (not poll) on kevent if there's nothing to submit or complete.
// We should never wait indefinitely (timeout_ptr = null for kevent) given: // We should never wait indefinitely (timeout_ptr = null for kevent) given:
@ -103,7 +104,7 @@ pub const IO = struct {
} }
} }
const new_events = try os.kevent( const new_events = try posix.kevent(
self.kq, self.kq,
events[0..change_events], events[0..change_events],
events[0..events.len], events[0..events.len],
@ -133,25 +134,25 @@ pub const IO = struct {
} }
} }
fn flush_io(_: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize { fn flush_io(_: *IO, events: []posix.Kevent, io_pending_top: *?*Completion) usize {
for (events, 0..) |*event, flushed| { for (events, 0..) |*event, flushed| {
const completion = io_pending_top.* orelse return flushed; const completion = io_pending_top.* orelse return flushed;
io_pending_top.* = completion.next; io_pending_top.* = completion.next;
const event_info = switch (completion.operation) { const event_info = switch (completion.operation) {
.accept => |op| [2]c_int{ op.socket, os.system.EVFILT_READ }, .accept => |op| [2]c_int{ op.socket, posix.system.EVFILT_READ },
.connect => |op| [2]c_int{ op.socket, os.system.EVFILT_WRITE }, .connect => |op| [2]c_int{ op.socket, posix.system.EVFILT_WRITE },
.read => |op| [2]c_int{ op.fd, os.system.EVFILT_READ }, .read => |op| [2]c_int{ op.fd, posix.system.EVFILT_READ },
.write => |op| [2]c_int{ op.fd, os.system.EVFILT_WRITE }, .write => |op| [2]c_int{ op.fd, posix.system.EVFILT_WRITE },
.recv => |op| [2]c_int{ op.socket, os.system.EVFILT_READ }, .recv => |op| [2]c_int{ op.socket, posix.system.EVFILT_READ },
.send => |op| [2]c_int{ op.socket, os.system.EVFILT_WRITE }, .send => |op| [2]c_int{ op.socket, posix.system.EVFILT_WRITE },
else => @panic("invalid completion operation queued for io"), else => @panic("invalid completion operation queued for io"),
}; };
event.* = .{ event.* = .{
.ident = @as(u32, @intCast(event_info[0])), .ident = @as(u32, @intCast(event_info[0])),
.filter = @as(i16, @intCast(event_info[1])), .filter = @as(i16, @intCast(event_info[1])),
.flags = os.system.EV_ADD | os.system.EV_ENABLE | os.system.EV_ONESHOT, .flags = posix.system.EV_ADD | posix.system.EV_ENABLE | posix.system.EV_ONESHOT,
.fflags = 0, .fflags = 0,
.data = 0, .data = 0,
.udata = @intFromPtr(completion), .udata = @intFromPtr(completion),
@ -198,29 +199,29 @@ pub const IO = struct {
const Operation = union(enum) { const Operation = union(enum) {
accept: struct { accept: struct {
socket: os.socket_t, socket: posix.socket_t,
}, },
close: struct { close: struct {
fd: os.fd_t, fd: posix.fd_t,
}, },
connect: struct { connect: struct {
socket: os.socket_t, socket: posix.socket_t,
address: std.net.Address, address: std.net.Address,
initiated: bool, initiated: bool,
}, },
read: struct { read: struct {
fd: os.fd_t, fd: posix.fd_t,
buf: [*]u8, buf: [*]u8,
len: u32, len: u32,
offset: u64, offset: u64,
}, },
recv: struct { recv: struct {
socket: os.socket_t, socket: posix.socket_t,
buf: [*]u8, buf: [*]u8,
len: u32, len: u32,
}, },
send: struct { send: struct {
socket: os.socket_t, socket: posix.socket_t,
buf: [*]const u8, buf: [*]const u8,
len: u32, len: u32,
}, },
@ -228,7 +229,7 @@ pub const IO = struct {
expires: u64, expires: u64,
}, },
write: struct { write: struct {
fd: os.fd_t, fd: posix.fd_t,
buf: [*]const u8, buf: [*]const u8,
len: u32, len: u32,
offset: u64, offset: u64,
@ -288,7 +289,7 @@ pub const IO = struct {
} }
} }
pub const AcceptError = os.AcceptError || os.SetSockOptError; pub const AcceptError = posix.AcceptError || posix.SetSockOptError;
pub fn accept( pub fn accept(
self: *IO, self: *IO,
@ -297,10 +298,10 @@ pub const IO = struct {
comptime callback: fn ( comptime callback: fn (
context: Context, context: Context,
completion: *Completion, completion: *Completion,
result: AcceptError!os.socket_t, result: AcceptError!posix.socket_t,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
) void { ) void {
self.submit( self.submit(
context, context,
@ -311,21 +312,21 @@ pub const IO = struct {
.socket = socket, .socket = socket,
}, },
struct { struct {
fn do_operation(op: anytype) AcceptError!os.socket_t { fn do_operation(op: anytype) AcceptError!posix.socket_t {
const fd = try os.accept( const fd = try posix.accept(
op.socket, op.socket,
null, null,
null, null,
os.SOCK.NONBLOCK | os.SOCK.CLOEXEC, posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC,
); );
errdefer os.close(fd); errdefer posix.close(fd);
// Darwin doesn't support os.MSG_NOSIGNAL to avoid getting SIGPIPE on socket send(). // Darwin doesn't support posix.MSG_NOSIGNAL to avoid getting SIGPIPE on socket send().
// Instead, it uses the SO_NOSIGPIPE socket option which does the same for all send()s. // Instead, it uses the SO_NOSIGPIPE socket option which does the same for all send()s.
os.setsockopt( posix.setsockopt(
fd, fd,
os.SOL.SOCKET, posix.SOL.SOCKET,
os.SO.NOSIGPIPE, posix.SO.NOSIGPIPE,
&mem.toBytes(@as(c_int, 1)), &mem.toBytes(@as(c_int, 1)),
) catch |err| return switch (err) { ) catch |err| return switch (err) {
error.TimeoutTooBig => unreachable, error.TimeoutTooBig => unreachable,
@ -346,7 +347,7 @@ pub const IO = struct {
DiskQuota, DiskQuota,
InputOutput, InputOutput,
NoSpaceLeft, NoSpaceLeft,
} || os.UnexpectedError; } || posix.UnexpectedError;
pub fn close( pub fn close(
self: *IO, self: *IO,
@ -358,7 +359,7 @@ pub const IO = struct {
result: CloseError!void, result: CloseError!void,
) void, ) void,
completion: *Completion, completion: *Completion,
fd: os.fd_t, fd: posix.fd_t,
) void { ) void {
self.submit( self.submit(
context, context,
@ -370,19 +371,19 @@ pub const IO = struct {
}, },
struct { struct {
fn do_operation(op: anytype) CloseError!void { fn do_operation(op: anytype) CloseError!void {
return switch (os.errno(os.system.close(op.fd))) { return switch (posix.errno(posix.system.close(op.fd))) {
.SUCCESS => {}, .SUCCESS => {},
.BADF => error.FileDescriptorInvalid, .BADF => error.FileDescriptorInvalid,
.INTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 .INTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425
.IO => error.InputOutput, .IO => error.InputOutput,
else => |errno| os.unexpectedErrno(errno), else => |errno| posix.unexpectedErrno(errno),
}; };
} }
}, },
); );
} }
pub const ConnectError = os.ConnectError; pub const ConnectError = posix.ConnectError;
pub fn connect( pub fn connect(
self: *IO, self: *IO,
@ -394,7 +395,7 @@ pub const IO = struct {
result: ConnectError!void, result: ConnectError!void,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
address: std.net.Address, address: std.net.Address,
) void { ) void {
self.submit( self.submit(
@ -412,8 +413,8 @@ pub const IO = struct {
// Don't call connect after being rescheduled by io_pending as it gives EISCONN. // Don't call connect after being rescheduled by io_pending as it gives EISCONN.
// Instead, check the socket error to see if has been connected successfully. // Instead, check the socket error to see if has been connected successfully.
const result = switch (op.initiated) { const result = switch (op.initiated) {
true => os.getsockoptError(op.socket), true => posix.getsockoptError(op.socket),
else => os.connect(op.socket, &op.address.any, op.address.getOsSockLen()), else => posix.connect(op.socket, &op.address.any, op.address.getOsSockLen()),
}; };
op.initiated = true; op.initiated = true;
@ -433,7 +434,7 @@ pub const IO = struct {
SystemResources, SystemResources,
Unseekable, Unseekable,
ConnectionTimedOut, ConnectionTimedOut,
} || os.UnexpectedError; } || posix.UnexpectedError;
pub fn read( pub fn read(
self: *IO, self: *IO,
@ -445,7 +446,7 @@ pub const IO = struct {
result: ReadError!usize, result: ReadError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
fd: os.fd_t, fd: posix.fd_t,
buffer: []u8, buffer: []u8,
offset: u64, offset: u64,
) void { ) void {
@ -463,13 +464,13 @@ pub const IO = struct {
struct { struct {
fn do_operation(op: anytype) ReadError!usize { fn do_operation(op: anytype) ReadError!usize {
while (true) { while (true) {
const rc = os.system.pread( const rc = posix.system.pread(
op.fd, op.fd,
op.buf, op.buf,
op.len, op.len,
@as(isize, @bitCast(op.offset)), @as(isize, @bitCast(op.offset)),
); );
return switch (os.errno(rc)) { return switch (posix.errno(rc)) {
.SUCCESS => @as(usize, @intCast(rc)), .SUCCESS => @as(usize, @intCast(rc)),
.INTR => continue, .INTR => continue,
.AGAIN => error.WouldBlock, .AGAIN => error.WouldBlock,
@ -485,7 +486,7 @@ pub const IO = struct {
.OVERFLOW => error.Unseekable, .OVERFLOW => error.Unseekable,
.SPIPE => error.Unseekable, .SPIPE => error.Unseekable,
.TIMEDOUT => error.ConnectionTimedOut, .TIMEDOUT => error.ConnectionTimedOut,
else => |err| os.unexpectedErrno(err), else => |err| posix.unexpectedErrno(err),
}; };
} }
} }
@ -493,7 +494,7 @@ pub const IO = struct {
); );
} }
pub const RecvError = os.RecvFromError; pub const RecvError = posix.RecvFromError;
pub fn recv( pub fn recv(
self: *IO, self: *IO,
@ -505,7 +506,7 @@ pub const IO = struct {
result: RecvError!usize, result: RecvError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
buffer: []u8, buffer: []u8,
) void { ) void {
self.submit( self.submit(
@ -520,13 +521,13 @@ pub const IO = struct {
}, },
struct { struct {
fn do_operation(op: anytype) RecvError!usize { fn do_operation(op: anytype) RecvError!usize {
return os.recv(op.socket, op.buf[0..op.len], 0); return posix.recv(op.socket, op.buf[0..op.len], 0);
} }
}, },
); );
} }
pub const SendError = os.SendError; pub const SendError = posix.SendError;
pub fn send( pub fn send(
self: *IO, self: *IO,
@ -538,7 +539,7 @@ pub const IO = struct {
result: SendError!usize, result: SendError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
buffer: []const u8, buffer: []const u8,
) void { ) void {
self.submit( self.submit(
@ -553,13 +554,13 @@ pub const IO = struct {
}, },
struct { struct {
fn do_operation(op: anytype) SendError!usize { fn do_operation(op: anytype) SendError!usize {
return os.send(op.socket, op.buf[0..op.len], 0); return posix.send(op.socket, op.buf[0..op.len], 0);
} }
}, },
); );
} }
pub const TimeoutError = error{Canceled} || os.UnexpectedError; pub const TimeoutError = error{Canceled} || posix.UnexpectedError;
pub fn timeout( pub fn timeout(
self: *IO, self: *IO,
@ -608,7 +609,7 @@ pub const IO = struct {
); );
} }
pub const WriteError = os.PWriteError; pub const WriteError = posix.PWriteError;
pub fn write( pub fn write(
self: *IO, self: *IO,
@ -620,7 +621,7 @@ pub const IO = struct {
result: WriteError!usize, result: WriteError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
fd: os.fd_t, fd: posix.fd_t,
buffer: []const u8, buffer: []const u8,
offset: u64, offset: u64,
) void { ) void {
@ -637,7 +638,7 @@ pub const IO = struct {
}, },
struct { struct {
fn do_operation(op: anytype) WriteError!usize { fn do_operation(op: anytype) WriteError!usize {
return os.pwrite(op.fd, op.buf[0..op.len], op.offset); return posix.pwrite(op.fd, op.buf[0..op.len], op.offset);
} }
}, },
); );
@ -646,23 +647,23 @@ pub const IO = struct {
pub const INVALID_SOCKET = -1; pub const INVALID_SOCKET = -1;
/// Creates a socket that can be used for async operations with the IO instance. /// Creates a socket that can be used for async operations with the IO instance.
pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !os.socket_t { pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !posix.socket_t {
_ = self; _ = self;
const fd = try os.socket(family, sock_type | os.SOCK.NONBLOCK, protocol); const fd = try posix.socket(family, sock_type | posix.SOCK.NONBLOCK, protocol);
errdefer os.closeSocket(fd); errdefer posix.close(fd);
// darwin doesn't support os.MSG_NOSIGNAL, but instead a socket option to avoid SIGPIPE. // darwin doesn't support posix.MSG_NOSIGNAL, but instead a socket option to avoid SIGPIPE.
try os.setsockopt(fd, os.SOL.SOCKET, os.SO.NOSIGPIPE, &mem.toBytes(@as(c_int, 1))); try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.NOSIGPIPE, &mem.toBytes(@as(c_int, 1)));
return fd; return fd;
} }
/// Opens a directory with read only access. /// Opens a directory with read only access.
pub fn open_dir(dir_path: []const u8) !os.fd_t { pub fn open_dir(dir_path: []const u8) !posix.fd_t {
return os.open(dir_path, os.O.CLOEXEC | os.O.RDONLY, 0); return posix.open(dir_path, posix.O.CLOEXEC | posix.O.RDONLY, 0);
} }
pub const INVALID_FILE: os.fd_t = -1; pub const INVALID_FILE: posix.fd_t = -1;
/// Opens or creates a journal file: /// Opens or creates a journal file:
/// - For reading and writing. /// - For reading and writing.
@ -673,11 +674,11 @@ pub const IO = struct {
/// The caller is responsible for ensuring that the parent directory inode is durable. /// The caller is responsible for ensuring that the parent directory inode is durable.
/// - Verifies that the file size matches the expected file size before returning. /// - Verifies that the file size matches the expected file size before returning.
pub fn open_file( pub fn open_file(
dir_fd: os.fd_t, dir_fd: posix.fd_t,
relative_path: []const u8, relative_path: []const u8,
size: u64, size: u64,
method: enum { create, create_or_open, open }, method: enum { create, create_or_open, open },
) !os.fd_t { ) !posix.fd_t {
assert(relative_path.len > 0); assert(relative_path.len > 0);
assert(size % sector_size == 0); assert(size % sector_size == 0);
@ -686,21 +687,21 @@ pub const IO = struct {
// Opening with O_DSYNC is essential for both durability and correctness. // Opening with O_DSYNC is essential for both durability and correctness.
// O_DSYNC enables us to omit fsync() calls in the data plane, since we sync to the disk on every write. // O_DSYNC enables us to omit fsync() calls in the data plane, since we sync to the disk on every write.
var flags: u32 = os.O.CLOEXEC | os.O.RDWR | os.O.DSYNC; var flags: u32 = posix.O.CLOEXEC | posix.O.RDWR | posix.O.DSYNC;
var mode: os.mode_t = 0; var mode: posix.mode_t = 0;
// TODO Document this and investigate whether this is in fact correct to set here. // TODO Document this and investigate whether this is in fact correct to set here.
if (@hasDecl(os.O, "LARGEFILE")) flags |= os.O.LARGEFILE; if (@hasDecl(posix.O, "LARGEFILE")) flags |= posix.O.LARGEFILE;
switch (method) { switch (method) {
.create => { .create => {
flags |= os.O.CREAT; flags |= posix.O.CREAT;
flags |= os.O.EXCL; flags |= posix.O.EXCL;
mode = 0o666; mode = 0o666;
log.info("creating \"{s}\"...", .{relative_path}); log.info("creating \"{s}\"...", .{relative_path});
}, },
.create_or_open => { .create_or_open => {
flags |= os.O.CREAT; flags |= posix.O.CREAT;
mode = 0o666; mode = 0o666;
log.info("opening or creating \"{s}\"...", .{relative_path}); log.info("opening or creating \"{s}\"...", .{relative_path});
}, },
@ -710,25 +711,25 @@ pub const IO = struct {
} }
// This is critical as we rely on O_DSYNC for fsync() whenever we write to the file: // This is critical as we rely on O_DSYNC for fsync() whenever we write to the file:
assert((flags & os.O.DSYNC) > 0); assert((flags & posix.O.DSYNC) > 0);
// Be careful with openat(2): "If pathname is absolute, then dirfd is ignored." (man page) // Be careful with openat(2): "If pathname is absolute, then dirfd is ignored." (man page)
assert(!std.fs.path.isAbsolute(relative_path)); assert(!std.fs.path.isAbsolute(relative_path));
const fd = try os.openat(dir_fd, relative_path, flags, mode); const fd = try posix.openat(dir_fd, relative_path, flags, mode);
// TODO Return a proper error message when the path exists or does not exist (init/start). // TODO Return a proper error message when the path exists or does not exist (init/start).
errdefer os.close(fd); errdefer posix.close(fd);
// TODO Check that the file is actually a file. // TODO Check that the file is actually a file.
// On darwin assume that Direct I/O is always supported. // On darwin assume that Direct I/O is always supported.
// Use F_NOCACHE to disable the page cache as O_DIRECT doesn't exist. // Use F_NOCACHE to disable the page cache as O_DIRECT doesn't exist.
if (direct_io) { if (direct_io) {
_ = try os.fcntl(fd, os.F.NOCACHE, 1); _ = try posix.fcntl(fd, posix.F.NOCACHE, 1);
} }
// Obtain an advisory exclusive lock that works only if all processes actually use flock(). // Obtain an advisory exclusive lock that works only if all processes actually use flock().
// LOCK_NB means that we want to fail the lock without waiting if another process has it. // LOCK_NB means that we want to fail the lock without waiting if another process has it.
os.flock(fd, os.LOCK.EX | os.LOCK.NB) catch |err| switch (err) { posix.flock(fd, posix.LOCK.EX | posix.LOCK.NB) catch |err| switch (err) {
error.WouldBlock => @panic("another process holds the data file lock"), error.WouldBlock => @panic("another process holds the data file lock"),
else => return err, else => return err,
}; };
@ -750,7 +751,7 @@ pub const IO = struct {
try fs_sync(dir_fd); try fs_sync(dir_fd);
// TODO Document that `size` is now `data_file_size_min` from `main.zig`. // TODO Document that `size` is now `data_file_size_min` from `main.zig`.
const stat = try os.fstat(fd); const stat = try posix.fstat(fd);
if (stat.size < size) @panic("data file inode size was truncated or corrupted"); if (stat.size < size) @panic("data file inode size was truncated or corrupted");
return fd; return fd;
@ -758,13 +759,13 @@ pub const IO = struct {
/// Darwin's fsync() syscall does not flush past the disk cache. We must use F_FULLFSYNC instead. /// Darwin's fsync() syscall does not flush past the disk cache. We must use F_FULLFSYNC instead.
/// https://twitter.com/TigerBeetleDB/status/1422491736224436225 /// https://twitter.com/TigerBeetleDB/status/1422491736224436225
fn fs_sync(fd: os.fd_t) !void { fn fs_sync(fd: posix.fd_t) !void {
_ = os.fcntl(fd, os.F.FULLFSYNC, 1) catch return os.fsync(fd); _ = posix.fcntl(fd, posix.F.FULLFSYNC, 1) catch return posix.fsync(fd);
} }
/// Allocates a file contiguously using fallocate() if supported. /// Allocates a file contiguously using fallocate() if supported.
/// Alternatively, writes to the last sector so that at least the file size is correct. /// Alternatively, writes to the last sector so that at least the file size is correct.
fn fs_allocate(fd: os.fd_t, size: u64) !void { fn fs_allocate(fd: posix.fd_t, size: u64) !void {
log.info("allocating {}...", .{std.fmt.fmtIntSizeBin(size)}); log.info("allocating {}...", .{std.fmt.fmtIntSizeBin(size)});
// Darwin doesn't have fallocate() but we can simulate it using fcntl()s. // Darwin doesn't have fallocate() but we can simulate it using fcntl()s.
@ -779,27 +780,27 @@ pub const IO = struct {
const fstore_t = extern struct { const fstore_t = extern struct {
fst_flags: c_uint, fst_flags: c_uint,
fst_posmode: c_int, fst_posmode: c_int,
fst_offset: os.off_t, fst_offset: posix.off_t,
fst_length: os.off_t, fst_length: posix.off_t,
fst_bytesalloc: os.off_t, fst_bytesalloc: posix.off_t,
}; };
var store = fstore_t{ var store = fstore_t{
.fst_flags = F_ALLOCATECONTIG | F_ALLOCATEALL, .fst_flags = F_ALLOCATECONTIG | F_ALLOCATEALL,
.fst_posmode = F_PEOFPOSMODE, .fst_posmode = F_PEOFPOSMODE,
.fst_offset = 0, .fst_offset = 0,
.fst_length = @as(os.off_t, @intCast(size)), .fst_length = @as(posix.off_t, @intCast(size)),
.fst_bytesalloc = 0, .fst_bytesalloc = 0,
}; };
// Try to pre-allocate contiguous space and fall back to default non-contiguous. // Try to pre-allocate contiguous space and fall back to default non-contiguous.
var res = os.system.fcntl(fd, os.F.PREALLOCATE, @intFromPtr(&store)); var res = posix.system.fcntl(fd, posix.F.PREALLOCATE, @intFromPtr(&store));
if (os.errno(res) != .SUCCESS) { if (posix.errno(res) != .SUCCESS) {
store.fst_flags = F_ALLOCATEALL; store.fst_flags = F_ALLOCATEALL;
res = os.system.fcntl(fd, os.F.PREALLOCATE, @intFromPtr(&store)); res = posix.system.fcntl(fd, posix.F.PREALLOCATE, @intFromPtr(&store));
} }
switch (os.errno(res)) { switch (posix.errno(res)) {
.SUCCESS => {}, .SUCCESS => {},
.ACCES => unreachable, // F_SETLK or F_SETSIZE of F_WRITEBOOTSTRAP .ACCES => unreachable, // F_SETLK or F_SETSIZE of F_WRITEBOOTSTRAP
.BADF => return error.FileDescriptorInvalid, .BADF => return error.FileDescriptorInvalid,
@ -811,11 +812,11 @@ pub const IO = struct {
.OVERFLOW => return error.FileTooBig, .OVERFLOW => return error.FileTooBig,
.SRCH => unreachable, // F_SETOWN .SRCH => unreachable, // F_SETOWN
.OPNOTSUPP => return error.OperationNotSupported, // not reported but need same error union .OPNOTSUPP => return error.OperationNotSupported, // not reported but need same error union
else => |errno| return os.unexpectedErrno(errno), else => |errno| return posix.unexpectedErrno(errno),
} }
// Now actually perform the allocation. // Now actually perform the allocation.
return os.ftruncate(fd, size) catch |err| switch (err) { return posix.ftruncate(fd, size) catch |err| switch (err) {
error.AccessDenied => error.PermissionDenied, error.AccessDenied => error.PermissionDenied,
else => |e| e, else => |e| e,
}; };

View File

@ -1,8 +1,8 @@
const std = @import("std"); const std = @import("std");
const assert = std.debug.assert; const assert = std.debug.assert;
const os = std.os; const posix = std.posix;
const linux = os.linux; const linux = std.os.linux;
const IO_Uring = linux.IO_Uring; const IoUring = linux.IoUring;
const io_uring_cqe = linux.io_uring_cqe; const io_uring_cqe = linux.io_uring_cqe;
const io_uring_sqe = linux.io_uring_sqe; const io_uring_sqe = linux.io_uring_sqe;
const log = std.log.scoped(.io); const log = std.log.scoped(.io);
@ -17,7 +17,7 @@ const direct_io_required = true;
const sector_size = 4096; const sector_size = 4096;
pub const IO = struct { pub const IO = struct {
ring: IO_Uring, ring: IoUring,
/// Operations not yet submitted to the kernel and waiting on available space in the /// Operations not yet submitted to the kernel and waiting on available space in the
/// submission queue. /// submission queue.
@ -31,13 +31,13 @@ pub const IO = struct {
pub fn init(entries: u12, flags: u32) !IO { pub fn init(entries: u12, flags: u32) !IO {
// Detect the linux version to ensure that we support all io_uring ops used. // Detect the linux version to ensure that we support all io_uring ops used.
const uts = std.os.uname(); const uts = std.posix.uname();
const version = try parse_dirty_semver(&uts.release); const version = try parse_dirty_semver(&uts.release);
if (version.order(std.SemanticVersion{ .major = 5, .minor = 5, .patch = 0 }) == .lt) { if (version.order(std.SemanticVersion{ .major = 5, .minor = 5, .patch = 0 }) == .lt) {
@panic("Linux kernel 5.5 or greater is required for io_uring OP_ACCEPT"); @panic("Linux kernel 5.5 or greater is required for io_uring OP_ACCEPT");
} }
return IO{ .ring = try IO_Uring.init(entries, flags) }; return IO{ .ring = try IoUring.init(entries, flags) };
} }
pub fn deinit(self: *IO) void { pub fn deinit(self: *IO) void {
@ -74,10 +74,10 @@ pub const IO = struct {
// We must use the same clock source used by io_uring (CLOCK_MONOTONIC) since we specify the // We must use the same clock source used by io_uring (CLOCK_MONOTONIC) since we specify the
// timeout below as an absolute value. Otherwise, we may deadlock if the clock sources are // timeout below as an absolute value. Otherwise, we may deadlock if the clock sources are
// dramatically different. Any kernel that supports io_uring will support CLOCK_MONOTONIC. // dramatically different. Any kernel that supports io_uring will support CLOCK_MONOTONIC.
var current_ts: os.timespec = undefined; var current_ts: posix.timespec = undefined;
os.clock_gettime(os.CLOCK.MONOTONIC, &current_ts) catch unreachable; posix.clock_gettime(posix.CLOCK.MONOTONIC, &current_ts) catch unreachable;
// The absolute CLOCK_MONOTONIC time after which we may return from this function: // The absolute CLOCK_MONOTONIC time after which we may return from this function:
const timeout_ts: os.linux.kernel_timespec = .{ const timeout_ts: linux.kernel_timespec = .{
.tv_sec = current_ts.tv_sec, .tv_sec = current_ts.tv_sec,
.tv_nsec = current_ts.tv_nsec + nanoseconds, .tv_nsec = current_ts.tv_nsec + nanoseconds,
}; };
@ -90,7 +90,7 @@ pub const IO = struct {
break :blk self.ring.get_sqe() catch unreachable; break :blk self.ring.get_sqe() catch unreachable;
}; };
// Submit an absolute timeout that will be canceled if any other SQE completes first: // Submit an absolute timeout that will be canceled if any other SQE completes first:
linux.io_uring_prep_timeout(timeout_sqe, &timeout_ts, 1, os.linux.IORING_TIMEOUT_ABS); timeout_sqe.prep_timeout(&timeout_ts, 1, linux.IORING_TIMEOUT_ABS);
timeout_sqe.user_data = 0; timeout_sqe.user_data = 0;
timeouts += 1; timeouts += 1;
@ -155,7 +155,7 @@ pub const IO = struct {
// it was completed due to the completion of an event, in which case `cqe.res` // it was completed due to the completion of an event, in which case `cqe.res`
// would be 0. It is possible for multiple timeout operations to complete at the // would be 0. It is possible for multiple timeout operations to complete at the
// same time if the nanoseconds value passed to `run_for_ns()` is very short. // same time if the nanoseconds value passed to `run_for_ns()` is very short.
if (-cqe.res == @intFromEnum(os.E.TIME)) etime.* = true; if (-cqe.res == @intFromEnum(posix.E.TIME)) etime.* = true;
continue; continue;
} }
const completion = @as(*Completion, @ptrFromInt(@as(usize, @intCast(cqe.user_data)))); const completion = @as(*Completion, @ptrFromInt(@as(usize, @intCast(cqe.user_data))));
@ -217,45 +217,41 @@ pub const IO = struct {
fn prep(completion: *Completion, sqe: *io_uring_sqe) void { fn prep(completion: *Completion, sqe: *io_uring_sqe) void {
switch (completion.operation) { switch (completion.operation) {
.accept => |*op| { .accept => |*op| {
linux.io_uring_prep_accept( sqe.prep_accept(
sqe,
op.socket, op.socket,
&op.address, &op.address,
&op.address_size, &op.address_size,
os.SOCK.CLOEXEC, posix.SOCK.CLOEXEC,
); );
}, },
.close => |op| { .close => |op| {
linux.io_uring_prep_close(sqe, op.fd); sqe.prep_close(op.fd);
}, },
.connect => |*op| { .connect => |*op| {
linux.io_uring_prep_connect( sqe.prep_connect(
sqe,
op.socket, op.socket,
&op.address.any, &op.address.any,
op.address.getOsSockLen(), op.address.getOsSockLen(),
); );
}, },
.read => |op| { .read => |op| {
linux.io_uring_prep_read( sqe.prep_read(
sqe,
op.fd, op.fd,
op.buffer[0..bufferLimit(op.buffer.len)], op.buffer[0..bufferLimit(op.buffer.len)],
op.offset, op.offset,
); );
}, },
.recv => |op| { .recv => |op| {
linux.io_uring_prep_recv(sqe, op.socket, op.buffer, os.MSG.NOSIGNAL); sqe.prep_recv(op.socket, op.buffer, posix.MSG.NOSIGNAL);
}, },
.send => |op| { .send => |op| {
linux.io_uring_prep_send(sqe, op.socket, op.buffer, os.MSG.NOSIGNAL); sqe.prep_send(op.socket, op.buffer, posix.MSG.NOSIGNAL);
}, },
.timeout => |*op| { .timeout => |*op| {
linux.io_uring_prep_timeout(sqe, &op.timespec, 0, 0); sqe.prep_timeout(&op.timespec, 0, 0);
}, },
.write => |op| { .write => |op| {
linux.io_uring_prep_write( sqe.prep_write(
sqe,
op.fd, op.fd,
op.buffer[0..bufferLimit(op.buffer.len)], op.buffer[0..bufferLimit(op.buffer.len)],
op.offset, op.offset,
@ -268,9 +264,9 @@ pub const IO = struct {
fn complete(completion: *Completion) void { fn complete(completion: *Completion) void {
switch (completion.operation) { switch (completion.operation) {
.accept => { .accept => {
const result: anyerror!os.socket_t = blk: { const result: anyerror!posix.socket_t = blk: {
if (completion.result < 0) { if (completion.result < 0) {
const err = switch (@as(os.E, @enumFromInt(-completion.result))) { const err = switch (@as(posix.E, @enumFromInt(-completion.result))) {
.INTR => { .INTR => {
completion.io.enqueue(completion); completion.io.enqueue(completion);
return; return;
@ -288,11 +284,11 @@ pub const IO = struct {
.OPNOTSUPP => error.OperationNotSupported, .OPNOTSUPP => error.OperationNotSupported,
.PERM => error.PermissionDenied, .PERM => error.PermissionDenied,
.PROTO => error.ProtocolFailure, .PROTO => error.ProtocolFailure,
else => |errno| os.unexpectedErrno(errno), else => |errno| posix.unexpectedErrno(errno),
}; };
break :blk err; break :blk err;
} else { } else {
break :blk @as(os.socket_t, @intCast(completion.result)); break :blk @as(posix.socket_t, @intCast(completion.result));
} }
}; };
completion.callback(completion.context, completion, &result); completion.callback(completion.context, completion, &result);
@ -300,13 +296,13 @@ pub const IO = struct {
.close => { .close => {
const result: anyerror!void = blk: { const result: anyerror!void = blk: {
if (completion.result < 0) { if (completion.result < 0) {
const err = switch (@as(os.E, @enumFromInt(-completion.result))) { const err = switch (@as(posix.E, @enumFromInt(-completion.result))) {
.INTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 .INTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425
.BADF => error.FileDescriptorInvalid, .BADF => error.FileDescriptorInvalid,
.DQUOT => error.DiskQuota, .DQUOT => error.DiskQuota,
.IO => error.InputOutput, .IO => error.InputOutput,
.NOSPC => error.NoSpaceLeft, .NOSPC => error.NoSpaceLeft,
else => |errno| os.unexpectedErrno(errno), else => |errno| posix.unexpectedErrno(errno),
}; };
break :blk err; break :blk err;
} else { } else {
@ -318,7 +314,7 @@ pub const IO = struct {
.connect => { .connect => {
const result: anyerror!void = blk: { const result: anyerror!void = blk: {
if (completion.result < 0) { if (completion.result < 0) {
const err = switch (@as(os.E, @enumFromInt(-completion.result))) { const err = switch (@as(posix.E, @enumFromInt(-completion.result))) {
.INTR => { .INTR => {
completion.io.enqueue(completion); completion.io.enqueue(completion);
return; return;
@ -340,7 +336,7 @@ pub const IO = struct {
.PERM => error.PermissionDenied, .PERM => error.PermissionDenied,
.PROTOTYPE => error.ProtocolNotSupported, .PROTOTYPE => error.ProtocolNotSupported,
.TIMEDOUT => error.ConnectionTimedOut, .TIMEDOUT => error.ConnectionTimedOut,
else => |errno| os.unexpectedErrno(errno), else => |errno| posix.unexpectedErrno(errno),
}; };
break :blk err; break :blk err;
} else { } else {
@ -352,7 +348,7 @@ pub const IO = struct {
.read => { .read => {
const result: anyerror!usize = blk: { const result: anyerror!usize = blk: {
if (completion.result < 0) { if (completion.result < 0) {
const err = switch (@as(os.E, @enumFromInt(-completion.result))) { const err = switch (@as(posix.E, @enumFromInt(-completion.result))) {
.INTR => { .INTR => {
completion.io.enqueue(completion); completion.io.enqueue(completion);
return; return;
@ -370,7 +366,7 @@ pub const IO = struct {
.OVERFLOW => error.Unseekable, .OVERFLOW => error.Unseekable,
.SPIPE => error.Unseekable, .SPIPE => error.Unseekable,
.TIMEDOUT => error.ConnectionTimedOut, .TIMEDOUT => error.ConnectionTimedOut,
else => |errno| os.unexpectedErrno(errno), else => |errno| posix.unexpectedErrno(errno),
}; };
break :blk err; break :blk err;
} else { } else {
@ -382,7 +378,7 @@ pub const IO = struct {
.recv => { .recv => {
const result: anyerror!usize = blk: { const result: anyerror!usize = blk: {
if (completion.result < 0) { if (completion.result < 0) {
const err = switch (@as(os.E, @enumFromInt(-completion.result))) { const err = switch (@as(posix.E, @enumFromInt(-completion.result))) {
.INTR => { .INTR => {
completion.io.enqueue(completion); completion.io.enqueue(completion);
return; return;
@ -398,7 +394,7 @@ pub const IO = struct {
.CONNRESET => error.ConnectionResetByPeer, .CONNRESET => error.ConnectionResetByPeer,
.TIMEDOUT => error.ConnectionTimedOut, .TIMEDOUT => error.ConnectionTimedOut,
.OPNOTSUPP => error.OperationNotSupported, .OPNOTSUPP => error.OperationNotSupported,
else => |errno| os.unexpectedErrno(errno), else => |errno| posix.unexpectedErrno(errno),
}; };
break :blk err; break :blk err;
} else { } else {
@ -410,7 +406,7 @@ pub const IO = struct {
.send => { .send => {
const result: anyerror!usize = blk: { const result: anyerror!usize = blk: {
if (completion.result < 0) { if (completion.result < 0) {
const err = switch (@as(os.E, @enumFromInt(-completion.result))) { const err = switch (@as(posix.E, @enumFromInt(-completion.result))) {
.INTR => { .INTR => {
completion.io.enqueue(completion); completion.io.enqueue(completion);
return; return;
@ -433,7 +429,7 @@ pub const IO = struct {
.OPNOTSUPP => error.OperationNotSupported, .OPNOTSUPP => error.OperationNotSupported,
.PIPE => error.BrokenPipe, .PIPE => error.BrokenPipe,
.TIMEDOUT => error.ConnectionTimedOut, .TIMEDOUT => error.ConnectionTimedOut,
else => |errno| os.unexpectedErrno(errno), else => |errno| posix.unexpectedErrno(errno),
}; };
break :blk err; break :blk err;
} else { } else {
@ -444,21 +440,21 @@ pub const IO = struct {
}, },
.timeout => { .timeout => {
assert(completion.result < 0); assert(completion.result < 0);
const result: anyerror!void = switch (@as(os.E, @enumFromInt(-completion.result))) { const result: anyerror!void = switch (@as(posix.E, @enumFromInt(-completion.result))) {
.INTR => { .INTR => {
completion.io.enqueue(completion); completion.io.enqueue(completion);
return; return;
}, },
.CANCELED => error.Canceled, .CANCELED => error.Canceled,
.TIME => {}, // A success. .TIME => {}, // A success.
else => |errno| os.unexpectedErrno(errno), else => |errno| posix.unexpectedErrno(errno),
}; };
completion.callback(completion.context, completion, &result); completion.callback(completion.context, completion, &result);
}, },
.write => { .write => {
const result: anyerror!usize = blk: { const result: anyerror!usize = blk: {
if (completion.result < 0) { if (completion.result < 0) {
const err = switch (@as(os.E, @enumFromInt(-completion.result))) { const err = switch (@as(posix.E, @enumFromInt(-completion.result))) {
.INTR => { .INTR => {
completion.io.enqueue(completion); completion.io.enqueue(completion);
return; return;
@ -477,7 +473,7 @@ pub const IO = struct {
.PERM => error.AccessDenied, .PERM => error.AccessDenied,
.PIPE => error.BrokenPipe, .PIPE => error.BrokenPipe,
.SPIPE => error.Unseekable, .SPIPE => error.Unseekable,
else => |errno| os.unexpectedErrno(errno), else => |errno| posix.unexpectedErrno(errno),
}; };
break :blk err; break :blk err;
} else { } else {
@ -493,35 +489,35 @@ pub const IO = struct {
/// This union encodes the set of operations supported as well as their arguments. /// This union encodes the set of operations supported as well as their arguments.
const Operation = union(enum) { const Operation = union(enum) {
accept: struct { accept: struct {
socket: os.socket_t, socket: posix.socket_t,
address: os.sockaddr = undefined, address: posix.sockaddr = undefined,
address_size: os.socklen_t = @sizeOf(os.sockaddr), address_size: posix.socklen_t = @sizeOf(posix.sockaddr),
}, },
close: struct { close: struct {
fd: os.fd_t, fd: posix.fd_t,
}, },
connect: struct { connect: struct {
socket: os.socket_t, socket: posix.socket_t,
address: std.net.Address, address: std.net.Address,
}, },
read: struct { read: struct {
fd: os.fd_t, fd: posix.fd_t,
buffer: []u8, buffer: []u8,
offset: u64, offset: u64,
}, },
recv: struct { recv: struct {
socket: os.socket_t, socket: posix.socket_t,
buffer: []u8, buffer: []u8,
}, },
send: struct { send: struct {
socket: os.socket_t, socket: posix.socket_t,
buffer: []const u8, buffer: []const u8,
}, },
timeout: struct { timeout: struct {
timespec: os.linux.kernel_timespec, timespec: linux.kernel_timespec,
}, },
write: struct { write: struct {
fd: os.fd_t, fd: posix.fd_t,
buffer: []const u8, buffer: []const u8,
offset: u64, offset: u64,
}, },
@ -539,7 +535,7 @@ pub const IO = struct {
OperationNotSupported, OperationNotSupported,
PermissionDenied, PermissionDenied,
ProtocolFailure, ProtocolFailure,
} || os.UnexpectedError; } || posix.UnexpectedError;
pub fn accept( pub fn accept(
self: *IO, self: *IO,
@ -548,10 +544,10 @@ pub const IO = struct {
comptime callback: fn ( comptime callback: fn (
context: Context, context: Context,
completion: *Completion, completion: *Completion,
result: AcceptError!os.socket_t, result: AcceptError!posix.socket_t,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
) void { ) void {
completion.* = .{ completion.* = .{
.io = self, .io = self,
@ -561,7 +557,7 @@ pub const IO = struct {
callback( callback(
@ptrCast(@alignCast(ctx)), @ptrCast(@alignCast(ctx)),
comp, comp,
@as(*const AcceptError!os.socket_t, @ptrCast(@alignCast(res))).*, @as(*const AcceptError!posix.socket_t, @ptrCast(@alignCast(res))).*,
); );
} }
}.wrapper, }.wrapper,
@ -569,7 +565,7 @@ pub const IO = struct {
.accept = .{ .accept = .{
.socket = socket, .socket = socket,
.address = undefined, .address = undefined,
.address_size = @sizeOf(os.sockaddr), .address_size = @sizeOf(posix.sockaddr),
}, },
}, },
}; };
@ -581,7 +577,7 @@ pub const IO = struct {
DiskQuota, DiskQuota,
InputOutput, InputOutput,
NoSpaceLeft, NoSpaceLeft,
} || os.UnexpectedError; } || posix.UnexpectedError;
pub fn close( pub fn close(
self: *IO, self: *IO,
@ -593,7 +589,7 @@ pub const IO = struct {
result: CloseError!void, result: CloseError!void,
) void, ) void,
completion: *Completion, completion: *Completion,
fd: os.fd_t, fd: posix.fd_t,
) void { ) void {
completion.* = .{ completion.* = .{
.io = self, .io = self,
@ -631,7 +627,7 @@ pub const IO = struct {
ProtocolNotSupported, ProtocolNotSupported,
ConnectionTimedOut, ConnectionTimedOut,
SystemResources, SystemResources,
} || os.UnexpectedError; } || posix.UnexpectedError;
pub fn connect( pub fn connect(
self: *IO, self: *IO,
@ -643,7 +639,7 @@ pub const IO = struct {
result: ConnectError!void, result: ConnectError!void,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
address: std.net.Address, address: std.net.Address,
) void { ) void {
completion.* = .{ completion.* = .{
@ -678,7 +674,7 @@ pub const IO = struct {
SystemResources, SystemResources,
Unseekable, Unseekable,
ConnectionTimedOut, ConnectionTimedOut,
} || os.UnexpectedError; } || posix.UnexpectedError;
pub fn read( pub fn read(
self: *IO, self: *IO,
@ -690,7 +686,7 @@ pub const IO = struct {
result: ReadError!usize, result: ReadError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
fd: os.fd_t, fd: posix.fd_t,
buffer: []u8, buffer: []u8,
offset: u64, offset: u64,
) void { ) void {
@ -726,7 +722,7 @@ pub const IO = struct {
FileDescriptorNotASocket, FileDescriptorNotASocket,
ConnectionTimedOut, ConnectionTimedOut,
OperationNotSupported, OperationNotSupported,
} || os.UnexpectedError; } || posix.UnexpectedError;
pub fn recv( pub fn recv(
self: *IO, self: *IO,
@ -738,7 +734,7 @@ pub const IO = struct {
result: RecvError!usize, result: RecvError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
buffer: []u8, buffer: []u8,
) void { ) void {
completion.* = .{ completion.* = .{
@ -777,7 +773,7 @@ pub const IO = struct {
OperationNotSupported, OperationNotSupported,
BrokenPipe, BrokenPipe,
ConnectionTimedOut, ConnectionTimedOut,
} || os.UnexpectedError; } || posix.UnexpectedError;
pub fn send( pub fn send(
self: *IO, self: *IO,
@ -789,7 +785,7 @@ pub const IO = struct {
result: SendError!usize, result: SendError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
buffer: []const u8, buffer: []const u8,
) void { ) void {
completion.* = .{ completion.* = .{
@ -814,7 +810,7 @@ pub const IO = struct {
self.enqueue(completion); self.enqueue(completion);
} }
pub const TimeoutError = error{Canceled} || os.UnexpectedError; pub const TimeoutError = error{Canceled} || posix.UnexpectedError;
pub fn timeout( pub fn timeout(
self: *IO, self: *IO,
@ -849,7 +845,7 @@ pub const IO = struct {
// Special case a zero timeout as a yield. // Special case a zero timeout as a yield.
if (nanoseconds == 0) { if (nanoseconds == 0) {
completion.result = -@as(i32, @intCast(@intFromEnum(std.os.E.TIME))); completion.result = -@as(i32, @intCast(@intFromEnum(std.posix.E.TIME)));
self.completed.push(completion); self.completed.push(completion);
return; return;
} }
@ -869,7 +865,7 @@ pub const IO = struct {
Unseekable, Unseekable,
AccessDenied, AccessDenied,
BrokenPipe, BrokenPipe,
} || os.UnexpectedError; } || posix.UnexpectedError;
pub fn write( pub fn write(
self: *IO, self: *IO,
@ -881,7 +877,7 @@ pub const IO = struct {
result: WriteError!usize, result: WriteError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
fd: os.fd_t, fd: posix.fd_t,
buffer: []const u8, buffer: []const u8,
offset: u64, offset: u64,
) void { ) void {
@ -911,17 +907,17 @@ pub const IO = struct {
pub const INVALID_SOCKET = -1; pub const INVALID_SOCKET = -1;
/// Creates a socket that can be used for async operations with the IO instance. /// Creates a socket that can be used for async operations with the IO instance.
pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !os.socket_t { pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !posix.socket_t {
_ = self; _ = self;
return os.socket(family, sock_type, protocol); return posix.socket(family, sock_type, protocol);
} }
/// Opens a directory with read only access. /// Opens a directory with read only access.
pub fn open_dir(dir_path: []const u8) !os.fd_t { pub fn open_dir(dir_path: []const u8) !posix.fd_t {
return os.open(dir_path, os.O.CLOEXEC | os.O.RDONLY, 0); return posix.open(dir_path, posix.O.CLOEXEC | posix.O.RDONLY, 0);
} }
pub const INVALID_FILE: os.fd_t = -1; pub const INVALID_FILE: posix.fd_t = -1;
/// Opens or creates a journal file: /// Opens or creates a journal file:
/// - For reading and writing. /// - For reading and writing.
@ -932,22 +928,22 @@ pub const IO = struct {
/// The caller is responsible for ensuring that the parent directory inode is durable. /// The caller is responsible for ensuring that the parent directory inode is durable.
/// - Verifies that the file size matches the expected file size before returning. /// - Verifies that the file size matches the expected file size before returning.
pub fn open_file( pub fn open_file(
dir_fd: os.fd_t, dir_fd: posix.fd_t,
relative_path: []const u8, relative_path: []const u8,
size: u64, size: u64,
method: enum { create, create_or_open, open }, method: enum { create, create_or_open, open },
) !os.fd_t { ) !posix.fd_t {
assert(relative_path.len > 0); assert(relative_path.len > 0);
assert(size % sector_size == 0); assert(size % sector_size == 0);
// TODO Use O_EXCL when opening as a block device to obtain a mandatory exclusive lock. // TODO Use O_EXCL when opening as a block device to obtain a mandatory exclusive lock.
// This is much stronger than an advisory exclusive lock, and is required on some platforms. // This is much stronger than an advisory exclusive lock, and is required on some platforms.
var flags: u32 = os.O.CLOEXEC | os.O.RDWR | os.O.DSYNC; var flags: u32 = posix.O.CLOEXEC | posix.O.RDWR | posix.O.DSYNC;
var mode: os.mode_t = 0; var mode: posix.mode_t = 0;
// TODO Document this and investigate whether this is in fact correct to set here. // TODO Document this and investigate whether this is in fact correct to set here.
if (@hasDecl(os.O, "LARGEFILE")) flags |= os.O.LARGEFILE; if (@hasDecl(posix.O, "LARGEFILE")) flags |= posix.O.LARGEFILE;
var direct_io_supported = false; var direct_io_supported = false;
const dir_on_tmpfs = try fs_is_tmpfs(dir_fd); const dir_on_tmpfs = try fs_is_tmpfs(dir_fd);
@ -962,7 +958,7 @@ pub const IO = struct {
if (direct_io and !dir_on_tmpfs) { if (direct_io and !dir_on_tmpfs) {
direct_io_supported = try fs_supports_direct_io(dir_fd); direct_io_supported = try fs_supports_direct_io(dir_fd);
if (direct_io_supported) { if (direct_io_supported) {
flags |= os.O.DIRECT; flags |= posix.O.DIRECT;
} else if (!direct_io_required) { } else if (!direct_io_required) {
log.warn("file system does not support Direct I/O", .{}); log.warn("file system does not support Direct I/O", .{});
} else { } else {
@ -974,13 +970,13 @@ pub const IO = struct {
switch (method) { switch (method) {
.create => { .create => {
flags |= os.O.CREAT; flags |= posix.O.CREAT;
flags |= os.O.EXCL; flags |= posix.O.EXCL;
mode = 0o666; mode = 0o666;
log.info("creating \"{s}\"...", .{relative_path}); log.info("creating \"{s}\"...", .{relative_path});
}, },
.create_or_open => { .create_or_open => {
flags |= os.O.CREAT; flags |= posix.O.CREAT;
mode = 0o666; mode = 0o666;
log.info("opening or creating \"{s}\"...", .{relative_path}); log.info("opening or creating \"{s}\"...", .{relative_path});
}, },
@ -990,19 +986,19 @@ pub const IO = struct {
} }
// This is critical as we rely on O_DSYNC for fsync() whenever we write to the file: // This is critical as we rely on O_DSYNC for fsync() whenever we write to the file:
assert((flags & os.O.DSYNC) > 0); assert((flags & posix.O.DSYNC) > 0);
// Be careful with openat(2): "If pathname is absolute, then dirfd is ignored." (man page) // Be careful with openat(2): "If pathname is absolute, then dirfd is ignored." (man page)
assert(!std.fs.path.isAbsolute(relative_path)); assert(!std.fs.path.isAbsolute(relative_path));
const fd = try os.openat(dir_fd, relative_path, flags, mode); const fd = try posix.openat(dir_fd, relative_path, flags, mode);
// TODO Return a proper error message when the path exists or does not exist (init/start). // TODO Return a proper error message when the path exists or does not exist (init/start).
errdefer os.close(fd); errdefer posix.close(fd);
// TODO Check that the file is actually a file. // TODO Check that the file is actually a file.
// Obtain an advisory exclusive lock that works only if all processes actually use flock(). // Obtain an advisory exclusive lock that works only if all processes actually use flock().
// LOCK_NB means that we want to fail the lock without waiting if another process has it. // LOCK_NB means that we want to fail the lock without waiting if another process has it.
os.flock(fd, os.LOCK.EX | os.LOCK.NB) catch |err| switch (err) { posix.flock(fd, posix.LOCK.EX | posix.LOCK.NB) catch |err| switch (err) {
error.WouldBlock => @panic("another process holds the data file lock"), error.WouldBlock => @panic("another process holds the data file lock"),
else => return err, else => return err,
}; };
@ -1023,7 +1019,7 @@ pub const IO = struct {
const write_offset = size - sector.len; const write_offset = size - sector.len;
var written: usize = 0; var written: usize = 0;
while (written < sector.len) { while (written < sector.len) {
written += try os.pwrite(fd, sector[written..], write_offset + written); written += try posix.pwrite(fd, sector[written..], write_offset + written);
} }
}, },
else => |e| return e, else => |e| return e,
@ -1034,14 +1030,14 @@ pub const IO = struct {
// making decisions on data that was never durably written by a previously crashed process. // making decisions on data that was never durably written by a previously crashed process.
// We therefore always fsync when we open the path, also to wait for any pending O_DSYNC. // We therefore always fsync when we open the path, also to wait for any pending O_DSYNC.
// Thanks to Alex Miller from FoundationDB for diving into our source and pointing this out. // Thanks to Alex Miller from FoundationDB for diving into our source and pointing this out.
try os.fsync(fd); try posix.fsync(fd);
// We fsync the parent directory to ensure that the file inode is durably written. // We fsync the parent directory to ensure that the file inode is durably written.
// The caller is responsible for the parent directory inode stored under the grandparent. // The caller is responsible for the parent directory inode stored under the grandparent.
// We always do this when opening because we don't know if this was done before crashing. // We always do this when opening because we don't know if this was done before crashing.
try os.fsync(dir_fd); try posix.fsync(dir_fd);
const stat = try os.fstat(fd); const stat = try posix.fstat(fd);
if (stat.size < size) @panic("data file inode size was truncated or corrupted"); if (stat.size < size) @panic("data file inode size was truncated or corrupted");
return fd; return fd;
@ -1049,56 +1045,56 @@ pub const IO = struct {
/// Detects whether the underlying file system for a given directory fd is tmpfs. This is used /// Detects whether the underlying file system for a given directory fd is tmpfs. This is used
/// to relax our Direct I/O check - running on tmpfs for benchmarking is useful. /// to relax our Direct I/O check - running on tmpfs for benchmarking is useful.
fn fs_is_tmpfs(dir_fd: std.os.fd_t) !bool { fn fs_is_tmpfs(dir_fd: std.posix.fd_t) !bool {
var statfs: stdx.StatFs = undefined; var statfs: stdx.StatFs = undefined;
while (true) { while (true) {
const res = stdx.fstatfs(dir_fd, &statfs); const res = stdx.fstatfs(dir_fd, &statfs);
switch (os.linux.getErrno(res)) { switch (linux.getErrno(res)) {
.SUCCESS => { .SUCCESS => {
return statfs.f_type == stdx.TmpfsMagic; return statfs.f_type == stdx.TmpfsMagic;
}, },
.INTR => continue, .INTR => continue,
else => |err| return os.unexpectedErrno(err), else => |err| return posix.unexpectedErrno(err),
} }
} }
} }
/// Detects whether the underlying file system for a given directory fd supports Direct I/O. /// Detects whether the underlying file system for a given directory fd supports Direct I/O.
/// Not all Linux file systems support `O_DIRECT`, e.g. a shared macOS volume. /// Not all Linux file systems support `O_DIRECT`, e.g. a shared macOS volume.
fn fs_supports_direct_io(dir_fd: std.os.fd_t) !bool { fn fs_supports_direct_io(dir_fd: std.posix.fd_t) !bool {
if (!@hasDecl(std.os.O, "DIRECT")) return false; if (!@hasDecl(std.posix.O, "DIRECT")) return false;
const path = "fs_supports_direct_io"; const path = "fs_supports_direct_io";
const dir = std.fs.Dir{ .fd = dir_fd }; const dir = std.fs.Dir{ .fd = dir_fd };
const fd = try os.openatZ(dir_fd, path, os.O.CLOEXEC | os.O.CREAT | os.O.TRUNC, 0o666); const fd = try posix.openatZ(dir_fd, path, posix.O.CLOEXEC | posix.O.CREAT | posix.O.TRUNC, 0o666);
defer os.close(fd); defer posix.close(fd);
defer dir.deleteFile(path) catch {}; defer dir.deleteFile(path) catch {};
while (true) { while (true) {
const res = os.linux.openat(dir_fd, path, os.O.CLOEXEC | os.O.RDONLY | os.O.DIRECT, 0); const res = linux.openat(dir_fd, path, posix.O.CLOEXEC | posix.O.RDONLY | posix.O.DIRECT, 0);
switch (os.linux.getErrno(res)) { switch (linux.getErrno(res)) {
.SUCCESS => { .SUCCESS => {
os.close(@as(os.fd_t, @intCast(res))); posix.close(@as(posix.fd_t, @intCast(res)));
return true; return true;
}, },
.INTR => continue, .INTR => continue,
.INVAL => return false, .INVAL => return false,
else => |err| return os.unexpectedErrno(err), else => |err| return posix.unexpectedErrno(err),
} }
} }
} }
/// Allocates a file contiguously using fallocate() if supported. /// Allocates a file contiguously using fallocate() if supported.
/// Alternatively, writes to the last sector so that at least the file size is correct. /// Alternatively, writes to the last sector so that at least the file size is correct.
fn fs_allocate(fd: os.fd_t, size: u64) !void { fn fs_allocate(fd: posix.fd_t, size: u64) !void {
const mode: i32 = 0; const mode: i32 = 0;
const offset: i64 = 0; const offset: i64 = 0;
const length = @as(i64, @intCast(size)); const length = @as(i64, @intCast(size));
while (true) { while (true) {
const rc = os.linux.fallocate(fd, mode, offset, length); const rc = linux.fallocate(fd, mode, offset, length);
switch (os.linux.getErrno(rc)) { switch (linux.getErrno(rc)) {
.SUCCESS => return, .SUCCESS => return,
.BADF => return error.FileDescriptorInvalid, .BADF => return error.FileDescriptorInvalid,
.FBIG => return error.FileTooBig, .FBIG => return error.FileTooBig,
@ -1112,7 +1108,7 @@ pub const IO = struct {
.PERM => return error.PermissionDenied, .PERM => return error.PermissionDenied,
.SPIPE => return error.Unseekable, .SPIPE => return error.Unseekable,
.TXTBSY => return error.FileBusy, .TXTBSY => return error.FileBusy,
else => |errno| return os.unexpectedErrno(errno), else => |errno| return posix.unexpectedErrno(errno),
} }
} }
} }

View File

@ -1,5 +1,6 @@
const std = @import("std"); const std = @import("std");
const os = std.os; const posix = std.posix;
const windows = std.os.windows;
const assert = std.debug.assert; const assert = std.debug.assert;
const log = std.log.scoped(.io); const log = std.log.scoped(.io);
@ -10,7 +11,7 @@ const bufferLimit = @import("../io.zig").bufferLimit;
const sector_size = 4096; const sector_size = 4096;
pub const IO = struct { pub const IO = struct {
iocp: os.windows.HANDLE, iocp: windows.HANDLE,
timer: Time = .{}, timer: Time = .{},
io_pending: usize = 0, io_pending: usize = 0,
timeouts: FIFO(Completion) = .{ .name = "io_timeouts" }, timeouts: FIFO(Completion) = .{ .name = "io_timeouts" },
@ -20,19 +21,19 @@ pub const IO = struct {
_ = entries; _ = entries;
_ = flags; _ = flags;
_ = try os.windows.WSAStartup(2, 2); _ = try windows.WSAStartup(2, 2);
errdefer os.windows.WSACleanup() catch unreachable; errdefer windows.WSACleanup() catch unreachable;
const iocp = try os.windows.CreateIoCompletionPort(os.windows.INVALID_HANDLE_VALUE, null, 0, 0); const iocp = try windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 0);
return IO{ .iocp = iocp }; return IO{ .iocp = iocp };
} }
pub fn deinit(self: *IO) void { pub fn deinit(self: *IO) void {
assert(self.iocp != os.windows.INVALID_HANDLE_VALUE); assert(self.iocp != windows.INVALID_HANDLE_VALUE);
os.windows.CloseHandle(self.iocp); windows.CloseHandle(self.iocp);
self.iocp = os.windows.INVALID_HANDLE_VALUE; self.iocp = windows.INVALID_HANDLE_VALUE;
os.windows.WSACleanup() catch unreachable; windows.WSACleanup() catch unreachable;
} }
pub fn tick(self: *IO) !void { pub fn tick(self: *IO) !void {
@ -66,16 +67,16 @@ pub const IO = struct {
if (self.completed.empty()) { if (self.completed.empty()) {
// Compute how long to poll by flushing timeout completions. // Compute how long to poll by flushing timeout completions.
// NOTE: this may push to completed queue // NOTE: this may push to completed queue
var timeout_ms: ?os.windows.DWORD = null; var timeout_ms: ?windows.DWORD = null;
if (self.flush_timeouts()) |expires_ns| { if (self.flush_timeouts()) |expires_ns| {
// 0ns expires should have been completed not returned // 0ns expires should have been completed not returned
assert(expires_ns != 0); assert(expires_ns != 0);
// Round up sub-millisecond expire times to the next millisecond // Round up sub-millisecond expire times to the next millisecond
const expires_ms = (expires_ns + (std.time.ns_per_ms / 2)) / std.time.ns_per_ms; const expires_ms = (expires_ns + (std.time.ns_per_ms / 2)) / std.time.ns_per_ms;
// Saturating cast to DWORD milliseconds // Saturating cast to DWORD milliseconds
const expires = std.math.cast(os.windows.DWORD, expires_ms) orelse std.math.maxInt(os.windows.DWORD); const expires = std.math.cast(windows.DWORD, expires_ms) orelse std.math.maxInt(windows.DWORD);
// max DWORD is reserved for INFINITE so cap the cast at max - 1 // max DWORD is reserved for INFINITE so cap the cast at max - 1
timeout_ms = if (expires == os.windows.INFINITE) expires - 1 else expires; timeout_ms = if (expires == windows.INFINITE) expires - 1 else expires;
} }
// Poll for IO iff theres IO pending and flush_timeouts() found no ready completions // Poll for IO iff theres IO pending and flush_timeouts() found no ready completions
@ -87,8 +88,8 @@ pub const IO = struct {
.non_blocking => 0, .non_blocking => 0,
}; };
var events: [64]os.windows.OVERLAPPED_ENTRY = undefined; var events: [64]windows.OVERLAPPED_ENTRY = undefined;
const num_events: u32 = os.windows.GetQueuedCompletionStatusEx( const num_events: u32 = windows.GetQueuedCompletionStatusEx(
self.iocp, self.iocp,
&events, &events,
io_timeout, io_timeout,
@ -170,13 +171,13 @@ pub const IO = struct {
}; };
const Overlapped = struct { const Overlapped = struct {
raw: os.windows.OVERLAPPED, raw: windows.OVERLAPPED,
completion: *Completion, completion: *Completion,
}; };
const Transfer = struct { const Transfer = struct {
socket: os.socket_t, socket: posix.socket_t,
buf: os.windows.ws2_32.WSABUF, buf: windows.ws2_32.WSABUF,
overlapped: Overlapped, overlapped: Overlapped,
pending: bool, pending: bool,
}; };
@ -184,12 +185,12 @@ pub const IO = struct {
const Operation = union(enum) { const Operation = union(enum) {
accept: struct { accept: struct {
overlapped: Overlapped, overlapped: Overlapped,
listen_socket: os.socket_t, listen_socket: posix.socket_t,
client_socket: os.socket_t, client_socket: posix.socket_t,
addr_buffer: [(@sizeOf(std.net.Address) + 16) * 2]u8 align(4), addr_buffer: [(@sizeOf(std.net.Address) + 16) * 2]u8 align(4),
}, },
connect: struct { connect: struct {
socket: os.socket_t, socket: posix.socket_t,
address: std.net.Address, address: std.net.Address,
overlapped: Overlapped, overlapped: Overlapped,
pending: bool, pending: bool,
@ -197,19 +198,19 @@ pub const IO = struct {
send: Transfer, send: Transfer,
recv: Transfer, recv: Transfer,
read: struct { read: struct {
fd: os.fd_t, fd: posix.fd_t,
buf: [*]u8, buf: [*]u8,
len: u32, len: u32,
offset: u64, offset: u64,
}, },
write: struct { write: struct {
fd: os.fd_t, fd: posix.fd_t,
buf: [*]const u8, buf: [*]const u8,
len: u32, len: u32,
offset: u64, offset: u64,
}, },
close: struct { close: struct {
fd: os.fd_t, fd: posix.fd_t,
}, },
timeout: struct { timeout: struct {
deadline: u64, deadline: u64,
@ -270,7 +271,7 @@ pub const IO = struct {
} }
} }
pub const AcceptError = os.AcceptError || os.SetSockOptError; pub const AcceptError = posix.AcceptError || posix.SetSockOptError;
pub fn accept( pub fn accept(
self: *IO, self: *IO,
@ -279,10 +280,10 @@ pub const IO = struct {
comptime callback: fn ( comptime callback: fn (
context: Context, context: Context,
completion: *Completion, completion: *Completion,
result: AcceptError!os.socket_t, result: AcceptError!posix.socket_t,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
) void { ) void {
self.submit( self.submit(
context, context,
@ -296,31 +297,31 @@ pub const IO = struct {
.addr_buffer = undefined, .addr_buffer = undefined,
}, },
struct { struct {
fn do_operation(ctx: Completion.Context, op: anytype) AcceptError!os.socket_t { fn do_operation(ctx: Completion.Context, op: anytype) AcceptError!posix.socket_t {
var flags: os.windows.DWORD = undefined; var flags: windows.DWORD = undefined;
var transferred: os.windows.DWORD = undefined; var transferred: windows.DWORD = undefined;
const rc = switch (op.client_socket) { const rc = switch (op.client_socket) {
// When first called, the client_socket is invalid so we start the op. // When first called, the client_socket is invalid so we start the op.
INVALID_SOCKET => blk: { INVALID_SOCKET => blk: {
// Create the socket that will be used for accept. // Create the socket that will be used for accept.
op.client_socket = ctx.io.open_socket( op.client_socket = ctx.io.open_socket(
os.AF.INET, posix.AF.INET,
os.SOCK.STREAM, posix.SOCK.STREAM,
os.IPPROTO.TCP, posix.IPPROTO.TCP,
) catch |err| switch (err) { ) catch |err| switch (err) {
error.AddressFamilyNotSupported, error.ProtocolNotSupported => unreachable, error.AddressFamilyNotSupported, error.ProtocolNotSupported => unreachable,
else => |e| return e, else => |e| return e,
}; };
var sync_bytes_read: os.windows.DWORD = undefined; var sync_bytes_read: windows.DWORD = undefined;
op.overlapped = .{ op.overlapped = .{
.raw = std.mem.zeroes(os.windows.OVERLAPPED), .raw = std.mem.zeroes(windows.OVERLAPPED),
.completion = ctx.completion, .completion = ctx.completion,
}; };
// Start the asynchronous accept with the created socket. // Start the asynchronous accept with the created socket.
break :blk os.windows.ws2_32.AcceptEx( break :blk windows.ws2_32.AcceptEx(
op.listen_socket, op.listen_socket,
op.client_socket, op.client_socket,
&op.addr_buffer, &op.addr_buffer,
@ -332,22 +333,22 @@ pub const IO = struct {
); );
}, },
// Called after accept was started, so get the result // Called after accept was started, so get the result
else => os.windows.ws2_32.WSAGetOverlappedResult( else => windows.ws2_32.WSAGetOverlappedResult(
op.listen_socket, op.listen_socket,
&op.overlapped.raw, &op.overlapped.raw,
&transferred, &transferred,
os.windows.FALSE, // dont wait windows.FALSE, // dont wait
&flags, &flags,
), ),
}; };
// return the socket if we succeed in accepting. // return the socket if we succeed in accepting.
if (rc != os.windows.FALSE) { if (rc != windows.FALSE) {
// enables getsockopt, setsockopt, getsockname, getpeername // enables getsockopt, setsockopt, getsockname, getpeername
_ = os.windows.ws2_32.setsockopt( _ = windows.ws2_32.setsockopt(
op.client_socket, op.client_socket,
os.windows.ws2_32.SOL.SOCKET, windows.ws2_32.SOL.SOCKET,
os.windows.ws2_32.SO.UPDATE_ACCEPT_CONTEXT, windows.ws2_32.SO.UPDATE_ACCEPT_CONTEXT,
null, null,
0, 0,
); );
@ -359,12 +360,12 @@ pub const IO = struct {
errdefer |err| switch (err) { errdefer |err| switch (err) {
error.WouldBlock => {}, error.WouldBlock => {},
else => { else => {
os.closeSocket(op.client_socket); posix.close(op.client_socket);
op.client_socket = INVALID_SOCKET; op.client_socket = INVALID_SOCKET;
}, },
}; };
return switch (os.windows.ws2_32.WSAGetLastError()) { return switch (windows.ws2_32.WSAGetLastError()) {
.WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock, .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock,
.WSANOTINITIALISED => unreachable, // WSAStartup() was called .WSANOTINITIALISED => unreachable, // WSAStartup() was called
.WSAENETDOWN => unreachable, // WinSock error .WSAENETDOWN => unreachable, // WinSock error
@ -376,7 +377,7 @@ pub const IO = struct {
.WSAEMFILE => unreachable, // we create our own descriptor so its available .WSAEMFILE => unreachable, // we create our own descriptor so its available
.WSAENOBUFS => error.SystemResources, .WSAENOBUFS => error.SystemResources,
.WSAEINTR, .WSAEINPROGRESS => unreachable, // no blocking calls .WSAEINTR, .WSAEINPROGRESS => unreachable, // no blocking calls
else => |err| os.windows.unexpectedWSAError(err), else => |err| windows.unexpectedWSAError(err),
}; };
} }
}, },
@ -388,9 +389,9 @@ pub const IO = struct {
DiskQuota, DiskQuota,
InputOutput, InputOutput,
NoSpaceLeft, NoSpaceLeft,
} || os.UnexpectedError; } || posix.UnexpectedError;
pub const ConnectError = os.ConnectError || error{FileDescriptorNotASocket}; pub const ConnectError = posix.ConnectError || error{FileDescriptorNotASocket};
pub fn connect( pub fn connect(
self: *IO, self: *IO,
@ -402,7 +403,7 @@ pub const IO = struct {
result: ConnectError!void, result: ConnectError!void,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
address: std.net.Address, address: std.net.Address,
) void { ) void {
self.submit( self.submit(
@ -418,17 +419,17 @@ pub const IO = struct {
}, },
struct { struct {
fn do_operation(ctx: Completion.Context, op: anytype) ConnectError!void { fn do_operation(ctx: Completion.Context, op: anytype) ConnectError!void {
var flags: os.windows.DWORD = undefined; var flags: windows.DWORD = undefined;
var transferred: os.windows.DWORD = undefined; var transferred: windows.DWORD = undefined;
const rc = blk: { const rc = blk: {
// Poll for the result if we've already started the connect op. // Poll for the result if we've already started the connect op.
if (op.pending) { if (op.pending) {
break :blk os.windows.ws2_32.WSAGetOverlappedResult( break :blk windows.ws2_32.WSAGetOverlappedResult(
op.socket, op.socket,
&op.overlapped.raw, &op.overlapped.raw,
&transferred, &transferred,
os.windows.FALSE, // dont wait windows.FALSE, // dont wait
&flags, &flags,
); );
} }
@ -436,7 +437,7 @@ pub const IO = struct {
// ConnectEx requires the socket to be initially bound (INADDR_ANY) // ConnectEx requires the socket to be initially bound (INADDR_ANY)
const inaddr_any = std.mem.zeroes([4]u8); const inaddr_any = std.mem.zeroes([4]u8);
const bind_addr = std.net.Address.initIp4(inaddr_any, 0); const bind_addr = std.net.Address.initIp4(inaddr_any, 0);
os.bind( posix.bind(
op.socket, op.socket,
&bind_addr.any, &bind_addr.any,
bind_addr.getOsSockLen(), bind_addr.getOsSockLen(),
@ -452,43 +453,43 @@ pub const IO = struct {
}; };
const LPFN_CONNECTEX = *const fn ( const LPFN_CONNECTEX = *const fn (
Socket: os.windows.ws2_32.SOCKET, Socket: windows.ws2_32.SOCKET,
SockAddr: *const os.windows.ws2_32.sockaddr, SockAddr: *const windows.ws2_32.sockaddr,
SockLen: os.socklen_t, SockLen: posix.socklen_t,
SendBuf: ?*const anyopaque, SendBuf: ?*const anyopaque,
SendBufLen: os.windows.DWORD, SendBufLen: windows.DWORD,
BytesSent: *os.windows.DWORD, BytesSent: *windows.DWORD,
Overlapped: *os.windows.OVERLAPPED, Overlapped: *windows.OVERLAPPED,
) callconv(os.windows.WINAPI) os.windows.BOOL; ) callconv(windows.WINAPI) windows.BOOL;
// Find the ConnectEx function by dynamically looking it up on the socket. // Find the ConnectEx function by dynamically looking it up on the socket.
// TODO: use `os.windows.loadWinsockExtensionFunction` once the function // TODO: use `windows.loadWinsockExtensionFunction` once the function
// pointer is no longer required to be comptime. // pointer is no longer required to be comptime.
var connect_ex: LPFN_CONNECTEX = undefined; var connect_ex: LPFN_CONNECTEX = undefined;
var num_bytes: os.windows.DWORD = undefined; var num_bytes: windows.DWORD = undefined;
const guid = os.windows.ws2_32.WSAID_CONNECTEX; const guid = windows.ws2_32.WSAID_CONNECTEX;
switch (os.windows.ws2_32.WSAIoctl( switch (windows.ws2_32.WSAIoctl(
op.socket, op.socket,
os.windows.ws2_32.SIO_GET_EXTENSION_FUNCTION_POINTER, windows.ws2_32.SIO_GET_EXTENSION_FUNCTION_POINTER,
@as(*const anyopaque, @ptrCast(&guid)), @as(*const anyopaque, @ptrCast(&guid)),
@sizeOf(os.windows.GUID), @sizeOf(windows.GUID),
@as(*anyopaque, @ptrCast(&connect_ex)), @as(*anyopaque, @ptrCast(&connect_ex)),
@sizeOf(LPFN_CONNECTEX), @sizeOf(LPFN_CONNECTEX),
&num_bytes, &num_bytes,
null, null,
null, null,
)) { )) {
os.windows.ws2_32.SOCKET_ERROR => switch (os.windows.ws2_32.WSAGetLastError()) { windows.ws2_32.SOCKET_ERROR => switch (windows.ws2_32.WSAGetLastError()) {
.WSAEOPNOTSUPP => unreachable, .WSAEOPNOTSUPP => unreachable,
.WSAENOTSOCK => unreachable, .WSAENOTSOCK => unreachable,
else => |err| return os.windows.unexpectedWSAError(err), else => |err| return windows.unexpectedWSAError(err),
}, },
else => assert(num_bytes == @sizeOf(LPFN_CONNECTEX)), else => assert(num_bytes == @sizeOf(LPFN_CONNECTEX)),
} }
op.pending = true; op.pending = true;
op.overlapped = .{ op.overlapped = .{
.raw = std.mem.zeroes(os.windows.OVERLAPPED), .raw = std.mem.zeroes(windows.OVERLAPPED),
.completion = ctx.completion, .completion = ctx.completion,
}; };
@ -505,12 +506,12 @@ pub const IO = struct {
}; };
// return if we succeeded in connecting // return if we succeeded in connecting
if (rc != os.windows.FALSE) { if (rc != windows.FALSE) {
// enables getsockopt, setsockopt, getsockname, getpeername // enables getsockopt, setsockopt, getsockname, getpeername
_ = os.windows.ws2_32.setsockopt( _ = windows.ws2_32.setsockopt(
op.socket, op.socket,
os.windows.ws2_32.SOL.SOCKET, windows.ws2_32.SOL.SOCKET,
os.windows.ws2_32.SO.UPDATE_CONNECT_CONTEXT, windows.ws2_32.SO.UPDATE_CONNECT_CONTEXT,
null, null,
0, 0,
); );
@ -518,7 +519,7 @@ pub const IO = struct {
return; return;
} }
return switch (os.windows.ws2_32.WSAGetLastError()) { return switch (windows.ws2_32.WSAGetLastError()) {
.WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE, .WSAEALREADY => error.WouldBlock, .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE, .WSAEALREADY => error.WouldBlock,
.WSANOTINITIALISED => unreachable, // WSAStartup() was called .WSANOTINITIALISED => unreachable, // WSAStartup() was called
.WSAENETDOWN => unreachable, // network subsystem is down .WSAENETDOWN => unreachable, // network subsystem is down
@ -532,14 +533,14 @@ pub const IO = struct {
.WSAENOTSOCK => unreachable, // socket is not bound or is listening .WSAENOTSOCK => unreachable, // socket is not bound or is listening
.WSAETIMEDOUT => error.ConnectionTimedOut, .WSAETIMEDOUT => error.ConnectionTimedOut,
.WSA_INVALID_HANDLE => unreachable, // we dont use hEvent in OVERLAPPED .WSA_INVALID_HANDLE => unreachable, // we dont use hEvent in OVERLAPPED
else => |err| os.windows.unexpectedWSAError(err), else => |err| windows.unexpectedWSAError(err),
}; };
} }
}, },
); );
} }
pub const SendError = os.SendError; pub const SendError = posix.SendError;
pub fn send( pub fn send(
self: *IO, self: *IO,
@ -551,12 +552,12 @@ pub const IO = struct {
result: SendError!usize, result: SendError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
buffer: []const u8, buffer: []const u8,
) void { ) void {
const transfer = Completion.Transfer{ const transfer = Completion.Transfer{
.socket = socket, .socket = socket,
.buf = os.windows.ws2_32.WSABUF{ .buf = windows.ws2_32.WSABUF{
.len = @as(u32, @intCast(bufferLimit(buffer.len))), .len = @as(u32, @intCast(bufferLimit(buffer.len))),
.buf = @constCast(buffer.ptr), .buf = @constCast(buffer.ptr),
}, },
@ -572,48 +573,48 @@ pub const IO = struct {
transfer, transfer,
struct { struct {
fn do_operation(ctx: Completion.Context, op: anytype) SendError!usize { fn do_operation(ctx: Completion.Context, op: anytype) SendError!usize {
var flags: os.windows.DWORD = undefined; var flags: windows.DWORD = undefined;
var transferred: os.windows.DWORD = undefined; var transferred: windows.DWORD = undefined;
const rc = blk: { const rc = blk: {
// Poll for the result if we've already started the send op. // Poll for the result if we've already started the send op.
if (op.pending) { if (op.pending) {
break :blk os.windows.ws2_32.WSAGetOverlappedResult( break :blk windows.ws2_32.WSAGetOverlappedResult(
op.socket, op.socket,
&op.overlapped.raw, &op.overlapped.raw,
&transferred, &transferred,
os.windows.FALSE, // dont wait windows.FALSE, // dont wait
&flags, &flags,
); );
} }
op.pending = true; op.pending = true;
op.overlapped = .{ op.overlapped = .{
.raw = std.mem.zeroes(os.windows.OVERLAPPED), .raw = std.mem.zeroes(windows.OVERLAPPED),
.completion = ctx.completion, .completion = ctx.completion,
}; };
// Start the send operation. // Start the send operation.
break :blk switch (os.windows.ws2_32.WSASend( break :blk switch (windows.ws2_32.WSASend(
op.socket, op.socket,
@as([*]os.windows.ws2_32.WSABUF, @ptrCast(&op.buf)), @as([*]windows.ws2_32.WSABUF, @ptrCast(&op.buf)),
1, // one buffer 1, // one buffer
&transferred, &transferred,
0, // no flags 0, // no flags
&op.overlapped.raw, &op.overlapped.raw,
null, null,
)) { )) {
os.windows.ws2_32.SOCKET_ERROR => @as(os.windows.BOOL, os.windows.FALSE), windows.ws2_32.SOCKET_ERROR => @as(windows.BOOL, windows.FALSE),
0 => os.windows.TRUE, 0 => windows.TRUE,
else => unreachable, else => unreachable,
}; };
}; };
// Return bytes transferred on success. // Return bytes transferred on success.
if (rc != os.windows.FALSE) if (rc != windows.FALSE)
return transferred; return transferred;
return switch (os.windows.ws2_32.WSAGetLastError()) { return switch (windows.ws2_32.WSAGetLastError()) {
.WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock, .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock,
.WSANOTINITIALISED => unreachable, // WSAStartup() was called .WSANOTINITIALISED => unreachable, // WSAStartup() was called
.WSA_INVALID_HANDLE => unreachable, // we dont use OVERLAPPED.hEvent .WSA_INVALID_HANDLE => unreachable, // we dont use OVERLAPPED.hEvent
@ -632,14 +633,14 @@ pub const IO = struct {
.WSAEOPNOTSUPP => unreachable, // we dont use MSG_OOB or MSG_PARTIAL .WSAEOPNOTSUPP => unreachable, // we dont use MSG_OOB or MSG_PARTIAL
.WSAESHUTDOWN => error.BrokenPipe, .WSAESHUTDOWN => error.BrokenPipe,
.WSA_OPERATION_ABORTED => unreachable, // operation was cancelled .WSA_OPERATION_ABORTED => unreachable, // operation was cancelled
else => |err| os.windows.unexpectedWSAError(err), else => |err| windows.unexpectedWSAError(err),
}; };
} }
}, },
); );
} }
pub const RecvError = os.RecvFromError; pub const RecvError = posix.RecvFromError;
pub fn recv( pub fn recv(
self: *IO, self: *IO,
@ -651,12 +652,12 @@ pub const IO = struct {
result: RecvError!usize, result: RecvError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
socket: os.socket_t, socket: posix.socket_t,
buffer: []u8, buffer: []u8,
) void { ) void {
const transfer = Completion.Transfer{ const transfer = Completion.Transfer{
.socket = socket, .socket = socket,
.buf = os.windows.ws2_32.WSABUF{ .buf = windows.ws2_32.WSABUF{
.len = @as(u32, @intCast(bufferLimit(buffer.len))), .len = @as(u32, @intCast(bufferLimit(buffer.len))),
.buf = buffer.ptr, .buf = buffer.ptr,
}, },
@ -672,48 +673,48 @@ pub const IO = struct {
transfer, transfer,
struct { struct {
fn do_operation(ctx: Completion.Context, op: anytype) RecvError!usize { fn do_operation(ctx: Completion.Context, op: anytype) RecvError!usize {
var flags: os.windows.DWORD = 0; // used both as input and output var flags: windows.DWORD = 0; // used both as input and output
var transferred: os.windows.DWORD = undefined; var transferred: windows.DWORD = undefined;
const rc = blk: { const rc = blk: {
// Poll for the result if we've already started the recv op. // Poll for the result if we've already started the recv op.
if (op.pending) { if (op.pending) {
break :blk os.windows.ws2_32.WSAGetOverlappedResult( break :blk windows.ws2_32.WSAGetOverlappedResult(
op.socket, op.socket,
&op.overlapped.raw, &op.overlapped.raw,
&transferred, &transferred,
os.windows.FALSE, // dont wait windows.FALSE, // dont wait
&flags, &flags,
); );
} }
op.pending = true; op.pending = true;
op.overlapped = .{ op.overlapped = .{
.raw = std.mem.zeroes(os.windows.OVERLAPPED), .raw = std.mem.zeroes(windows.OVERLAPPED),
.completion = ctx.completion, .completion = ctx.completion,
}; };
// Start the recv operation. // Start the recv operation.
break :blk switch (os.windows.ws2_32.WSARecv( break :blk switch (windows.ws2_32.WSARecv(
op.socket, op.socket,
@as([*]os.windows.ws2_32.WSABUF, @ptrCast(&op.buf)), @as([*]windows.ws2_32.WSABUF, @ptrCast(&op.buf)),
1, // one buffer 1, // one buffer
&transferred, &transferred,
&flags, &flags,
&op.overlapped.raw, &op.overlapped.raw,
null, null,
)) { )) {
os.windows.ws2_32.SOCKET_ERROR => @as(os.windows.BOOL, os.windows.FALSE), windows.ws2_32.SOCKET_ERROR => @as(windows.BOOL, windows.FALSE),
0 => os.windows.TRUE, 0 => windows.TRUE,
else => unreachable, else => unreachable,
}; };
}; };
// Return bytes received on success. // Return bytes received on success.
if (rc != os.windows.FALSE) if (rc != windows.FALSE)
return transferred; return transferred;
return switch (os.windows.ws2_32.WSAGetLastError()) { return switch (windows.ws2_32.WSAGetLastError()) {
.WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock, .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock,
.WSANOTINITIALISED => unreachable, // WSAStartup() was called .WSANOTINITIALISED => unreachable, // WSAStartup() was called
.WSA_INVALID_HANDLE => unreachable, // we dont use OVERLAPPED.hEvent .WSA_INVALID_HANDLE => unreachable, // we dont use OVERLAPPED.hEvent
@ -733,7 +734,7 @@ pub const IO = struct {
.WSAESHUTDOWN => error.SocketNotConnected, .WSAESHUTDOWN => error.SocketNotConnected,
.WSAETIMEDOUT => error.ConnectionRefused, .WSAETIMEDOUT => error.ConnectionRefused,
.WSA_OPERATION_ABORTED => unreachable, // operation was cancelled .WSA_OPERATION_ABORTED => unreachable, // operation was cancelled
else => |err| os.windows.unexpectedWSAError(err), else => |err| windows.unexpectedWSAError(err),
}; };
} }
}, },
@ -750,7 +751,7 @@ pub const IO = struct {
SystemResources, SystemResources,
Unseekable, Unseekable,
ConnectionTimedOut, ConnectionTimedOut,
} || os.UnexpectedError; } || posix.UnexpectedError;
pub fn read( pub fn read(
self: *IO, self: *IO,
@ -762,7 +763,7 @@ pub const IO = struct {
result: ReadError!usize, result: ReadError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
fd: os.fd_t, fd: posix.fd_t,
buffer: []u8, buffer: []u8,
offset: u64, offset: u64,
) void { ) void {
@ -781,7 +782,7 @@ pub const IO = struct {
fn do_operation(ctx: Completion.Context, op: anytype) ReadError!usize { fn do_operation(ctx: Completion.Context, op: anytype) ReadError!usize {
// Do a synchronous read for now. // Do a synchronous read for now.
_ = ctx; _ = ctx;
return os.pread(op.fd, op.buf[0..op.len], op.offset) catch |err| switch (err) { return posix.pread(op.fd, op.buf[0..op.len], op.offset) catch |err| switch (err) {
error.OperationAborted => unreachable, error.OperationAborted => unreachable,
error.BrokenPipe => unreachable, error.BrokenPipe => unreachable,
error.ConnectionTimedOut => unreachable, error.ConnectionTimedOut => unreachable,
@ -794,7 +795,7 @@ pub const IO = struct {
); );
} }
pub const WriteError = os.PWriteError; pub const WriteError = posix.PWriteError;
pub fn write( pub fn write(
self: *IO, self: *IO,
@ -806,7 +807,7 @@ pub const IO = struct {
result: WriteError!usize, result: WriteError!usize,
) void, ) void,
completion: *Completion, completion: *Completion,
fd: os.fd_t, fd: posix.fd_t,
buffer: []const u8, buffer: []const u8,
offset: u64, offset: u64,
) void { ) void {
@ -825,7 +826,7 @@ pub const IO = struct {
fn do_operation(ctx: Completion.Context, op: anytype) WriteError!usize { fn do_operation(ctx: Completion.Context, op: anytype) WriteError!usize {
// Do a synchronous write for now. // Do a synchronous write for now.
_ = ctx; _ = ctx;
return os.pwrite(op.fd, op.buf[0..op.len], op.offset); return posix.pwrite(op.fd, op.buf[0..op.len], op.offset);
} }
}, },
); );
@ -841,7 +842,7 @@ pub const IO = struct {
result: CloseError!void, result: CloseError!void,
) void, ) void,
completion: *Completion, completion: *Completion,
fd: os.fd_t, fd: posix.fd_t,
) void { ) void {
self.submit( self.submit(
context, context,
@ -855,19 +856,19 @@ pub const IO = struct {
// Check if the fd is a SOCKET by seeing if getsockopt() returns ENOTSOCK // Check if the fd is a SOCKET by seeing if getsockopt() returns ENOTSOCK
// https://stackoverflow.com/a/50981652 // https://stackoverflow.com/a/50981652
const socket: os.socket_t = @ptrCast(op.fd); const socket: posix.socket_t = @ptrCast(op.fd);
getsockoptError(socket) catch |err| switch (err) { getsockoptError(socket) catch |err| switch (err) {
error.FileDescriptorNotASocket => return os.windows.CloseHandle(op.fd), error.FileDescriptorNotASocket => return windows.CloseHandle(op.fd),
else => {}, else => {},
}; };
os.closeSocket(socket); posix.close(socket);
} }
}, },
); );
} }
pub const TimeoutError = error{Canceled} || os.UnexpectedError; pub const TimeoutError = error{Canceled} || posix.UnexpectedError;
pub fn timeout( pub fn timeout(
self: *IO, self: *IO,
@ -915,16 +916,16 @@ pub const IO = struct {
); );
} }
pub const INVALID_SOCKET = os.windows.ws2_32.INVALID_SOCKET; pub const INVALID_SOCKET = windows.ws2_32.INVALID_SOCKET;
/// Creates a socket that can be used for async operations with the IO instance. /// Creates a socket that can be used for async operations with the IO instance.
pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !os.socket_t { pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !posix.socket_t {
// SOCK_NONBLOCK | SOCK_CLOEXEC // SOCK_NONBLOCK | SOCK_CLOEXEC
var flags: os.windows.DWORD = 0; var flags: windows.DWORD = 0;
flags |= os.windows.ws2_32.WSA_FLAG_OVERLAPPED; flags |= windows.ws2_32.WSA_FLAG_OVERLAPPED;
flags |= os.windows.ws2_32.WSA_FLAG_NO_HANDLE_INHERIT; flags |= windows.ws2_32.WSA_FLAG_NO_HANDLE_INHERIT;
const socket = try os.windows.WSASocketW( const socket = try windows.WSASocketW(
@as(i32, @bitCast(family)), @as(i32, @bitCast(family)),
@as(i32, @bitCast(sock_type)), @as(i32, @bitCast(sock_type)),
@as(i32, @bitCast(protocol)), @as(i32, @bitCast(protocol)),
@ -932,68 +933,68 @@ pub const IO = struct {
0, 0,
flags, flags,
); );
errdefer os.closeSocket(socket); errdefer posix.close(socket);
const socket_iocp = try os.windows.CreateIoCompletionPort(socket, self.iocp, 0, 0); const socket_iocp = try windows.CreateIoCompletionPort(socket, self.iocp, 0, 0);
assert(socket_iocp == self.iocp); assert(socket_iocp == self.iocp);
// Ensure that synchronous IO completion doesn't queue an unneeded overlapped // Ensure that synchronous IO completion doesn't queue an unneeded overlapped
// and that the event for the socket (WaitForSingleObject) doesn't need to be set. // and that the event for the socket (WaitForSingleObject) doesn't need to be set.
var mode: os.windows.BYTE = 0; var mode: windows.BYTE = 0;
mode |= os.windows.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS; mode |= windows.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
mode |= os.windows.FILE_SKIP_SET_EVENT_ON_HANDLE; mode |= windows.FILE_SKIP_SET_EVENT_ON_HANDLE;
const handle = @as(os.windows.HANDLE, @ptrCast(socket)); const handle = @as(windows.HANDLE, @ptrCast(socket));
try os.windows.SetFileCompletionNotificationModes(handle, mode); try windows.SetFileCompletionNotificationModes(handle, mode);
return socket; return socket;
} }
/// Opens a directory with read only access. /// Opens a directory with read only access.
pub fn open_dir(dir_path: []const u8) !os.fd_t { pub fn open_dir(dir_path: []const u8) !posix.fd_t {
const dir = try std.fs.cwd().openDir(dir_path, .{}); const dir = try std.fs.cwd().openDir(dir_path, .{});
return dir.fd; return dir.fd;
} }
pub const INVALID_FILE = os.windows.INVALID_HANDLE_VALUE; pub const INVALID_FILE = windows.INVALID_HANDLE_VALUE;
fn open_file_handle(relative_path: []const u8, method: enum { create, open }) !os.fd_t { fn open_file_handle(relative_path: []const u8, method: enum { create, open }) !posix.fd_t {
const path_w = try os.windows.sliceToPrefixedFileW(relative_path); const path_w = try windows.sliceToPrefixedFileW(relative_path);
// FILE_CREATE = O_CREAT | O_EXCL // FILE_CREATE = O_CREAT | O_EXCL
var creation_disposition: os.windows.DWORD = 0; var creation_disposition: windows.DWORD = 0;
switch (method) { switch (method) {
.create => { .create => {
creation_disposition = os.windows.FILE_CREATE; creation_disposition = windows.FILE_CREATE;
log.info("creating \"{s}\"...", .{relative_path}); log.info("creating \"{s}\"...", .{relative_path});
}, },
.open => { .open => {
creation_disposition = os.windows.OPEN_EXISTING; creation_disposition = windows.OPEN_EXISTING;
log.info("opening \"{s}\"...", .{relative_path}); log.info("opening \"{s}\"...", .{relative_path});
}, },
} }
// O_EXCL // O_EXCL
const shared_mode: os.windows.DWORD = 0; const shared_mode: windows.DWORD = 0;
// O_RDWR // O_RDWR
var access_mask: os.windows.DWORD = 0; var access_mask: windows.DWORD = 0;
access_mask |= os.windows.GENERIC_READ; access_mask |= windows.GENERIC_READ;
access_mask |= os.windows.GENERIC_WRITE; access_mask |= windows.GENERIC_WRITE;
// O_DIRECT | O_DSYNC // O_DIRECT | O_DSYNC
var attributes: os.windows.DWORD = 0; var attributes: windows.DWORD = 0;
attributes |= os.windows.FILE_FLAG_NO_BUFFERING; attributes |= windows.FILE_FLAG_NO_BUFFERING;
attributes |= os.windows.FILE_FLAG_WRITE_THROUGH; attributes |= windows.FILE_FLAG_WRITE_THROUGH;
// This is critical as we rely on O_DSYNC for fsync() whenever we write to the file: // This is critical as we rely on O_DSYNC for fsync() whenever we write to the file:
assert((attributes & os.windows.FILE_FLAG_WRITE_THROUGH) > 0); assert((attributes & windows.FILE_FLAG_WRITE_THROUGH) > 0);
// TODO: Add ReadFileEx/WriteFileEx support. // TODO: Add ReadFileEx/WriteFileEx support.
// Not currently needed for O_DIRECT disk IO. // Not currently needed for O_DIRECT disk IO.
// attributes |= os.windows.FILE_FLAG_OVERLAPPED; // attributes |= windows.FILE_FLAG_OVERLAPPED;
const handle = os.windows.kernel32.CreateFileW( const handle = windows.kernel32.CreateFileW(
path_w.span(), path_w.span(),
access_mask, access_mask,
shared_mode, shared_mode,
@ -1003,13 +1004,13 @@ pub const IO = struct {
null, // no existing template file null, // no existing template file
); );
if (handle == os.windows.INVALID_HANDLE_VALUE) { if (handle == windows.INVALID_HANDLE_VALUE) {
return switch (os.windows.kernel32.GetLastError()) { return switch (windows.kernel32.GetLastError()) {
.FILE_NOT_FOUND => error.FileNotFound, .FILE_NOT_FOUND => error.FileNotFound,
.SHARING_VIOLATION, .ACCESS_DENIED => error.AccessDenied, .SHARING_VIOLATION, .ACCESS_DENIED => error.AccessDenied,
else => |err| { else => |err| {
log.warn("CreateFileW(): {}", .{err}); log.warn("CreateFileW(): {}", .{err});
return os.windows.unexpectedError(err); return windows.unexpectedError(err);
}, },
}; };
} }
@ -1026,11 +1027,11 @@ pub const IO = struct {
/// The caller is responsible for ensuring that the parent directory inode is durable. /// The caller is responsible for ensuring that the parent directory inode is durable.
/// - Verifies that the file size matches the expected file size before returning. /// - Verifies that the file size matches the expected file size before returning.
pub fn open_file( pub fn open_file(
dir_handle: os.fd_t, dir_handle: posix.fd_t,
relative_path: []const u8, relative_path: []const u8,
size: u64, size: u64,
method: enum { create, create_or_open, open }, method: enum { create, create_or_open, open },
) !os.fd_t { ) !posix.fd_t {
assert(relative_path.len > 0); assert(relative_path.len > 0);
assert(size % sector_size == 0); assert(size % sector_size == 0);
@ -1042,7 +1043,7 @@ pub const IO = struct {
else => return err, else => return err,
}, },
}; };
errdefer os.windows.CloseHandle(handle); errdefer windows.CloseHandle(handle);
// Obtain an advisory exclusive lock // Obtain an advisory exclusive lock
// even when we haven't given shared access to other processes. // even when we haven't given shared access to other processes.
@ -1064,7 +1065,7 @@ pub const IO = struct {
const write_offset = size - sector.len; const write_offset = size - sector.len;
var written: usize = 0; var written: usize = 0;
while (written < sector.len) { while (written < sector.len) {
written += try os.pwrite(handle, sector[written..], write_offset + written); written += try posix.pwrite(handle, sector[written..], write_offset + written);
} }
}; };
} }
@ -1073,21 +1074,21 @@ pub const IO = struct {
// making decisions on data that was never durably written by a previously crashed process. // making decisions on data that was never durably written by a previously crashed process.
// We therefore always fsync when we open the path, also to wait for any pending O_DSYNC. // We therefore always fsync when we open the path, also to wait for any pending O_DSYNC.
// Thanks to Alex Miller from FoundationDB for diving into our source and pointing this out. // Thanks to Alex Miller from FoundationDB for diving into our source and pointing this out.
try os.fsync(handle); try posix.fsync(handle);
// We cannot fsync the directory handle on Windows. // We cannot fsync the directory handle on Windows.
// We have no way to open a directory with write access. // We have no way to open a directory with write access.
// //
// try os.fsync(dir_handle); // try posix.fsync(dir_handle);
_ = dir_handle; _ = dir_handle;
const file_size = try os.windows.GetFileSizeEx(handle); const file_size = try windows.GetFileSizeEx(handle);
if (file_size < size) @panic("data file inode size was truncated or corrupted"); if (file_size < size) @panic("data file inode size was truncated or corrupted");
return handle; return handle;
} }
fn fs_lock(handle: os.fd_t, size: u64) !void { fn fs_lock(handle: posix.fd_t, size: u64) !void {
// TODO: Look into using SetFileIoOverlappedRange() for better unbuffered async IO perf // TODO: Look into using SetFileIoOverlappedRange() for better unbuffered async IO perf
// NOTE: Requires SeLockMemoryPrivilege. // NOTE: Requires SeLockMemoryPrivilege.
@ -1096,21 +1097,21 @@ pub const IO = struct {
const LOCKFILE_FAIL_IMMEDIATELY = 0x1; const LOCKFILE_FAIL_IMMEDIATELY = 0x1;
extern "kernel32" fn LockFileEx( extern "kernel32" fn LockFileEx(
hFile: os.windows.HANDLE, hFile: windows.HANDLE,
dwFlags: os.windows.DWORD, dwFlags: windows.DWORD,
dwReserved: os.windows.DWORD, dwReserved: windows.DWORD,
nNumberOfBytesToLockLow: os.windows.DWORD, nNumberOfBytesToLockLow: windows.DWORD,
nNumberOfBytesToLockHigh: os.windows.DWORD, nNumberOfBytesToLockHigh: windows.DWORD,
lpOverlapped: ?*os.windows.OVERLAPPED, lpOverlapped: ?*windows.OVERLAPPED,
) callconv(os.windows.WINAPI) os.windows.BOOL; ) callconv(windows.WINAPI) windows.BOOL;
}; };
// hEvent = null // hEvent = null
// Offset & OffsetHigh = 0 // Offset & OffsetHigh = 0
var lock_overlapped = std.mem.zeroes(os.windows.OVERLAPPED); var lock_overlapped = std.mem.zeroes(windows.OVERLAPPED);
// LOCK_EX | LOCK_NB // LOCK_EX | LOCK_NB
var lock_flags: os.windows.DWORD = 0; var lock_flags: windows.DWORD = 0;
lock_flags |= kernel32.LOCKFILE_EXCLUSIVE_LOCK; lock_flags |= kernel32.LOCKFILE_EXCLUSIVE_LOCK;
lock_flags |= kernel32.LOCKFILE_FAIL_IMMEDIATELY; lock_flags |= kernel32.LOCKFILE_FAIL_IMMEDIATELY;
@ -1123,64 +1124,64 @@ pub const IO = struct {
&lock_overlapped, &lock_overlapped,
); );
if (locked == os.windows.FALSE) { if (locked == windows.FALSE) {
return switch (os.windows.kernel32.GetLastError()) { return switch (windows.kernel32.GetLastError()) {
.IO_PENDING => error.WouldBlock, .IO_PENDING => error.WouldBlock,
else => |err| os.windows.unexpectedError(err), else => |err| windows.unexpectedError(err),
}; };
} }
} }
fn fs_allocate(handle: os.fd_t, size: u64) !void { fn fs_allocate(handle: posix.fd_t, size: u64) !void {
// TODO: Look into using SetFileValidData() instead // TODO: Look into using SetFileValidData() instead
// NOTE: Requires SE_MANAGE_VOLUME_NAME privilege // NOTE: Requires SE_MANAGE_VOLUME_NAME privilege
// Move the file pointer to the start + size // Move the file pointer to the start + size
const seeked = os.windows.kernel32.SetFilePointerEx( const seeked = windows.kernel32.SetFilePointerEx(
handle, handle,
@as(i64, @intCast(size)), @as(i64, @intCast(size)),
null, // no reference to new file pointer null, // no reference to new file pointer
os.windows.FILE_BEGIN, windows.FILE_BEGIN,
); );
if (seeked == os.windows.FALSE) { if (seeked == windows.FALSE) {
return switch (os.windows.kernel32.GetLastError()) { return switch (windows.kernel32.GetLastError()) {
.INVALID_HANDLE => unreachable, .INVALID_HANDLE => unreachable,
.INVALID_PARAMETER => unreachable, .INVALID_PARAMETER => unreachable,
else => |err| os.windows.unexpectedError(err), else => |err| windows.unexpectedError(err),
}; };
} }
// Mark the moved file pointer (start + size) as the physical EOF. // Mark the moved file pointer (start + size) as the physical EOF.
const allocated = os.windows.kernel32.SetEndOfFile(handle); const allocated = windows.kernel32.SetEndOfFile(handle);
if (allocated == os.windows.FALSE) { if (allocated == windows.FALSE) {
const err = os.windows.kernel32.GetLastError(); const err = windows.kernel32.GetLastError();
return os.windows.unexpectedError(err); return windows.unexpectedError(err);
} }
} }
}; };
// TODO: use os.getsockoptError when fixed for windows in stdlib // TODO: use posix.getsockoptError when fixed for windows in stdlib
fn getsockoptError(socket: os.socket_t) IO.ConnectError!void { fn getsockoptError(socket: posix.socket_t) IO.ConnectError!void {
var err_code: u32 = undefined; var err_code: u32 = undefined;
var size: i32 = @sizeOf(u32); var size: i32 = @sizeOf(u32);
const rc = os.windows.ws2_32.getsockopt( const rc = windows.ws2_32.getsockopt(
socket, socket,
os.SOL.SOCKET, posix.SOL.SOCKET,
os.SO.ERROR, posix.SO.ERROR,
std.mem.asBytes(&err_code), std.mem.asBytes(&err_code),
&size, &size,
); );
if (rc != 0) { if (rc != 0) {
switch (os.windows.ws2_32.WSAGetLastError()) { switch (windows.ws2_32.WSAGetLastError()) {
.WSAENETDOWN => return error.NetworkUnreachable, .WSAENETDOWN => return error.NetworkUnreachable,
.WSANOTINITIALISED => unreachable, // WSAStartup() was never called .WSANOTINITIALISED => unreachable, // WSAStartup() was never called
.WSAEFAULT => unreachable, // The address pointed to by optval or optlen is not in a valid part of the process address space. .WSAEFAULT => unreachable, // The address pointed to by optval or optlen is not in a valid part of the process address space.
.WSAEINVAL => unreachable, // The level parameter is unknown or invalid .WSAEINVAL => unreachable, // The level parameter is unknown or invalid
.WSAENOPROTOOPT => unreachable, // The option is unknown at the level indicated. .WSAENOPROTOOPT => unreachable, // The option is unknown at the level indicated.
.WSAENOTSOCK => return error.FileDescriptorNotASocket, .WSAENOTSOCK => return error.FileDescriptorNotASocket,
else => |err| return os.windows.unexpectedWSAError(err), else => |err| return windows.unexpectedWSAError(err),
} }
} }
@ -1188,7 +1189,7 @@ fn getsockoptError(socket: os.socket_t) IO.ConnectError!void {
if (err_code == 0) if (err_code == 0)
return; return;
const ws_err = @as(os.windows.ws2_32.WinsockError, @enumFromInt(@as(u16, @intCast(err_code)))); const ws_err = @as(windows.ws2_32.WinsockError, @enumFromInt(@as(u16, @intCast(err_code))));
return switch (ws_err) { return switch (ws_err) {
.WSAEACCES => error.PermissionDenied, .WSAEACCES => error.PermissionDenied,
.WSAEADDRINUSE => error.AddressInUse, .WSAEADDRINUSE => error.AddressInUse,
@ -1204,6 +1205,6 @@ fn getsockoptError(socket: os.socket_t) IO.ConnectError!void {
.WSAEPROTOTYPE => unreachable, .WSAEPROTOTYPE => unreachable,
.WSAETIMEDOUT => error.ConnectionTimedOut, .WSAETIMEDOUT => error.ConnectionTimedOut,
.WSAECONNRESET => error.ConnectionResetByPeer, .WSAECONNRESET => error.ConnectionResetByPeer,
else => |e| os.windows.unexpectedWSAError(e), else => |e| windows.unexpectedWSAError(e),
}; };
} }

View File

@ -1,4 +1,5 @@
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin");
const Atomic = std.atomic.Value; const Atomic = std.atomic.Value;
const debug = std.debug; const debug = std.debug;
@ -6,7 +7,7 @@ const fifo = std.fifo;
const heap = std.heap; const heap = std.heap;
const log = std.log.scoped(.server); const log = std.log.scoped(.server);
const mem = std.mem; const mem = std.mem;
const os = std.os; const posix = std.posix;
const Thread = std.Thread; const Thread = std.Thread;
const IO = @import("io").IO; const IO = @import("io").IO;
@ -23,7 +24,7 @@ pub const Config = struct {
pub const recv_buf_len = 1024; pub const recv_buf_len = 1024;
}; };
const Queue = fifo.LinearFifo(os.socket_t, .{ .Static = Config.accept_buf_len }); const Queue = fifo.LinearFifo(posix.socket_t, .{ .Static = Config.accept_buf_len });
var running = Atomic(bool).init(true); var running = Atomic(bool).init(true);
@ -34,8 +35,10 @@ const Acceptor = struct {
}; };
pub fn main() !void { pub fn main() !void {
if (builtin.target.isBSD() or builtin.target.os.tag == .linux) {
// Handle OS signals for graceful shutdown. // Handle OS signals for graceful shutdown.
try addSignalHandlers(); try addSignalHandlers();
}
// Cross-platform IO setup. // Cross-platform IO setup.
var io = try IO.init(Config.io_entries, 0); var io = try IO.init(Config.io_entries, 0);
@ -43,11 +46,11 @@ pub fn main() !void {
// Listener setup // Listener setup
const address = try std.net.Address.parseIp4(Config.server_ip, Config.server_port); const address = try std.net.Address.parseIp4(Config.server_ip, Config.server_port);
const listener = try io.open_socket(address.any.family, os.SOCK.STREAM, os.IPPROTO.TCP); const listener = try io.open_socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP);
defer os.closeSocket(listener); defer posix.close(listener);
try os.setsockopt(listener, os.SOL.SOCKET, os.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try os.bind(listener, &address.any, address.getOsSockLen()); try posix.bind(listener, &address.any, address.getOsSockLen());
try os.listen(listener, Config.kernel_backlog); try posix.listen(listener, Config.kernel_backlog);
log.info("server listening on IP {s} port {}. CTRL+C to shutdown.", .{ Config.server_ip, Config.server_port }); log.info("server listening on IP {s} port {}. CTRL+C to shutdown.", .{ Config.server_ip, Config.server_port });
@ -67,7 +70,7 @@ pub fn main() !void {
.queue = &queue, .queue = &queue,
}; };
while (running.load(.Monotonic)) { while (running.load(.monotonic)) {
// Start accepting. // Start accepting.
var acceptor_completion: IO.Completion = undefined; var acceptor_completion: IO.Completion = undefined;
io.accept(*Acceptor, &acceptor, acceptCallback, &acceptor_completion, listener); io.accept(*Acceptor, &acceptor, acceptCallback, &acceptor_completion, listener);
@ -83,7 +86,7 @@ pub fn main() !void {
fn acceptCallback( fn acceptCallback(
acceptor_ptr: *Acceptor, acceptor_ptr: *Acceptor,
completion: *IO.Completion, completion: *IO.Completion,
result: IO.AcceptError!os.socket_t, result: IO.AcceptError!posix.socket_t,
) void { ) void {
_ = completion; _ = completion;
const accepted_sock = result catch @panic("accept error"); const accepted_sock = result catch @panic("accept error");
@ -111,7 +114,7 @@ const Client = struct {
completion: IO.Completion, completion: IO.Completion,
io: *IO, io: *IO,
recv_buf: [Config.recv_buf_len]u8, recv_buf: [Config.recv_buf_len]u8,
socket: os.socket_t, socket: posix.socket_t,
thread_id: Thread.Id, thread_id: Thread.Id,
}; };
@ -127,9 +130,9 @@ fn handleClient(queue_mutex: *Thread.Mutex, queue: *Queue) !void {
var fba = heap.FixedBufferAllocator.init(&client_buf); var fba = heap.FixedBufferAllocator.init(&client_buf);
const allocator = fba.allocator(); const allocator = fba.allocator();
while (running.load(.Monotonic)) { while (running.load(.monotonic)) {
// Get next accepted client socket. // Get next accepted client socket.
const maybe_socket: ?os.socket_t = blk: { const maybe_socket: ?posix.socket_t = blk: {
queue_mutex.lock(); queue_mutex.lock();
defer queue_mutex.unlock(); defer queue_mutex.unlock();
break :blk queue.readItem(); break :blk queue.readItem();
@ -239,31 +242,31 @@ fn closeCallback(
fn addSignalHandlers() !void { fn addSignalHandlers() !void {
// Ignore broken pipes // Ignore broken pipes
{ {
var act = os.Sigaction{ var act = posix.Sigaction{
.handler = .{ .handler = .{
.handler = os.SIG.IGN, .handler = posix.SIG.IGN,
}, },
.mask = os.empty_sigset, .mask = posix.empty_sigset,
.flags = 0, .flags = 0,
}; };
try os.sigaction(os.SIG.PIPE, &act, null); try posix.sigaction(posix.SIG.PIPE, &act, null);
} }
// Catch SIGINT/SIGTERM for proper shutdown // Catch SIGINT/SIGTERM for proper shutdown
{ {
var act = os.Sigaction{ var act = posix.Sigaction{
.handler = .{ .handler = .{
.handler = struct { .handler = struct {
fn wrapper(sig: c_int) callconv(.C) void { fn wrapper(sig: c_int) callconv(.C) void {
log.info("Caught signal {d}; Shutting down...", .{sig}); log.info("Caught signal {d}; Shutting down...", .{sig});
running.store(false, .Release); running.store(false, .release);
} }
}.wrapper, }.wrapper,
}, },
.mask = os.empty_sigset, .mask = posix.empty_sigset,
.flags = 0, .flags = 0,
}; };
try os.sigaction(os.SIG.TERM, &act, null); try posix.sigaction(posix.SIG.TERM, &act, null);
try os.sigaction(os.SIG.INT, &act, null); try posix.sigaction(posix.SIG.INT, &act, null);
} }
} }

View File

@ -60,7 +60,7 @@ pub inline fn copy_left(
if (!disjoint_slices(T, T, target, source)) { if (!disjoint_slices(T, T, target, source)) {
assert(@intFromPtr(target.ptr) < @intFromPtr(source.ptr)); assert(@intFromPtr(target.ptr) < @intFromPtr(source.ptr));
} }
std.mem.copy(T, target, source); @memcpy(target[0..source.len], source);
} }
test "copy_left" { test "copy_left" {
@ -291,8 +291,8 @@ pub fn no_padding(comptime T: type) bool {
.Array => |info| return no_padding(info.child), .Array => |info| return no_padding(info.child),
.Struct => |info| { .Struct => |info| {
switch (info.layout) { switch (info.layout) {
.Auto => return false, .auto => return false,
.Extern => { .@"extern" => {
for (info.fields) |field| { for (info.fields) |field| {
if (!no_padding(field.type)) return false; if (!no_padding(field.type)) return false;
} }
@ -322,7 +322,7 @@ pub fn no_padding(comptime T: type) bool {
} }
return offset == @sizeOf(T); return offset == @sizeOf(T);
}, },
.Packed => return @bitSizeOf(T) == 8 * @sizeOf(T), .@"packed" => return @bitSizeOf(T) == 8 * @sizeOf(T),
} }
}, },
.Enum => |info| { .Enum => |info| {

View File

@ -1,6 +1,6 @@
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin"); const builtin = @import("builtin");
const os = std.os; const posix = std.posix;
const testing = std.testing; const testing = std.testing;
const assert = std.debug.assert; const assert = std.debug.assert;
@ -13,7 +13,7 @@ test "write/read/close" {
io: IO, io: IO,
done: bool = false, done: bool = false,
fd: os.fd_t, fd: posix.fd_t,
write_buf: [20]u8 = [_]u8{97} ** 20, write_buf: [20]u8 = [_]u8{97} ** 20,
read_buf: [20]u8 = [_]u8{98} ** 20, read_buf: [20]u8 = [_]u8{98} ** 20,
@ -87,10 +87,10 @@ test "accept/connect/send/receive" {
io: *IO, io: *IO,
done: bool = false, done: bool = false,
server: os.socket_t, server: posix.socket_t,
client: os.socket_t, client: posix.socket_t,
accepted_sock: os.socket_t = undefined, accepted_sock: posix.socket_t = undefined,
send_buf: [10]u8 = [_]u8{ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0 }, send_buf: [10]u8 = [_]u8{ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0 },
recv_buf: [5]u8 = [_]u8{ 0, 1, 0, 1, 0 }, recv_buf: [5]u8 = [_]u8{ 0, 1, 0, 1, 0 },
@ -104,24 +104,24 @@ test "accept/connect/send/receive" {
const address = try std.net.Address.parseIp4("127.0.0.1", 0); const address = try std.net.Address.parseIp4("127.0.0.1", 0);
const kernel_backlog = 1; const kernel_backlog = 1;
const server = try io.open_socket(address.any.family, os.SOCK.STREAM, os.IPPROTO.TCP); const server = try io.open_socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP);
defer os.closeSocket(server); defer posix.close(server);
const client = try io.open_socket(address.any.family, os.SOCK.STREAM, os.IPPROTO.TCP); const client = try io.open_socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP);
defer os.closeSocket(client); defer posix.close(client);
try os.setsockopt( try posix.setsockopt(
server, server,
os.SOL.SOCKET, posix.SOL.SOCKET,
os.SO.REUSEADDR, posix.SO.REUSEADDR,
&std.mem.toBytes(@as(c_int, 1)), &std.mem.toBytes(@as(c_int, 1)),
); );
try os.bind(server, &address.any, address.getOsSockLen()); try posix.bind(server, &address.any, address.getOsSockLen());
try os.listen(server, kernel_backlog); try posix.listen(server, kernel_backlog);
var client_address = std.net.Address.initIp4(undefined, undefined); var client_address = std.net.Address.initIp4(undefined, undefined);
var client_address_len = client_address.getOsSockLen(); var client_address_len = client_address.getOsSockLen();
try os.getsockname(server, &client_address.any, &client_address_len); try posix.getsockname(server, &client_address.any, &client_address_len);
var self: Context = .{ var self: Context = .{
.io = &io, .io = &io,
@ -180,7 +180,7 @@ test "accept/connect/send/receive" {
fn accept_callback( fn accept_callback(
self: *Context, self: *Context,
completion: *IO.Completion, completion: *IO.Completion,
result: IO.AcceptError!os.socket_t, result: IO.AcceptError!posix.socket_t,
) void { ) void {
self.accepted_sock = result catch @panic("accept error"); self.accepted_sock = result catch @panic("accept error");
self.io.recv( self.io.recv(
@ -314,7 +314,7 @@ test "tick to wait" {
const Context = @This(); const Context = @This();
io: IO, io: IO,
accepted: os.socket_t = IO.INVALID_SOCKET, accepted: posix.socket_t = IO.INVALID_SOCKET,
connected: bool = false, connected: bool = false,
received: bool = false, received: bool = false,
@ -325,24 +325,24 @@ test "tick to wait" {
const address = try std.net.Address.parseIp4("127.0.0.1", 0); const address = try std.net.Address.parseIp4("127.0.0.1", 0);
const kernel_backlog = 1; const kernel_backlog = 1;
const server = try self.io.open_socket(address.any.family, os.SOCK.STREAM, os.IPPROTO.TCP); const server = try self.io.open_socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP);
defer os.closeSocket(server); defer posix.close(server);
try os.setsockopt( try posix.setsockopt(
server, server,
os.SOL.SOCKET, posix.SOL.SOCKET,
os.SO.REUSEADDR, posix.SO.REUSEADDR,
&std.mem.toBytes(@as(c_int, 1)), &std.mem.toBytes(@as(c_int, 1)),
); );
try os.bind(server, &address.any, address.getOsSockLen()); try posix.bind(server, &address.any, address.getOsSockLen());
try os.listen(server, kernel_backlog); try posix.listen(server, kernel_backlog);
var client_address = std.net.Address.initIp4(undefined, undefined); var client_address = std.net.Address.initIp4(undefined, undefined);
var client_address_len = client_address.getOsSockLen(); var client_address_len = client_address.getOsSockLen();
try os.getsockname(server, &client_address.any, &client_address_len); try posix.getsockname(server, &client_address.any, &client_address_len);
const client = try self.io.open_socket(client_address.any.family, os.SOCK.STREAM, os.IPPROTO.TCP); const client = try self.io.open_socket(client_address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP);
defer os.closeSocket(client); defer posix.close(client);
// Start the accept // Start the accept
var server_completion: IO.Completion = undefined; var server_completion: IO.Completion = undefined;
@ -368,7 +368,7 @@ test "tick to wait" {
assert(self.connected); assert(self.connected);
assert(self.accepted != IO.INVALID_SOCKET); assert(self.accepted != IO.INVALID_SOCKET);
defer os.closeSocket(self.accepted); defer posix.close(self.accepted);
// Start receiving on the client // Start receiving on the client
var recv_completion: IO.Completion = undefined; var recv_completion: IO.Completion = undefined;
@ -410,7 +410,7 @@ test "tick to wait" {
fn accept_callback( fn accept_callback(
self: *Context, self: *Context,
completion: *IO.Completion, completion: *IO.Completion,
result: IO.AcceptError!os.socket_t, result: IO.AcceptError!posix.socket_t,
) void { ) void {
_ = completion; _ = completion;
@ -442,15 +442,15 @@ test "tick to wait" {
self.received = true; self.received = true;
} }
// TODO: use os.send() instead when it gets fixed for windows // TODO: use posix.send() instead when it gets fixed for windows
fn os_send(sock: os.socket_t, buf: []const u8, flags: u32) !usize { fn os_send(sock: posix.socket_t, buf: []const u8, flags: u32) !usize {
if (builtin.target.os.tag != .windows) { if (builtin.target.os.tag != .windows) {
return os.send(sock, buf, flags); return posix.send(sock, buf, flags);
} }
const rc = os.windows.sendto(sock, buf.ptr, buf.len, flags, null, 0); const rc = posix.windows.sendto(sock, buf.ptr, buf.len, flags, null, 0);
if (rc == os.windows.ws2_32.SOCKET_ERROR) { if (rc == posix.windows.ws2_32.SOCKET_ERROR) {
switch (os.windows.ws2_32.WSAGetLastError()) { switch (posix.windows.ws2_32.WSAGetLastError()) {
.WSAEACCES => return error.AccessDenied, .WSAEACCES => return error.AccessDenied,
.WSAEADDRNOTAVAIL => return error.AddressNotAvailable, .WSAEADDRNOTAVAIL => return error.AddressNotAvailable,
.WSAECONNRESET => return error.ConnectionResetByPeer, .WSAECONNRESET => return error.ConnectionResetByPeer,
@ -470,7 +470,7 @@ test "tick to wait" {
.WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH. .WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH.
.WSAEWOULDBLOCK => return error.WouldBlock, .WSAEWOULDBLOCK => return error.WouldBlock,
.WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function. .WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function.
else => |err| return os.windows.unexpectedWSAError(err), else => |err| return posix.windows.unexpectedWSAError(err),
} }
} else { } else {
return @as(usize, @intCast(rc)); return @as(usize, @intCast(rc));
@ -490,7 +490,7 @@ test "pipe data over socket" {
const Context = @This(); const Context = @This();
const Socket = struct { const Socket = struct {
fd: os.socket_t = IO.INVALID_SOCKET, fd: posix.socket_t = IO.INVALID_SOCKET,
completion: IO.Completion = undefined, completion: IO.Completion = undefined,
}; };
const Pipe = struct { const Pipe = struct {
@ -514,23 +514,23 @@ test "pipe data over socket" {
}; };
defer self.io.deinit(); defer self.io.deinit();
self.server.fd = try self.io.open_socket(os.AF.INET, os.SOCK.STREAM, os.IPPROTO.TCP); self.server.fd = try self.io.open_socket(posix.AF.INET, posix.SOCK.STREAM, posix.IPPROTO.TCP);
defer os.closeSocket(self.server.fd); defer posix.close(self.server.fd);
const address = try std.net.Address.parseIp4("127.0.0.1", 0); const address = try std.net.Address.parseIp4("127.0.0.1", 0);
try os.setsockopt( try posix.setsockopt(
self.server.fd, self.server.fd,
os.SOL.SOCKET, posix.SOL.SOCKET,
os.SO.REUSEADDR, posix.SO.REUSEADDR,
&std.mem.toBytes(@as(c_int, 1)), &std.mem.toBytes(@as(c_int, 1)),
); );
try os.bind(self.server.fd, &address.any, address.getOsSockLen()); try posix.bind(self.server.fd, &address.any, address.getOsSockLen());
try os.listen(self.server.fd, 1); try posix.listen(self.server.fd, 1);
var client_address = std.net.Address.initIp4(undefined, undefined); var client_address = std.net.Address.initIp4(undefined, undefined);
var client_address_len = client_address.getOsSockLen(); var client_address_len = client_address.getOsSockLen();
try os.getsockname(self.server.fd, &client_address.any, &client_address_len); try posix.getsockname(self.server.fd, &client_address.any, &client_address_len);
self.io.accept( self.io.accept(
*Context, *Context,
@ -540,8 +540,8 @@ test "pipe data over socket" {
self.server.fd, self.server.fd,
); );
self.tx.socket.fd = try self.io.open_socket(os.AF.INET, os.SOCK.STREAM, os.IPPROTO.TCP); self.tx.socket.fd = try self.io.open_socket(posix.AF.INET, posix.SOCK.STREAM, posix.IPPROTO.TCP);
defer os.closeSocket(self.tx.socket.fd); defer posix.close(self.tx.socket.fd);
self.io.connect( self.io.connect(
*Context, *Context,
@ -565,7 +565,7 @@ test "pipe data over socket" {
try testing.expect(self.server.fd != IO.INVALID_SOCKET); try testing.expect(self.server.fd != IO.INVALID_SOCKET);
try testing.expect(self.tx.socket.fd != IO.INVALID_SOCKET); try testing.expect(self.tx.socket.fd != IO.INVALID_SOCKET);
try testing.expect(self.rx.socket.fd != IO.INVALID_SOCKET); try testing.expect(self.rx.socket.fd != IO.INVALID_SOCKET);
os.closeSocket(self.rx.socket.fd); posix.close(self.rx.socket.fd);
try testing.expectEqual(self.tx.transferred, buffer_size); try testing.expectEqual(self.tx.transferred, buffer_size);
try testing.expectEqual(self.rx.transferred, buffer_size); try testing.expectEqual(self.rx.transferred, buffer_size);
@ -575,7 +575,7 @@ test "pipe data over socket" {
fn on_accept( fn on_accept(
self: *Context, self: *Context,
completion: *IO.Completion, completion: *IO.Completion,
result: IO.AcceptError!os.socket_t, result: IO.AcceptError!posix.socket_t,
) void { ) void {
assert(self.rx.socket.fd == IO.INVALID_SOCKET); assert(self.rx.socket.fd == IO.INVALID_SOCKET);
assert(&self.server.completion == completion); assert(&self.server.completion == completion);

View File

@ -2,6 +2,7 @@ const std = @import("std");
const builtin = @import("builtin"); const builtin = @import("builtin");
const os = std.os; const os = std.os;
const posix = std.posix;
const assert = std.debug.assert; const assert = std.debug.assert;
const is_darwin = builtin.target.os.tag.isDarwin(); const is_darwin = builtin.target.os.tag.isDarwin();
const is_windows = builtin.target.os.tag == .windows; const is_windows = builtin.target.os.tag == .windows;
@ -48,8 +49,8 @@ pub const Time = struct {
// https://opensource.apple.com/source/Libc/Libc-1158.1.2/gen/clock_gettime.c.auto.html // https://opensource.apple.com/source/Libc/Libc-1158.1.2/gen/clock_gettime.c.auto.html
if (is_darwin) { if (is_darwin) {
const darwin = struct { const darwin = struct {
const mach_timebase_info_t = os.darwin.mach_timebase_info_data; const mach_timebase_info_t = std.c.mach_timebase_info_data;
extern "c" fn mach_timebase_info(info: *mach_timebase_info_t) os.darwin.kern_return_t; extern "c" fn mach_timebase_info(info: *mach_timebase_info_t) std.c.kern_return_t;
extern "c" fn mach_continuous_time() u64; extern "c" fn mach_continuous_time() u64;
}; };
@ -67,8 +68,8 @@ pub const Time = struct {
// CLOCK_BOOTTIME is the same as CLOCK_MONOTONIC but includes elapsed time during a suspend. // CLOCK_BOOTTIME is the same as CLOCK_MONOTONIC but includes elapsed time during a suspend.
// For more detail and why CLOCK_MONOTONIC_RAW is even worse than CLOCK_MONOTONIC, // For more detail and why CLOCK_MONOTONIC_RAW is even worse than CLOCK_MONOTONIC,
// see https://github.com/ziglang/zig/pull/933#discussion_r656021295. // see https://github.com/ziglang/zig/pull/933#discussion_r656021295.
var ts: os.timespec = undefined; var ts: posix.timespec = undefined;
os.clock_gettime(os.CLOCK.BOOTTIME, &ts) catch @panic("CLOCK_BOOTTIME required"); posix.clock_gettime(posix.CLOCK.BOOTTIME, &ts) catch @panic("CLOCK_BOOTTIME required");
break :blk @as(u64, @intCast(ts.tv_sec)) * std.time.ns_per_s + @as(u64, @intCast(ts.tv_nsec)); break :blk @as(u64, @intCast(ts.tv_sec)) * std.time.ns_per_s + @as(u64, @intCast(ts.tv_nsec));
}; };