diff --git a/build.zig.zon b/build.zig.zon index 72e92e7..bde65bb 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -1,7 +1,7 @@ .{ .name = "iofthetiger", .version = "0.1.0", - .minimum_zig_version = "0.12.0-dev.2334+aef1da163", + .minimum_zig_version = "0.12.0-dev.3480+9dac8db2d", .paths = .{ "build.zig", "build.zig.zon", diff --git a/src/io/darwin.zig b/src/io/darwin.zig index 8e4058d..a07938e 100644 --- a/src/io/darwin.zig +++ b/src/io/darwin.zig @@ -1,5 +1,6 @@ const std = @import("std"); const os = std.os; +const posix = std.posix; const mem = std.mem; const assert = std.debug.assert; const log = std.log.scoped(.io); @@ -12,7 +13,7 @@ const sector_size = 4096; const direct_io = true; pub const IO = struct { - kq: os.fd_t, + kq: posix.fd_t, time: Time = .{}, io_inflight: usize = 0, timeouts: FIFO(Completion) = .{ .name = "io_timeouts" }, @@ -23,14 +24,14 @@ pub const IO = struct { _ = entries; _ = flags; - const kq = try os.kqueue(); + const kq = try posix.kqueue(); assert(kq > -1); return IO{ .kq = kq }; } pub fn deinit(self: *IO) void { assert(self.kq > -1); - os.close(self.kq); + posix.close(self.kq); self.kq = -1; } @@ -76,7 +77,7 @@ pub const IO = struct { fn flush(self: *IO, wait_for_completions: bool) !void { var io_pending = self.io_pending.peek(); - var events: [256]os.Kevent = undefined; + var events: [256]posix.Kevent = undefined; // Check timeouts and fill events with completions in io_pending // (they will be submitted through kevent). @@ -87,7 +88,7 @@ pub const IO = struct { // Only call kevent() if we need to submit io events or if we need to wait for completions. if (change_events > 0 or self.completed.empty()) { // Zero timeouts for kevent() implies a non-blocking poll - var ts = std.mem.zeroes(os.timespec); + var ts = std.mem.zeroes(posix.timespec); // We need to wait (not poll) on kevent if there's nothing to submit or complete. 
// We should never wait indefinitely (timeout_ptr = null for kevent) given: @@ -103,7 +104,7 @@ pub const IO = struct { } } - const new_events = try os.kevent( + const new_events = try posix.kevent( self.kq, events[0..change_events], events[0..events.len], @@ -133,25 +134,25 @@ pub const IO = struct { } } - fn flush_io(_: *IO, events: []os.Kevent, io_pending_top: *?*Completion) usize { + fn flush_io(_: *IO, events: []posix.Kevent, io_pending_top: *?*Completion) usize { for (events, 0..) |*event, flushed| { const completion = io_pending_top.* orelse return flushed; io_pending_top.* = completion.next; const event_info = switch (completion.operation) { - .accept => |op| [2]c_int{ op.socket, os.system.EVFILT_READ }, - .connect => |op| [2]c_int{ op.socket, os.system.EVFILT_WRITE }, - .read => |op| [2]c_int{ op.fd, os.system.EVFILT_READ }, - .write => |op| [2]c_int{ op.fd, os.system.EVFILT_WRITE }, - .recv => |op| [2]c_int{ op.socket, os.system.EVFILT_READ }, - .send => |op| [2]c_int{ op.socket, os.system.EVFILT_WRITE }, + .accept => |op| [2]c_int{ op.socket, posix.system.EVFILT_READ }, + .connect => |op| [2]c_int{ op.socket, posix.system.EVFILT_WRITE }, + .read => |op| [2]c_int{ op.fd, posix.system.EVFILT_READ }, + .write => |op| [2]c_int{ op.fd, posix.system.EVFILT_WRITE }, + .recv => |op| [2]c_int{ op.socket, posix.system.EVFILT_READ }, + .send => |op| [2]c_int{ op.socket, posix.system.EVFILT_WRITE }, else => @panic("invalid completion operation queued for io"), }; event.* = .{ .ident = @as(u32, @intCast(event_info[0])), .filter = @as(i16, @intCast(event_info[1])), - .flags = os.system.EV_ADD | os.system.EV_ENABLE | os.system.EV_ONESHOT, + .flags = posix.system.EV_ADD | posix.system.EV_ENABLE | posix.system.EV_ONESHOT, .fflags = 0, .data = 0, .udata = @intFromPtr(completion), @@ -198,29 +199,29 @@ pub const IO = struct { const Operation = union(enum) { accept: struct { - socket: os.socket_t, + socket: posix.socket_t, }, close: struct { - fd: os.fd_t, + fd: posix.fd_t, 
}, connect: struct { - socket: os.socket_t, + socket: posix.socket_t, address: std.net.Address, initiated: bool, }, read: struct { - fd: os.fd_t, + fd: posix.fd_t, buf: [*]u8, len: u32, offset: u64, }, recv: struct { - socket: os.socket_t, + socket: posix.socket_t, buf: [*]u8, len: u32, }, send: struct { - socket: os.socket_t, + socket: posix.socket_t, buf: [*]const u8, len: u32, }, @@ -228,7 +229,7 @@ pub const IO = struct { expires: u64, }, write: struct { - fd: os.fd_t, + fd: posix.fd_t, buf: [*]const u8, len: u32, offset: u64, @@ -288,7 +289,7 @@ pub const IO = struct { } } - pub const AcceptError = os.AcceptError || os.SetSockOptError; + pub const AcceptError = posix.AcceptError || posix.SetSockOptError; pub fn accept( self: *IO, @@ -297,10 +298,10 @@ pub const IO = struct { comptime callback: fn ( context: Context, completion: *Completion, - result: AcceptError!os.socket_t, + result: AcceptError!posix.socket_t, ) void, completion: *Completion, - socket: os.socket_t, + socket: posix.socket_t, ) void { self.submit( context, @@ -311,21 +312,21 @@ pub const IO = struct { .socket = socket, }, struct { - fn do_operation(op: anytype) AcceptError!os.socket_t { - const fd = try os.accept( + fn do_operation(op: anytype) AcceptError!posix.socket_t { + const fd = try posix.accept( op.socket, null, null, - os.SOCK.NONBLOCK | os.SOCK.CLOEXEC, + posix.SOCK.NONBLOCK | posix.SOCK.CLOEXEC, ); - errdefer os.close(fd); + errdefer posix.close(fd); - // Darwin doesn't support os.MSG_NOSIGNAL to avoid getting SIGPIPE on socket send(). + // Darwin doesn't support posix.MSG_NOSIGNAL to avoid getting SIGPIPE on socket send(). // Instead, it uses the SO_NOSIGPIPE socket option which does the same for all send()s. 
- os.setsockopt( + posix.setsockopt( fd, - os.SOL.SOCKET, - os.SO.NOSIGPIPE, + posix.SOL.SOCKET, + posix.SO.NOSIGPIPE, &mem.toBytes(@as(c_int, 1)), ) catch |err| return switch (err) { error.TimeoutTooBig => unreachable, @@ -346,7 +347,7 @@ pub const IO = struct { DiskQuota, InputOutput, NoSpaceLeft, - } || os.UnexpectedError; + } || posix.UnexpectedError; pub fn close( self: *IO, @@ -358,7 +359,7 @@ pub const IO = struct { result: CloseError!void, ) void, completion: *Completion, - fd: os.fd_t, + fd: posix.fd_t, ) void { self.submit( context, @@ -370,19 +371,19 @@ pub const IO = struct { }, struct { fn do_operation(op: anytype) CloseError!void { - return switch (os.errno(os.system.close(op.fd))) { + return switch (posix.errno(posix.system.close(op.fd))) { .SUCCESS => {}, .BADF => error.FileDescriptorInvalid, .INTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 .IO => error.InputOutput, - else => |errno| os.unexpectedErrno(errno), + else => |errno| posix.unexpectedErrno(errno), }; } }, ); } - pub const ConnectError = os.ConnectError; + pub const ConnectError = posix.ConnectError; pub fn connect( self: *IO, @@ -394,7 +395,7 @@ pub const IO = struct { result: ConnectError!void, ) void, completion: *Completion, - socket: os.socket_t, + socket: posix.socket_t, address: std.net.Address, ) void { self.submit( @@ -412,8 +413,8 @@ pub const IO = struct { // Don't call connect after being rescheduled by io_pending as it gives EISCONN. // Instead, check the socket error to see if has been connected successfully. 
const result = switch (op.initiated) { - true => os.getsockoptError(op.socket), - else => os.connect(op.socket, &op.address.any, op.address.getOsSockLen()), + true => posix.getsockoptError(op.socket), + else => posix.connect(op.socket, &op.address.any, op.address.getOsSockLen()), }; op.initiated = true; @@ -433,7 +434,7 @@ pub const IO = struct { SystemResources, Unseekable, ConnectionTimedOut, - } || os.UnexpectedError; + } || posix.UnexpectedError; pub fn read( self: *IO, @@ -445,7 +446,7 @@ pub const IO = struct { result: ReadError!usize, ) void, completion: *Completion, - fd: os.fd_t, + fd: posix.fd_t, buffer: []u8, offset: u64, ) void { @@ -463,13 +464,13 @@ pub const IO = struct { struct { fn do_operation(op: anytype) ReadError!usize { while (true) { - const rc = os.system.pread( + const rc = posix.system.pread( op.fd, op.buf, op.len, @as(isize, @bitCast(op.offset)), ); - return switch (os.errno(rc)) { + return switch (posix.errno(rc)) { .SUCCESS => @as(usize, @intCast(rc)), .INTR => continue, .AGAIN => error.WouldBlock, @@ -485,7 +486,7 @@ pub const IO = struct { .OVERFLOW => error.Unseekable, .SPIPE => error.Unseekable, .TIMEDOUT => error.ConnectionTimedOut, - else => |err| os.unexpectedErrno(err), + else => |err| posix.unexpectedErrno(err), }; } } @@ -493,7 +494,7 @@ pub const IO = struct { ); } - pub const RecvError = os.RecvFromError; + pub const RecvError = posix.RecvFromError; pub fn recv( self: *IO, @@ -505,7 +506,7 @@ pub const IO = struct { result: RecvError!usize, ) void, completion: *Completion, - socket: os.socket_t, + socket: posix.socket_t, buffer: []u8, ) void { self.submit( @@ -520,13 +521,13 @@ pub const IO = struct { }, struct { fn do_operation(op: anytype) RecvError!usize { - return os.recv(op.socket, op.buf[0..op.len], 0); + return posix.recv(op.socket, op.buf[0..op.len], 0); } }, ); } - pub const SendError = os.SendError; + pub const SendError = posix.SendError; pub fn send( self: *IO, @@ -538,7 +539,7 @@ pub const IO = struct { result: 
SendError!usize, ) void, completion: *Completion, - socket: os.socket_t, + socket: posix.socket_t, buffer: []const u8, ) void { self.submit( @@ -553,13 +554,13 @@ pub const IO = struct { }, struct { fn do_operation(op: anytype) SendError!usize { - return os.send(op.socket, op.buf[0..op.len], 0); + return posix.send(op.socket, op.buf[0..op.len], 0); } }, ); } - pub const TimeoutError = error{Canceled} || os.UnexpectedError; + pub const TimeoutError = error{Canceled} || posix.UnexpectedError; pub fn timeout( self: *IO, @@ -608,7 +609,7 @@ pub const IO = struct { ); } - pub const WriteError = os.PWriteError; + pub const WriteError = posix.PWriteError; pub fn write( self: *IO, @@ -620,7 +621,7 @@ pub const IO = struct { result: WriteError!usize, ) void, completion: *Completion, - fd: os.fd_t, + fd: posix.fd_t, buffer: []const u8, offset: u64, ) void { @@ -637,7 +638,7 @@ pub const IO = struct { }, struct { fn do_operation(op: anytype) WriteError!usize { - return os.pwrite(op.fd, op.buf[0..op.len], op.offset); + return posix.pwrite(op.fd, op.buf[0..op.len], op.offset); } }, ); @@ -646,23 +647,23 @@ pub const IO = struct { pub const INVALID_SOCKET = -1; /// Creates a socket that can be used for async operations with the IO instance. - pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !os.socket_t { + pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !posix.socket_t { _ = self; - const fd = try os.socket(family, sock_type | os.SOCK.NONBLOCK, protocol); - errdefer os.closeSocket(fd); + const fd = try posix.socket(family, sock_type | posix.SOCK.NONBLOCK, protocol); + errdefer posix.close(fd); - // darwin doesn't support os.MSG_NOSIGNAL, but instead a socket option to avoid SIGPIPE. - try os.setsockopt(fd, os.SOL.SOCKET, os.SO.NOSIGPIPE, &mem.toBytes(@as(c_int, 1))); + // darwin doesn't support posix.MSG_NOSIGNAL, but instead a socket option to avoid SIGPIPE. 
+ try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.NOSIGPIPE, &mem.toBytes(@as(c_int, 1))); return fd; } /// Opens a directory with read only access. - pub fn open_dir(dir_path: []const u8) !os.fd_t { - return os.open(dir_path, os.O.CLOEXEC | os.O.RDONLY, 0); + pub fn open_dir(dir_path: []const u8) !posix.fd_t { + return posix.open(dir_path, posix.O.CLOEXEC | posix.O.RDONLY, 0); } - pub const INVALID_FILE: os.fd_t = -1; + pub const INVALID_FILE: posix.fd_t = -1; /// Opens or creates a journal file: /// - For reading and writing. @@ -673,11 +674,11 @@ pub const IO = struct { /// The caller is responsible for ensuring that the parent directory inode is durable. /// - Verifies that the file size matches the expected file size before returning. pub fn open_file( - dir_fd: os.fd_t, + dir_fd: posix.fd_t, relative_path: []const u8, size: u64, method: enum { create, create_or_open, open }, - ) !os.fd_t { + ) !posix.fd_t { assert(relative_path.len > 0); assert(size % sector_size == 0); @@ -686,21 +687,21 @@ pub const IO = struct { // Opening with O_DSYNC is essential for both durability and correctness. // O_DSYNC enables us to omit fsync() calls in the data plane, since we sync to the disk on every write. - var flags: u32 = os.O.CLOEXEC | os.O.RDWR | os.O.DSYNC; - var mode: os.mode_t = 0; + var flags: u32 = posix.O.CLOEXEC | posix.O.RDWR | posix.O.DSYNC; + var mode: posix.mode_t = 0; // TODO Document this and investigate whether this is in fact correct to set here. 
- if (@hasDecl(os.O, "LARGEFILE")) flags |= os.O.LARGEFILE; + if (@hasDecl(posix.O, "LARGEFILE")) flags |= posix.O.LARGEFILE; switch (method) { .create => { - flags |= os.O.CREAT; - flags |= os.O.EXCL; + flags |= posix.O.CREAT; + flags |= posix.O.EXCL; mode = 0o666; log.info("creating \"{s}\"...", .{relative_path}); }, .create_or_open => { - flags |= os.O.CREAT; + flags |= posix.O.CREAT; mode = 0o666; log.info("opening or creating \"{s}\"...", .{relative_path}); }, @@ -710,25 +711,25 @@ pub const IO = struct { } // This is critical as we rely on O_DSYNC for fsync() whenever we write to the file: - assert((flags & os.O.DSYNC) > 0); + assert((flags & posix.O.DSYNC) > 0); // Be careful with openat(2): "If pathname is absolute, then dirfd is ignored." (man page) assert(!std.fs.path.isAbsolute(relative_path)); - const fd = try os.openat(dir_fd, relative_path, flags, mode); + const fd = try posix.openat(dir_fd, relative_path, flags, mode); // TODO Return a proper error message when the path exists or does not exist (init/start). - errdefer os.close(fd); + errdefer posix.close(fd); // TODO Check that the file is actually a file. // On darwin assume that Direct I/O is always supported. // Use F_NOCACHE to disable the page cache as O_DIRECT doesn't exist. if (direct_io) { - _ = try os.fcntl(fd, os.F.NOCACHE, 1); + _ = try posix.fcntl(fd, posix.F.NOCACHE, 1); } // Obtain an advisory exclusive lock that works only if all processes actually use flock(). // LOCK_NB means that we want to fail the lock without waiting if another process has it. - os.flock(fd, os.LOCK.EX | os.LOCK.NB) catch |err| switch (err) { + posix.flock(fd, posix.LOCK.EX | posix.LOCK.NB) catch |err| switch (err) { error.WouldBlock => @panic("another process holds the data file lock"), else => return err, }; @@ -750,7 +751,7 @@ pub const IO = struct { try fs_sync(dir_fd); // TODO Document that `size` is now `data_file_size_min` from `main.zig`. 
- const stat = try os.fstat(fd); + const stat = try posix.fstat(fd); if (stat.size < size) @panic("data file inode size was truncated or corrupted"); return fd; @@ -758,13 +759,13 @@ pub const IO = struct { /// Darwin's fsync() syscall does not flush past the disk cache. We must use F_FULLFSYNC instead. /// https://twitter.com/TigerBeetleDB/status/1422491736224436225 - fn fs_sync(fd: os.fd_t) !void { - _ = os.fcntl(fd, os.F.FULLFSYNC, 1) catch return os.fsync(fd); + fn fs_sync(fd: posix.fd_t) !void { + _ = posix.fcntl(fd, posix.F.FULLFSYNC, 1) catch return posix.fsync(fd); } /// Allocates a file contiguously using fallocate() if supported. /// Alternatively, writes to the last sector so that at least the file size is correct. - fn fs_allocate(fd: os.fd_t, size: u64) !void { + fn fs_allocate(fd: posix.fd_t, size: u64) !void { log.info("allocating {}...", .{std.fmt.fmtIntSizeBin(size)}); // Darwin doesn't have fallocate() but we can simulate it using fcntl()s. @@ -779,27 +780,27 @@ pub const IO = struct { const fstore_t = extern struct { fst_flags: c_uint, fst_posmode: c_int, - fst_offset: os.off_t, - fst_length: os.off_t, - fst_bytesalloc: os.off_t, + fst_offset: posix.off_t, + fst_length: posix.off_t, + fst_bytesalloc: posix.off_t, }; var store = fstore_t{ .fst_flags = F_ALLOCATECONTIG | F_ALLOCATEALL, .fst_posmode = F_PEOFPOSMODE, .fst_offset = 0, - .fst_length = @as(os.off_t, @intCast(size)), + .fst_length = @as(posix.off_t, @intCast(size)), .fst_bytesalloc = 0, }; // Try to pre-allocate contiguous space and fall back to default non-contiguous. 
- var res = os.system.fcntl(fd, os.F.PREALLOCATE, @intFromPtr(&store)); - if (os.errno(res) != .SUCCESS) { + var res = posix.system.fcntl(fd, posix.F.PREALLOCATE, @intFromPtr(&store)); + if (posix.errno(res) != .SUCCESS) { store.fst_flags = F_ALLOCATEALL; - res = os.system.fcntl(fd, os.F.PREALLOCATE, @intFromPtr(&store)); + res = posix.system.fcntl(fd, posix.F.PREALLOCATE, @intFromPtr(&store)); } - switch (os.errno(res)) { + switch (posix.errno(res)) { .SUCCESS => {}, .ACCES => unreachable, // F_SETLK or F_SETSIZE of F_WRITEBOOTSTRAP .BADF => return error.FileDescriptorInvalid, @@ -811,11 +812,11 @@ pub const IO = struct { .OVERFLOW => return error.FileTooBig, .SRCH => unreachable, // F_SETOWN .OPNOTSUPP => return error.OperationNotSupported, // not reported but need same error union - else => |errno| return os.unexpectedErrno(errno), + else => |errno| return posix.unexpectedErrno(errno), } // Now actually perform the allocation. - return os.ftruncate(fd, size) catch |err| switch (err) { + return posix.ftruncate(fd, size) catch |err| switch (err) { error.AccessDenied => error.PermissionDenied, else => |e| e, }; diff --git a/src/io/linux.zig b/src/io/linux.zig index 1339131..c856638 100644 --- a/src/io/linux.zig +++ b/src/io/linux.zig @@ -1,8 +1,8 @@ const std = @import("std"); const assert = std.debug.assert; -const os = std.os; -const linux = os.linux; -const IO_Uring = linux.IO_Uring; +const posix = std.posix; +const linux = std.os.linux; +const IoUring = linux.IoUring; const io_uring_cqe = linux.io_uring_cqe; const io_uring_sqe = linux.io_uring_sqe; const log = std.log.scoped(.io); @@ -17,7 +17,7 @@ const direct_io_required = true; const sector_size = 4096; pub const IO = struct { - ring: IO_Uring, + ring: IoUring, /// Operations not yet submitted to the kernel and waiting on available space in the /// submission queue. 
@@ -31,13 +31,13 @@ pub const IO = struct { pub fn init(entries: u12, flags: u32) !IO { // Detect the linux version to ensure that we support all io_uring ops used. - const uts = std.os.uname(); + const uts = std.posix.uname(); const version = try parse_dirty_semver(&uts.release); if (version.order(std.SemanticVersion{ .major = 5, .minor = 5, .patch = 0 }) == .lt) { @panic("Linux kernel 5.5 or greater is required for io_uring OP_ACCEPT"); } - return IO{ .ring = try IO_Uring.init(entries, flags) }; + return IO{ .ring = try IoUring.init(entries, flags) }; } pub fn deinit(self: *IO) void { @@ -74,10 +74,10 @@ pub const IO = struct { // We must use the same clock source used by io_uring (CLOCK_MONOTONIC) since we specify the // timeout below as an absolute value. Otherwise, we may deadlock if the clock sources are // dramatically different. Any kernel that supports io_uring will support CLOCK_MONOTONIC. - var current_ts: os.timespec = undefined; - os.clock_gettime(os.CLOCK.MONOTONIC, &current_ts) catch unreachable; + var current_ts: posix.timespec = undefined; + posix.clock_gettime(posix.CLOCK.MONOTONIC, &current_ts) catch unreachable; // The absolute CLOCK_MONOTONIC time after which we may return from this function: - const timeout_ts: os.linux.kernel_timespec = .{ + const timeout_ts: linux.kernel_timespec = .{ .tv_sec = current_ts.tv_sec, .tv_nsec = current_ts.tv_nsec + nanoseconds, }; @@ -90,7 +90,7 @@ pub const IO = struct { break :blk self.ring.get_sqe() catch unreachable; }; // Submit an absolute timeout that will be canceled if any other SQE completes first: - linux.io_uring_prep_timeout(timeout_sqe, &timeout_ts, 1, os.linux.IORING_TIMEOUT_ABS); + timeout_sqe.prep_timeout(&timeout_ts, 1, linux.IORING_TIMEOUT_ABS); timeout_sqe.user_data = 0; timeouts += 1; @@ -155,7 +155,7 @@ pub const IO = struct { // it was completed due to the completion of an event, in which case `cqe.res` // would be 0. 
It is possible for multiple timeout operations to complete at the // same time if the nanoseconds value passed to `run_for_ns()` is very short. - if (-cqe.res == @intFromEnum(os.E.TIME)) etime.* = true; + if (-cqe.res == @intFromEnum(posix.E.TIME)) etime.* = true; continue; } const completion = @as(*Completion, @ptrFromInt(@as(usize, @intCast(cqe.user_data)))); @@ -217,45 +217,41 @@ pub const IO = struct { fn prep(completion: *Completion, sqe: *io_uring_sqe) void { switch (completion.operation) { .accept => |*op| { - linux.io_uring_prep_accept( - sqe, + sqe.prep_accept( op.socket, &op.address, &op.address_size, - os.SOCK.CLOEXEC, + posix.SOCK.CLOEXEC, ); }, .close => |op| { - linux.io_uring_prep_close(sqe, op.fd); + sqe.prep_close(op.fd); }, .connect => |*op| { - linux.io_uring_prep_connect( - sqe, + sqe.prep_connect( op.socket, &op.address.any, op.address.getOsSockLen(), ); }, .read => |op| { - linux.io_uring_prep_read( - sqe, + sqe.prep_read( op.fd, op.buffer[0..bufferLimit(op.buffer.len)], op.offset, ); }, .recv => |op| { - linux.io_uring_prep_recv(sqe, op.socket, op.buffer, os.MSG.NOSIGNAL); + sqe.prep_recv(op.socket, op.buffer, posix.MSG.NOSIGNAL); }, .send => |op| { - linux.io_uring_prep_send(sqe, op.socket, op.buffer, os.MSG.NOSIGNAL); + sqe.prep_send(op.socket, op.buffer, posix.MSG.NOSIGNAL); }, .timeout => |*op| { - linux.io_uring_prep_timeout(sqe, &op.timespec, 0, 0); + sqe.prep_timeout(&op.timespec, 0, 0); }, .write => |op| { - linux.io_uring_prep_write( - sqe, + sqe.prep_write( op.fd, op.buffer[0..bufferLimit(op.buffer.len)], op.offset, @@ -268,9 +264,9 @@ pub const IO = struct { fn complete(completion: *Completion) void { switch (completion.operation) { .accept => { - const result: anyerror!os.socket_t = blk: { + const result: anyerror!posix.socket_t = blk: { if (completion.result < 0) { - const err = switch (@as(os.E, @enumFromInt(-completion.result))) { + const err = switch (@as(posix.E, @enumFromInt(-completion.result))) { .INTR => { 
completion.io.enqueue(completion); return; @@ -288,11 +284,11 @@ pub const IO = struct { .OPNOTSUPP => error.OperationNotSupported, .PERM => error.PermissionDenied, .PROTO => error.ProtocolFailure, - else => |errno| os.unexpectedErrno(errno), + else => |errno| posix.unexpectedErrno(errno), }; break :blk err; } else { - break :blk @as(os.socket_t, @intCast(completion.result)); + break :blk @as(posix.socket_t, @intCast(completion.result)); } }; completion.callback(completion.context, completion, &result); @@ -300,13 +296,13 @@ pub const IO = struct { .close => { const result: anyerror!void = blk: { if (completion.result < 0) { - const err = switch (@as(os.E, @enumFromInt(-completion.result))) { + const err = switch (@as(posix.E, @enumFromInt(-completion.result))) { .INTR => {}, // A success, see https://github.com/ziglang/zig/issues/2425 .BADF => error.FileDescriptorInvalid, .DQUOT => error.DiskQuota, .IO => error.InputOutput, .NOSPC => error.NoSpaceLeft, - else => |errno| os.unexpectedErrno(errno), + else => |errno| posix.unexpectedErrno(errno), }; break :blk err; } else { @@ -318,7 +314,7 @@ pub const IO = struct { .connect => { const result: anyerror!void = blk: { if (completion.result < 0) { - const err = switch (@as(os.E, @enumFromInt(-completion.result))) { + const err = switch (@as(posix.E, @enumFromInt(-completion.result))) { .INTR => { completion.io.enqueue(completion); return; @@ -340,7 +336,7 @@ pub const IO = struct { .PERM => error.PermissionDenied, .PROTOTYPE => error.ProtocolNotSupported, .TIMEDOUT => error.ConnectionTimedOut, - else => |errno| os.unexpectedErrno(errno), + else => |errno| posix.unexpectedErrno(errno), }; break :blk err; } else { @@ -352,7 +348,7 @@ pub const IO = struct { .read => { const result: anyerror!usize = blk: { if (completion.result < 0) { - const err = switch (@as(os.E, @enumFromInt(-completion.result))) { + const err = switch (@as(posix.E, @enumFromInt(-completion.result))) { .INTR => { completion.io.enqueue(completion); 
return; @@ -370,7 +366,7 @@ pub const IO = struct { .OVERFLOW => error.Unseekable, .SPIPE => error.Unseekable, .TIMEDOUT => error.ConnectionTimedOut, - else => |errno| os.unexpectedErrno(errno), + else => |errno| posix.unexpectedErrno(errno), }; break :blk err; } else { @@ -382,7 +378,7 @@ pub const IO = struct { .recv => { const result: anyerror!usize = blk: { if (completion.result < 0) { - const err = switch (@as(os.E, @enumFromInt(-completion.result))) { + const err = switch (@as(posix.E, @enumFromInt(-completion.result))) { .INTR => { completion.io.enqueue(completion); return; @@ -398,7 +394,7 @@ pub const IO = struct { .CONNRESET => error.ConnectionResetByPeer, .TIMEDOUT => error.ConnectionTimedOut, .OPNOTSUPP => error.OperationNotSupported, - else => |errno| os.unexpectedErrno(errno), + else => |errno| posix.unexpectedErrno(errno), }; break :blk err; } else { @@ -410,7 +406,7 @@ pub const IO = struct { .send => { const result: anyerror!usize = blk: { if (completion.result < 0) { - const err = switch (@as(os.E, @enumFromInt(-completion.result))) { + const err = switch (@as(posix.E, @enumFromInt(-completion.result))) { .INTR => { completion.io.enqueue(completion); return; @@ -433,7 +429,7 @@ pub const IO = struct { .OPNOTSUPP => error.OperationNotSupported, .PIPE => error.BrokenPipe, .TIMEDOUT => error.ConnectionTimedOut, - else => |errno| os.unexpectedErrno(errno), + else => |errno| posix.unexpectedErrno(errno), }; break :blk err; } else { @@ -444,21 +440,21 @@ pub const IO = struct { }, .timeout => { assert(completion.result < 0); - const result: anyerror!void = switch (@as(os.E, @enumFromInt(-completion.result))) { + const result: anyerror!void = switch (@as(posix.E, @enumFromInt(-completion.result))) { .INTR => { completion.io.enqueue(completion); return; }, .CANCELED => error.Canceled, .TIME => {}, // A success. 
- else => |errno| os.unexpectedErrno(errno), + else => |errno| posix.unexpectedErrno(errno), }; completion.callback(completion.context, completion, &result); }, .write => { const result: anyerror!usize = blk: { if (completion.result < 0) { - const err = switch (@as(os.E, @enumFromInt(-completion.result))) { + const err = switch (@as(posix.E, @enumFromInt(-completion.result))) { .INTR => { completion.io.enqueue(completion); return; @@ -477,7 +473,7 @@ pub const IO = struct { .PERM => error.AccessDenied, .PIPE => error.BrokenPipe, .SPIPE => error.Unseekable, - else => |errno| os.unexpectedErrno(errno), + else => |errno| posix.unexpectedErrno(errno), }; break :blk err; } else { @@ -493,35 +489,35 @@ pub const IO = struct { /// This union encodes the set of operations supported as well as their arguments. const Operation = union(enum) { accept: struct { - socket: os.socket_t, - address: os.sockaddr = undefined, - address_size: os.socklen_t = @sizeOf(os.sockaddr), + socket: posix.socket_t, + address: posix.sockaddr = undefined, + address_size: posix.socklen_t = @sizeOf(posix.sockaddr), }, close: struct { - fd: os.fd_t, + fd: posix.fd_t, }, connect: struct { - socket: os.socket_t, + socket: posix.socket_t, address: std.net.Address, }, read: struct { - fd: os.fd_t, + fd: posix.fd_t, buffer: []u8, offset: u64, }, recv: struct { - socket: os.socket_t, + socket: posix.socket_t, buffer: []u8, }, send: struct { - socket: os.socket_t, + socket: posix.socket_t, buffer: []const u8, }, timeout: struct { - timespec: os.linux.kernel_timespec, + timespec: linux.kernel_timespec, }, write: struct { - fd: os.fd_t, + fd: posix.fd_t, buffer: []const u8, offset: u64, }, @@ -539,7 +535,7 @@ pub const IO = struct { OperationNotSupported, PermissionDenied, ProtocolFailure, - } || os.UnexpectedError; + } || posix.UnexpectedError; pub fn accept( self: *IO, @@ -548,10 +544,10 @@ pub const IO = struct { comptime callback: fn ( context: Context, completion: *Completion, - result: 
AcceptError!os.socket_t, + result: AcceptError!posix.socket_t, ) void, completion: *Completion, - socket: os.socket_t, + socket: posix.socket_t, ) void { completion.* = .{ .io = self, @@ -561,7 +557,7 @@ pub const IO = struct { callback( @ptrCast(@alignCast(ctx)), comp, - @as(*const AcceptError!os.socket_t, @ptrCast(@alignCast(res))).*, + @as(*const AcceptError!posix.socket_t, @ptrCast(@alignCast(res))).*, ); } }.wrapper, @@ -569,7 +565,7 @@ pub const IO = struct { .accept = .{ .socket = socket, .address = undefined, - .address_size = @sizeOf(os.sockaddr), + .address_size = @sizeOf(posix.sockaddr), }, }, }; @@ -581,7 +577,7 @@ pub const IO = struct { DiskQuota, InputOutput, NoSpaceLeft, - } || os.UnexpectedError; + } || posix.UnexpectedError; pub fn close( self: *IO, @@ -593,7 +589,7 @@ pub const IO = struct { result: CloseError!void, ) void, completion: *Completion, - fd: os.fd_t, + fd: posix.fd_t, ) void { completion.* = .{ .io = self, @@ -631,7 +627,7 @@ pub const IO = struct { ProtocolNotSupported, ConnectionTimedOut, SystemResources, - } || os.UnexpectedError; + } || posix.UnexpectedError; pub fn connect( self: *IO, @@ -643,7 +639,7 @@ pub const IO = struct { result: ConnectError!void, ) void, completion: *Completion, - socket: os.socket_t, + socket: posix.socket_t, address: std.net.Address, ) void { completion.* = .{ @@ -678,7 +674,7 @@ pub const IO = struct { SystemResources, Unseekable, ConnectionTimedOut, - } || os.UnexpectedError; + } || posix.UnexpectedError; pub fn read( self: *IO, @@ -690,7 +686,7 @@ pub const IO = struct { result: ReadError!usize, ) void, completion: *Completion, - fd: os.fd_t, + fd: posix.fd_t, buffer: []u8, offset: u64, ) void { @@ -726,7 +722,7 @@ pub const IO = struct { FileDescriptorNotASocket, ConnectionTimedOut, OperationNotSupported, - } || os.UnexpectedError; + } || posix.UnexpectedError; pub fn recv( self: *IO, @@ -738,7 +734,7 @@ pub const IO = struct { result: RecvError!usize, ) void, completion: *Completion, - socket: 
os.socket_t, + socket: posix.socket_t, buffer: []u8, ) void { completion.* = .{ @@ -777,7 +773,7 @@ pub const IO = struct { OperationNotSupported, BrokenPipe, ConnectionTimedOut, - } || os.UnexpectedError; + } || posix.UnexpectedError; pub fn send( self: *IO, @@ -789,7 +785,7 @@ pub const IO = struct { result: SendError!usize, ) void, completion: *Completion, - socket: os.socket_t, + socket: posix.socket_t, buffer: []const u8, ) void { completion.* = .{ @@ -814,7 +810,7 @@ pub const IO = struct { self.enqueue(completion); } - pub const TimeoutError = error{Canceled} || os.UnexpectedError; + pub const TimeoutError = error{Canceled} || posix.UnexpectedError; pub fn timeout( self: *IO, @@ -849,7 +845,7 @@ pub const IO = struct { // Special case a zero timeout as a yield. if (nanoseconds == 0) { - completion.result = -@as(i32, @intCast(@intFromEnum(std.os.E.TIME))); + completion.result = -@as(i32, @intCast(@intFromEnum(std.posix.E.TIME))); self.completed.push(completion); return; } @@ -869,7 +865,7 @@ pub const IO = struct { Unseekable, AccessDenied, BrokenPipe, - } || os.UnexpectedError; + } || posix.UnexpectedError; pub fn write( self: *IO, @@ -881,7 +877,7 @@ pub const IO = struct { result: WriteError!usize, ) void, completion: *Completion, - fd: os.fd_t, + fd: posix.fd_t, buffer: []const u8, offset: u64, ) void { @@ -911,17 +907,17 @@ pub const IO = struct { pub const INVALID_SOCKET = -1; /// Creates a socket that can be used for async operations with the IO instance. - pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !os.socket_t { + pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !posix.socket_t { _ = self; - return os.socket(family, sock_type, protocol); + return posix.socket(family, sock_type, protocol); } /// Opens a directory with read only access. 
- pub fn open_dir(dir_path: []const u8) !os.fd_t { - return os.open(dir_path, os.O.CLOEXEC | os.O.RDONLY, 0); + pub fn open_dir(dir_path: []const u8) !posix.fd_t { + return posix.open(dir_path, posix.O.CLOEXEC | posix.O.RDONLY, 0); } - pub const INVALID_FILE: os.fd_t = -1; + pub const INVALID_FILE: posix.fd_t = -1; /// Opens or creates a journal file: /// - For reading and writing. @@ -932,22 +928,22 @@ pub const IO = struct { /// The caller is responsible for ensuring that the parent directory inode is durable. /// - Verifies that the file size matches the expected file size before returning. pub fn open_file( - dir_fd: os.fd_t, + dir_fd: posix.fd_t, relative_path: []const u8, size: u64, method: enum { create, create_or_open, open }, - ) !os.fd_t { + ) !posix.fd_t { assert(relative_path.len > 0); assert(size % sector_size == 0); // TODO Use O_EXCL when opening as a block device to obtain a mandatory exclusive lock. // This is much stronger than an advisory exclusive lock, and is required on some platforms. - var flags: u32 = os.O.CLOEXEC | os.O.RDWR | os.O.DSYNC; - var mode: os.mode_t = 0; + var flags: u32 = posix.O.CLOEXEC | posix.O.RDWR | posix.O.DSYNC; + var mode: posix.mode_t = 0; // TODO Document this and investigate whether this is in fact correct to set here. 
- if (@hasDecl(os.O, "LARGEFILE")) flags |= os.O.LARGEFILE; + if (@hasDecl(posix.O, "LARGEFILE")) flags |= posix.O.LARGEFILE; var direct_io_supported = false; const dir_on_tmpfs = try fs_is_tmpfs(dir_fd); @@ -962,7 +958,7 @@ pub const IO = struct { if (direct_io and !dir_on_tmpfs) { direct_io_supported = try fs_supports_direct_io(dir_fd); if (direct_io_supported) { - flags |= os.O.DIRECT; + flags |= posix.O.DIRECT; } else if (!direct_io_required) { log.warn("file system does not support Direct I/O", .{}); } else { @@ -974,13 +970,13 @@ pub const IO = struct { switch (method) { .create => { - flags |= os.O.CREAT; - flags |= os.O.EXCL; + flags |= posix.O.CREAT; + flags |= posix.O.EXCL; mode = 0o666; log.info("creating \"{s}\"...", .{relative_path}); }, .create_or_open => { - flags |= os.O.CREAT; + flags |= posix.O.CREAT; mode = 0o666; log.info("opening or creating \"{s}\"...", .{relative_path}); }, @@ -990,19 +986,19 @@ pub const IO = struct { } // This is critical as we rely on O_DSYNC for fsync() whenever we write to the file: - assert((flags & os.O.DSYNC) > 0); + assert((flags & posix.O.DSYNC) > 0); // Be careful with openat(2): "If pathname is absolute, then dirfd is ignored." (man page) assert(!std.fs.path.isAbsolute(relative_path)); - const fd = try os.openat(dir_fd, relative_path, flags, mode); + const fd = try posix.openat(dir_fd, relative_path, flags, mode); // TODO Return a proper error message when the path exists or does not exist (init/start). - errdefer os.close(fd); + errdefer posix.close(fd); // TODO Check that the file is actually a file. // Obtain an advisory exclusive lock that works only if all processes actually use flock(). // LOCK_NB means that we want to fail the lock without waiting if another process has it. 
- os.flock(fd, os.LOCK.EX | os.LOCK.NB) catch |err| switch (err) { + posix.flock(fd, posix.LOCK.EX | posix.LOCK.NB) catch |err| switch (err) { error.WouldBlock => @panic("another process holds the data file lock"), else => return err, }; @@ -1023,7 +1019,7 @@ pub const IO = struct { const write_offset = size - sector.len; var written: usize = 0; while (written < sector.len) { - written += try os.pwrite(fd, sector[written..], write_offset + written); + written += try posix.pwrite(fd, sector[written..], write_offset + written); } }, else => |e| return e, @@ -1034,14 +1030,14 @@ pub const IO = struct { // making decisions on data that was never durably written by a previously crashed process. // We therefore always fsync when we open the path, also to wait for any pending O_DSYNC. // Thanks to Alex Miller from FoundationDB for diving into our source and pointing this out. - try os.fsync(fd); + try posix.fsync(fd); // We fsync the parent directory to ensure that the file inode is durably written. // The caller is responsible for the parent directory inode stored under the grandparent. // We always do this when opening because we don't know if this was done before crashing. - try os.fsync(dir_fd); + try posix.fsync(dir_fd); - const stat = try os.fstat(fd); + const stat = try posix.fstat(fd); if (stat.size < size) @panic("data file inode size was truncated or corrupted"); return fd; @@ -1049,56 +1045,56 @@ pub const IO = struct { /// Detects whether the underlying file system for a given directory fd is tmpfs. This is used /// to relax our Direct I/O check - running on tmpfs for benchmarking is useful. 
- fn fs_is_tmpfs(dir_fd: std.os.fd_t) !bool { + fn fs_is_tmpfs(dir_fd: std.posix.fd_t) !bool { var statfs: stdx.StatFs = undefined; while (true) { const res = stdx.fstatfs(dir_fd, &statfs); - switch (os.linux.getErrno(res)) { + switch (linux.getErrno(res)) { .SUCCESS => { return statfs.f_type == stdx.TmpfsMagic; }, .INTR => continue, - else => |err| return os.unexpectedErrno(err), + else => |err| return posix.unexpectedErrno(err), } } } /// Detects whether the underlying file system for a given directory fd supports Direct I/O. /// Not all Linux file systems support `O_DIRECT`, e.g. a shared macOS volume. - fn fs_supports_direct_io(dir_fd: std.os.fd_t) !bool { - if (!@hasDecl(std.os.O, "DIRECT")) return false; + fn fs_supports_direct_io(dir_fd: std.posix.fd_t) !bool { + if (!@hasDecl(std.posix.O, "DIRECT")) return false; const path = "fs_supports_direct_io"; const dir = std.fs.Dir{ .fd = dir_fd }; - const fd = try os.openatZ(dir_fd, path, os.O.CLOEXEC | os.O.CREAT | os.O.TRUNC, 0o666); - defer os.close(fd); + const fd = try posix.openatZ(dir_fd, path, posix.O.CLOEXEC | posix.O.CREAT | posix.O.TRUNC, 0o666); + defer posix.close(fd); defer dir.deleteFile(path) catch {}; while (true) { - const res = os.linux.openat(dir_fd, path, os.O.CLOEXEC | os.O.RDONLY | os.O.DIRECT, 0); - switch (os.linux.getErrno(res)) { + const res = linux.openat(dir_fd, path, posix.O.CLOEXEC | posix.O.RDONLY | posix.O.DIRECT, 0); + switch (linux.getErrno(res)) { .SUCCESS => { - os.close(@as(os.fd_t, @intCast(res))); + posix.close(@as(posix.fd_t, @intCast(res))); return true; }, .INTR => continue, .INVAL => return false, - else => |err| return os.unexpectedErrno(err), + else => |err| return posix.unexpectedErrno(err), } } } /// Allocates a file contiguously using fallocate() if supported. /// Alternatively, writes to the last sector so that at least the file size is correct. 
- fn fs_allocate(fd: os.fd_t, size: u64) !void { + fn fs_allocate(fd: posix.fd_t, size: u64) !void { const mode: i32 = 0; const offset: i64 = 0; const length = @as(i64, @intCast(size)); while (true) { - const rc = os.linux.fallocate(fd, mode, offset, length); - switch (os.linux.getErrno(rc)) { + const rc = linux.fallocate(fd, mode, offset, length); + switch (linux.getErrno(rc)) { .SUCCESS => return, .BADF => return error.FileDescriptorInvalid, .FBIG => return error.FileTooBig, @@ -1112,7 +1108,7 @@ pub const IO = struct { .PERM => return error.PermissionDenied, .SPIPE => return error.Unseekable, .TXTBSY => return error.FileBusy, - else => |errno| return os.unexpectedErrno(errno), + else => |errno| return posix.unexpectedErrno(errno), } } } diff --git a/src/io/windows.zig b/src/io/windows.zig index f7f3bc7..ad6f6c7 100644 --- a/src/io/windows.zig +++ b/src/io/windows.zig @@ -1,5 +1,6 @@ const std = @import("std"); -const os = std.os; +const posix = std.posix; +const windows = std.os.windows; const assert = std.debug.assert; const log = std.log.scoped(.io); @@ -10,7 +11,7 @@ const bufferLimit = @import("../io.zig").bufferLimit; const sector_size = 4096; pub const IO = struct { - iocp: os.windows.HANDLE, + iocp: windows.HANDLE, timer: Time = .{}, io_pending: usize = 0, timeouts: FIFO(Completion) = .{ .name = "io_timeouts" }, @@ -20,19 +21,19 @@ pub const IO = struct { _ = entries; _ = flags; - _ = try os.windows.WSAStartup(2, 2); - errdefer os.windows.WSACleanup() catch unreachable; + _ = try windows.WSAStartup(2, 2); + errdefer windows.WSACleanup() catch unreachable; - const iocp = try os.windows.CreateIoCompletionPort(os.windows.INVALID_HANDLE_VALUE, null, 0, 0); + const iocp = try windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 0); return IO{ .iocp = iocp }; } pub fn deinit(self: *IO) void { - assert(self.iocp != os.windows.INVALID_HANDLE_VALUE); - os.windows.CloseHandle(self.iocp); - self.iocp = os.windows.INVALID_HANDLE_VALUE; + 
assert(self.iocp != windows.INVALID_HANDLE_VALUE); + windows.CloseHandle(self.iocp); + self.iocp = windows.INVALID_HANDLE_VALUE; - os.windows.WSACleanup() catch unreachable; + windows.WSACleanup() catch unreachable; } pub fn tick(self: *IO) !void { @@ -66,16 +67,16 @@ pub const IO = struct { if (self.completed.empty()) { // Compute how long to poll by flushing timeout completions. // NOTE: this may push to completed queue - var timeout_ms: ?os.windows.DWORD = null; + var timeout_ms: ?windows.DWORD = null; if (self.flush_timeouts()) |expires_ns| { // 0ns expires should have been completed not returned assert(expires_ns != 0); // Round up sub-millisecond expire times to the next millisecond const expires_ms = (expires_ns + (std.time.ns_per_ms / 2)) / std.time.ns_per_ms; // Saturating cast to DWORD milliseconds - const expires = std.math.cast(os.windows.DWORD, expires_ms) orelse std.math.maxInt(os.windows.DWORD); + const expires = std.math.cast(windows.DWORD, expires_ms) orelse std.math.maxInt(windows.DWORD); // max DWORD is reserved for INFINITE so cap the cast at max - 1 - timeout_ms = if (expires == os.windows.INFINITE) expires - 1 else expires; + timeout_ms = if (expires == windows.INFINITE) expires - 1 else expires; } // Poll for IO iff theres IO pending and flush_timeouts() found no ready completions @@ -87,8 +88,8 @@ pub const IO = struct { .non_blocking => 0, }; - var events: [64]os.windows.OVERLAPPED_ENTRY = undefined; - const num_events: u32 = os.windows.GetQueuedCompletionStatusEx( + var events: [64]windows.OVERLAPPED_ENTRY = undefined; + const num_events: u32 = windows.GetQueuedCompletionStatusEx( self.iocp, &events, io_timeout, @@ -170,13 +171,13 @@ pub const IO = struct { }; const Overlapped = struct { - raw: os.windows.OVERLAPPED, + raw: windows.OVERLAPPED, completion: *Completion, }; const Transfer = struct { - socket: os.socket_t, - buf: os.windows.ws2_32.WSABUF, + socket: posix.socket_t, + buf: windows.ws2_32.WSABUF, overlapped: Overlapped, pending: 
bool, }; @@ -184,12 +185,12 @@ pub const IO = struct { const Operation = union(enum) { accept: struct { overlapped: Overlapped, - listen_socket: os.socket_t, - client_socket: os.socket_t, + listen_socket: posix.socket_t, + client_socket: posix.socket_t, addr_buffer: [(@sizeOf(std.net.Address) + 16) * 2]u8 align(4), }, connect: struct { - socket: os.socket_t, + socket: posix.socket_t, address: std.net.Address, overlapped: Overlapped, pending: bool, @@ -197,19 +198,19 @@ pub const IO = struct { send: Transfer, recv: Transfer, read: struct { - fd: os.fd_t, + fd: posix.fd_t, buf: [*]u8, len: u32, offset: u64, }, write: struct { - fd: os.fd_t, + fd: posix.fd_t, buf: [*]const u8, len: u32, offset: u64, }, close: struct { - fd: os.fd_t, + fd: posix.fd_t, }, timeout: struct { deadline: u64, @@ -270,7 +271,7 @@ pub const IO = struct { } } - pub const AcceptError = os.AcceptError || os.SetSockOptError; + pub const AcceptError = posix.AcceptError || posix.SetSockOptError; pub fn accept( self: *IO, @@ -279,10 +280,10 @@ pub const IO = struct { comptime callback: fn ( context: Context, completion: *Completion, - result: AcceptError!os.socket_t, + result: AcceptError!posix.socket_t, ) void, completion: *Completion, - socket: os.socket_t, + socket: posix.socket_t, ) void { self.submit( context, @@ -296,31 +297,31 @@ pub const IO = struct { .addr_buffer = undefined, }, struct { - fn do_operation(ctx: Completion.Context, op: anytype) AcceptError!os.socket_t { - var flags: os.windows.DWORD = undefined; - var transferred: os.windows.DWORD = undefined; + fn do_operation(ctx: Completion.Context, op: anytype) AcceptError!posix.socket_t { + var flags: windows.DWORD = undefined; + var transferred: windows.DWORD = undefined; const rc = switch (op.client_socket) { // When first called, the client_socket is invalid so we start the op. INVALID_SOCKET => blk: { // Create the socket that will be used for accept. 
op.client_socket = ctx.io.open_socket( - os.AF.INET, - os.SOCK.STREAM, - os.IPPROTO.TCP, + posix.AF.INET, + posix.SOCK.STREAM, + posix.IPPROTO.TCP, ) catch |err| switch (err) { error.AddressFamilyNotSupported, error.ProtocolNotSupported => unreachable, else => |e| return e, }; - var sync_bytes_read: os.windows.DWORD = undefined; + var sync_bytes_read: windows.DWORD = undefined; op.overlapped = .{ - .raw = std.mem.zeroes(os.windows.OVERLAPPED), + .raw = std.mem.zeroes(windows.OVERLAPPED), .completion = ctx.completion, }; // Start the asynchronous accept with the created socket. - break :blk os.windows.ws2_32.AcceptEx( + break :blk windows.ws2_32.AcceptEx( op.listen_socket, op.client_socket, &op.addr_buffer, @@ -332,22 +333,22 @@ pub const IO = struct { ); }, // Called after accept was started, so get the result - else => os.windows.ws2_32.WSAGetOverlappedResult( + else => windows.ws2_32.WSAGetOverlappedResult( op.listen_socket, &op.overlapped.raw, &transferred, - os.windows.FALSE, // dont wait + windows.FALSE, // dont wait &flags, ), }; // return the socket if we succeed in accepting. 
- if (rc != os.windows.FALSE) { + if (rc != windows.FALSE) { // enables getsockopt, setsockopt, getsockname, getpeername - _ = os.windows.ws2_32.setsockopt( + _ = windows.ws2_32.setsockopt( op.client_socket, - os.windows.ws2_32.SOL.SOCKET, - os.windows.ws2_32.SO.UPDATE_ACCEPT_CONTEXT, + windows.ws2_32.SOL.SOCKET, + windows.ws2_32.SO.UPDATE_ACCEPT_CONTEXT, null, 0, ); @@ -359,12 +360,12 @@ pub const IO = struct { errdefer |err| switch (err) { error.WouldBlock => {}, else => { - os.closeSocket(op.client_socket); + posix.close(op.client_socket); op.client_socket = INVALID_SOCKET; }, }; - return switch (os.windows.ws2_32.WSAGetLastError()) { + return switch (windows.ws2_32.WSAGetLastError()) { .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock, .WSANOTINITIALISED => unreachable, // WSAStartup() was called .WSAENETDOWN => unreachable, // WinSock error @@ -376,7 +377,7 @@ pub const IO = struct { .WSAEMFILE => unreachable, // we create our own descriptor so its available .WSAENOBUFS => error.SystemResources, .WSAEINTR, .WSAEINPROGRESS => unreachable, // no blocking calls - else => |err| os.windows.unexpectedWSAError(err), + else => |err| windows.unexpectedWSAError(err), }; } }, @@ -388,9 +389,9 @@ pub const IO = struct { DiskQuota, InputOutput, NoSpaceLeft, - } || os.UnexpectedError; + } || posix.UnexpectedError; - pub const ConnectError = os.ConnectError || error{FileDescriptorNotASocket}; + pub const ConnectError = posix.ConnectError || error{FileDescriptorNotASocket}; pub fn connect( self: *IO, @@ -402,7 +403,7 @@ pub const IO = struct { result: ConnectError!void, ) void, completion: *Completion, - socket: os.socket_t, + socket: posix.socket_t, address: std.net.Address, ) void { self.submit( @@ -418,17 +419,17 @@ pub const IO = struct { }, struct { fn do_operation(ctx: Completion.Context, op: anytype) ConnectError!void { - var flags: os.windows.DWORD = undefined; - var transferred: os.windows.DWORD = undefined; + var flags: windows.DWORD = 
undefined; + var transferred: windows.DWORD = undefined; const rc = blk: { // Poll for the result if we've already started the connect op. if (op.pending) { - break :blk os.windows.ws2_32.WSAGetOverlappedResult( + break :blk windows.ws2_32.WSAGetOverlappedResult( op.socket, &op.overlapped.raw, &transferred, - os.windows.FALSE, // dont wait + windows.FALSE, // dont wait &flags, ); } @@ -436,7 +437,7 @@ pub const IO = struct { // ConnectEx requires the socket to be initially bound (INADDR_ANY) const inaddr_any = std.mem.zeroes([4]u8); const bind_addr = std.net.Address.initIp4(inaddr_any, 0); - os.bind( + posix.bind( op.socket, &bind_addr.any, bind_addr.getOsSockLen(), @@ -452,43 +453,43 @@ pub const IO = struct { }; const LPFN_CONNECTEX = *const fn ( - Socket: os.windows.ws2_32.SOCKET, - SockAddr: *const os.windows.ws2_32.sockaddr, - SockLen: os.socklen_t, + Socket: windows.ws2_32.SOCKET, + SockAddr: *const windows.ws2_32.sockaddr, + SockLen: posix.socklen_t, SendBuf: ?*const anyopaque, - SendBufLen: os.windows.DWORD, - BytesSent: *os.windows.DWORD, - Overlapped: *os.windows.OVERLAPPED, - ) callconv(os.windows.WINAPI) os.windows.BOOL; + SendBufLen: windows.DWORD, + BytesSent: *windows.DWORD, + Overlapped: *windows.OVERLAPPED, + ) callconv(windows.WINAPI) windows.BOOL; // Find the ConnectEx function by dynamically looking it up on the socket. - // TODO: use `os.windows.loadWinsockExtensionFunction` once the function + // TODO: use `windows.loadWinsockExtensionFunction` once the function // pointer is no longer required to be comptime. 
var connect_ex: LPFN_CONNECTEX = undefined; - var num_bytes: os.windows.DWORD = undefined; - const guid = os.windows.ws2_32.WSAID_CONNECTEX; - switch (os.windows.ws2_32.WSAIoctl( + var num_bytes: windows.DWORD = undefined; + const guid = windows.ws2_32.WSAID_CONNECTEX; + switch (windows.ws2_32.WSAIoctl( op.socket, - os.windows.ws2_32.SIO_GET_EXTENSION_FUNCTION_POINTER, + windows.ws2_32.SIO_GET_EXTENSION_FUNCTION_POINTER, @as(*const anyopaque, @ptrCast(&guid)), - @sizeOf(os.windows.GUID), + @sizeOf(windows.GUID), @as(*anyopaque, @ptrCast(&connect_ex)), @sizeOf(LPFN_CONNECTEX), &num_bytes, null, null, )) { - os.windows.ws2_32.SOCKET_ERROR => switch (os.windows.ws2_32.WSAGetLastError()) { + windows.ws2_32.SOCKET_ERROR => switch (windows.ws2_32.WSAGetLastError()) { .WSAEOPNOTSUPP => unreachable, .WSAENOTSOCK => unreachable, - else => |err| return os.windows.unexpectedWSAError(err), + else => |err| return windows.unexpectedWSAError(err), }, else => assert(num_bytes == @sizeOf(LPFN_CONNECTEX)), } op.pending = true; op.overlapped = .{ - .raw = std.mem.zeroes(os.windows.OVERLAPPED), + .raw = std.mem.zeroes(windows.OVERLAPPED), .completion = ctx.completion, }; @@ -505,12 +506,12 @@ pub const IO = struct { }; // return if we succeeded in connecting - if (rc != os.windows.FALSE) { + if (rc != windows.FALSE) { // enables getsockopt, setsockopt, getsockname, getpeername - _ = os.windows.ws2_32.setsockopt( + _ = windows.ws2_32.setsockopt( op.socket, - os.windows.ws2_32.SOL.SOCKET, - os.windows.ws2_32.SO.UPDATE_CONNECT_CONTEXT, + windows.ws2_32.SOL.SOCKET, + windows.ws2_32.SO.UPDATE_CONNECT_CONTEXT, null, 0, ); @@ -518,7 +519,7 @@ pub const IO = struct { return; } - return switch (os.windows.ws2_32.WSAGetLastError()) { + return switch (windows.ws2_32.WSAGetLastError()) { .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE, .WSAEALREADY => error.WouldBlock, .WSANOTINITIALISED => unreachable, // WSAStartup() was called .WSAENETDOWN => unreachable, // network subsystem is down @@ 
-532,14 +533,14 @@ pub const IO = struct { .WSAENOTSOCK => unreachable, // socket is not bound or is listening .WSAETIMEDOUT => error.ConnectionTimedOut, .WSA_INVALID_HANDLE => unreachable, // we dont use hEvent in OVERLAPPED - else => |err| os.windows.unexpectedWSAError(err), + else => |err| windows.unexpectedWSAError(err), }; } }, ); } - pub const SendError = os.SendError; + pub const SendError = posix.SendError; pub fn send( self: *IO, @@ -551,12 +552,12 @@ pub const IO = struct { result: SendError!usize, ) void, completion: *Completion, - socket: os.socket_t, + socket: posix.socket_t, buffer: []const u8, ) void { const transfer = Completion.Transfer{ .socket = socket, - .buf = os.windows.ws2_32.WSABUF{ + .buf = windows.ws2_32.WSABUF{ .len = @as(u32, @intCast(bufferLimit(buffer.len))), .buf = @constCast(buffer.ptr), }, @@ -572,48 +573,48 @@ pub const IO = struct { transfer, struct { fn do_operation(ctx: Completion.Context, op: anytype) SendError!usize { - var flags: os.windows.DWORD = undefined; - var transferred: os.windows.DWORD = undefined; + var flags: windows.DWORD = undefined; + var transferred: windows.DWORD = undefined; const rc = blk: { // Poll for the result if we've already started the send op. if (op.pending) { - break :blk os.windows.ws2_32.WSAGetOverlappedResult( + break :blk windows.ws2_32.WSAGetOverlappedResult( op.socket, &op.overlapped.raw, &transferred, - os.windows.FALSE, // dont wait + windows.FALSE, // dont wait &flags, ); } op.pending = true; op.overlapped = .{ - .raw = std.mem.zeroes(os.windows.OVERLAPPED), + .raw = std.mem.zeroes(windows.OVERLAPPED), .completion = ctx.completion, }; // Start the send operation. 
- break :blk switch (os.windows.ws2_32.WSASend( + break :blk switch (windows.ws2_32.WSASend( op.socket, - @as([*]os.windows.ws2_32.WSABUF, @ptrCast(&op.buf)), + @as([*]windows.ws2_32.WSABUF, @ptrCast(&op.buf)), 1, // one buffer &transferred, 0, // no flags &op.overlapped.raw, null, )) { - os.windows.ws2_32.SOCKET_ERROR => @as(os.windows.BOOL, os.windows.FALSE), - 0 => os.windows.TRUE, + windows.ws2_32.SOCKET_ERROR => @as(windows.BOOL, windows.FALSE), + 0 => windows.TRUE, else => unreachable, }; }; // Return bytes transferred on success. - if (rc != os.windows.FALSE) + if (rc != windows.FALSE) return transferred; - return switch (os.windows.ws2_32.WSAGetLastError()) { + return switch (windows.ws2_32.WSAGetLastError()) { .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock, .WSANOTINITIALISED => unreachable, // WSAStartup() was called .WSA_INVALID_HANDLE => unreachable, // we dont use OVERLAPPED.hEvent @@ -632,14 +633,14 @@ pub const IO = struct { .WSAEOPNOTSUPP => unreachable, // we dont use MSG_OOB or MSG_PARTIAL .WSAESHUTDOWN => error.BrokenPipe, .WSA_OPERATION_ABORTED => unreachable, // operation was cancelled - else => |err| os.windows.unexpectedWSAError(err), + else => |err| windows.unexpectedWSAError(err), }; } }, ); } - pub const RecvError = os.RecvFromError; + pub const RecvError = posix.RecvFromError; pub fn recv( self: *IO, @@ -651,12 +652,12 @@ pub const IO = struct { result: RecvError!usize, ) void, completion: *Completion, - socket: os.socket_t, + socket: posix.socket_t, buffer: []u8, ) void { const transfer = Completion.Transfer{ .socket = socket, - .buf = os.windows.ws2_32.WSABUF{ + .buf = windows.ws2_32.WSABUF{ .len = @as(u32, @intCast(bufferLimit(buffer.len))), .buf = buffer.ptr, }, @@ -672,48 +673,48 @@ pub const IO = struct { transfer, struct { fn do_operation(ctx: Completion.Context, op: anytype) RecvError!usize { - var flags: os.windows.DWORD = 0; // used both as input and output - var transferred: os.windows.DWORD = 
undefined; + var flags: windows.DWORD = 0; // used both as input and output + var transferred: windows.DWORD = undefined; const rc = blk: { // Poll for the result if we've already started the recv op. if (op.pending) { - break :blk os.windows.ws2_32.WSAGetOverlappedResult( + break :blk windows.ws2_32.WSAGetOverlappedResult( op.socket, &op.overlapped.raw, &transferred, - os.windows.FALSE, // dont wait + windows.FALSE, // dont wait &flags, ); } op.pending = true; op.overlapped = .{ - .raw = std.mem.zeroes(os.windows.OVERLAPPED), + .raw = std.mem.zeroes(windows.OVERLAPPED), .completion = ctx.completion, }; // Start the recv operation. - break :blk switch (os.windows.ws2_32.WSARecv( + break :blk switch (windows.ws2_32.WSARecv( op.socket, - @as([*]os.windows.ws2_32.WSABUF, @ptrCast(&op.buf)), + @as([*]windows.ws2_32.WSABUF, @ptrCast(&op.buf)), 1, // one buffer &transferred, &flags, &op.overlapped.raw, null, )) { - os.windows.ws2_32.SOCKET_ERROR => @as(os.windows.BOOL, os.windows.FALSE), - 0 => os.windows.TRUE, + windows.ws2_32.SOCKET_ERROR => @as(windows.BOOL, windows.FALSE), + 0 => windows.TRUE, else => unreachable, }; }; // Return bytes received on success. 
- if (rc != os.windows.FALSE) + if (rc != windows.FALSE) return transferred; - return switch (os.windows.ws2_32.WSAGetLastError()) { + return switch (windows.ws2_32.WSAGetLastError()) { .WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => error.WouldBlock, .WSANOTINITIALISED => unreachable, // WSAStartup() was called .WSA_INVALID_HANDLE => unreachable, // we dont use OVERLAPPED.hEvent @@ -733,7 +734,7 @@ pub const IO = struct { .WSAESHUTDOWN => error.SocketNotConnected, .WSAETIMEDOUT => error.ConnectionRefused, .WSA_OPERATION_ABORTED => unreachable, // operation was cancelled - else => |err| os.windows.unexpectedWSAError(err), + else => |err| windows.unexpectedWSAError(err), }; } }, @@ -750,7 +751,7 @@ pub const IO = struct { SystemResources, Unseekable, ConnectionTimedOut, - } || os.UnexpectedError; + } || posix.UnexpectedError; pub fn read( self: *IO, @@ -762,7 +763,7 @@ pub const IO = struct { result: ReadError!usize, ) void, completion: *Completion, - fd: os.fd_t, + fd: posix.fd_t, buffer: []u8, offset: u64, ) void { @@ -781,7 +782,7 @@ pub const IO = struct { fn do_operation(ctx: Completion.Context, op: anytype) ReadError!usize { // Do a synchronous read for now. _ = ctx; - return os.pread(op.fd, op.buf[0..op.len], op.offset) catch |err| switch (err) { + return posix.pread(op.fd, op.buf[0..op.len], op.offset) catch |err| switch (err) { error.OperationAborted => unreachable, error.BrokenPipe => unreachable, error.ConnectionTimedOut => unreachable, @@ -794,7 +795,7 @@ pub const IO = struct { ); } - pub const WriteError = os.PWriteError; + pub const WriteError = posix.PWriteError; pub fn write( self: *IO, @@ -806,7 +807,7 @@ pub const IO = struct { result: WriteError!usize, ) void, completion: *Completion, - fd: os.fd_t, + fd: posix.fd_t, buffer: []const u8, offset: u64, ) void { @@ -825,7 +826,7 @@ pub const IO = struct { fn do_operation(ctx: Completion.Context, op: anytype) WriteError!usize { // Do a synchronous write for now. 
_ = ctx; - return os.pwrite(op.fd, op.buf[0..op.len], op.offset); + return posix.pwrite(op.fd, op.buf[0..op.len], op.offset); } }, ); @@ -841,7 +842,7 @@ pub const IO = struct { result: CloseError!void, ) void, completion: *Completion, - fd: os.fd_t, + fd: posix.fd_t, ) void { self.submit( context, @@ -855,19 +856,19 @@ pub const IO = struct { // Check if the fd is a SOCKET by seeing if getsockopt() returns ENOTSOCK // https://stackoverflow.com/a/50981652 - const socket: os.socket_t = @ptrCast(op.fd); + const socket: posix.socket_t = @ptrCast(op.fd); getsockoptError(socket) catch |err| switch (err) { - error.FileDescriptorNotASocket => return os.windows.CloseHandle(op.fd), + error.FileDescriptorNotASocket => return windows.CloseHandle(op.fd), else => {}, }; - os.closeSocket(socket); + posix.close(socket); } }, ); } - pub const TimeoutError = error{Canceled} || os.UnexpectedError; + pub const TimeoutError = error{Canceled} || posix.UnexpectedError; pub fn timeout( self: *IO, @@ -915,16 +916,16 @@ pub const IO = struct { ); } - pub const INVALID_SOCKET = os.windows.ws2_32.INVALID_SOCKET; + pub const INVALID_SOCKET = windows.ws2_32.INVALID_SOCKET; /// Creates a socket that can be used for async operations with the IO instance. 
- pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !os.socket_t { + pub fn open_socket(self: *IO, family: u32, sock_type: u32, protocol: u32) !posix.socket_t { // SOCK_NONBLOCK | SOCK_CLOEXEC - var flags: os.windows.DWORD = 0; - flags |= os.windows.ws2_32.WSA_FLAG_OVERLAPPED; - flags |= os.windows.ws2_32.WSA_FLAG_NO_HANDLE_INHERIT; + var flags: windows.DWORD = 0; + flags |= windows.ws2_32.WSA_FLAG_OVERLAPPED; + flags |= windows.ws2_32.WSA_FLAG_NO_HANDLE_INHERIT; - const socket = try os.windows.WSASocketW( + const socket = try windows.WSASocketW( @as(i32, @bitCast(family)), @as(i32, @bitCast(sock_type)), @as(i32, @bitCast(protocol)), @@ -932,68 +933,68 @@ pub const IO = struct { 0, flags, ); - errdefer os.closeSocket(socket); + errdefer posix.close(socket); - const socket_iocp = try os.windows.CreateIoCompletionPort(socket, self.iocp, 0, 0); + const socket_iocp = try windows.CreateIoCompletionPort(socket, self.iocp, 0, 0); assert(socket_iocp == self.iocp); // Ensure that synchronous IO completion doesn't queue an unneeded overlapped // and that the event for the socket (WaitForSingleObject) doesn't need to be set. - var mode: os.windows.BYTE = 0; - mode |= os.windows.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS; - mode |= os.windows.FILE_SKIP_SET_EVENT_ON_HANDLE; + var mode: windows.BYTE = 0; + mode |= windows.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS; + mode |= windows.FILE_SKIP_SET_EVENT_ON_HANDLE; - const handle = @as(os.windows.HANDLE, @ptrCast(socket)); - try os.windows.SetFileCompletionNotificationModes(handle, mode); + const handle = @as(windows.HANDLE, @ptrCast(socket)); + try windows.SetFileCompletionNotificationModes(handle, mode); return socket; } /// Opens a directory with read only access. 
- pub fn open_dir(dir_path: []const u8) !os.fd_t { + pub fn open_dir(dir_path: []const u8) !posix.fd_t { const dir = try std.fs.cwd().openDir(dir_path, .{}); return dir.fd; } - pub const INVALID_FILE = os.windows.INVALID_HANDLE_VALUE; + pub const INVALID_FILE = windows.INVALID_HANDLE_VALUE; - fn open_file_handle(relative_path: []const u8, method: enum { create, open }) !os.fd_t { - const path_w = try os.windows.sliceToPrefixedFileW(relative_path); + fn open_file_handle(relative_path: []const u8, method: enum { create, open }) !posix.fd_t { + const path_w = try windows.sliceToPrefixedFileW(relative_path); // FILE_CREATE = O_CREAT | O_EXCL - var creation_disposition: os.windows.DWORD = 0; + var creation_disposition: windows.DWORD = 0; switch (method) { .create => { - creation_disposition = os.windows.FILE_CREATE; + creation_disposition = windows.FILE_CREATE; log.info("creating \"{s}\"...", .{relative_path}); }, .open => { - creation_disposition = os.windows.OPEN_EXISTING; + creation_disposition = windows.OPEN_EXISTING; log.info("opening \"{s}\"...", .{relative_path}); }, } // O_EXCL - const shared_mode: os.windows.DWORD = 0; + const shared_mode: windows.DWORD = 0; // O_RDWR - var access_mask: os.windows.DWORD = 0; - access_mask |= os.windows.GENERIC_READ; - access_mask |= os.windows.GENERIC_WRITE; + var access_mask: windows.DWORD = 0; + access_mask |= windows.GENERIC_READ; + access_mask |= windows.GENERIC_WRITE; // O_DIRECT | O_DSYNC - var attributes: os.windows.DWORD = 0; - attributes |= os.windows.FILE_FLAG_NO_BUFFERING; - attributes |= os.windows.FILE_FLAG_WRITE_THROUGH; + var attributes: windows.DWORD = 0; + attributes |= windows.FILE_FLAG_NO_BUFFERING; + attributes |= windows.FILE_FLAG_WRITE_THROUGH; // This is critical as we rely on O_DSYNC for fsync() whenever we write to the file: - assert((attributes & os.windows.FILE_FLAG_WRITE_THROUGH) > 0); + assert((attributes & windows.FILE_FLAG_WRITE_THROUGH) > 0); // TODO: Add ReadFileEx/WriteFileEx support. 
// Not currently needed for O_DIRECT disk IO. - // attributes |= os.windows.FILE_FLAG_OVERLAPPED; + // attributes |= windows.FILE_FLAG_OVERLAPPED; - const handle = os.windows.kernel32.CreateFileW( + const handle = windows.kernel32.CreateFileW( path_w.span(), access_mask, shared_mode, @@ -1003,13 +1004,13 @@ pub const IO = struct { null, // no existing template file ); - if (handle == os.windows.INVALID_HANDLE_VALUE) { - return switch (os.windows.kernel32.GetLastError()) { + if (handle == windows.INVALID_HANDLE_VALUE) { + return switch (windows.kernel32.GetLastError()) { .FILE_NOT_FOUND => error.FileNotFound, .SHARING_VIOLATION, .ACCESS_DENIED => error.AccessDenied, else => |err| { log.warn("CreateFileW(): {}", .{err}); - return os.windows.unexpectedError(err); + return windows.unexpectedError(err); }, }; } @@ -1026,11 +1027,11 @@ pub const IO = struct { /// The caller is responsible for ensuring that the parent directory inode is durable. /// - Verifies that the file size matches the expected file size before returning. pub fn open_file( - dir_handle: os.fd_t, + dir_handle: posix.fd_t, relative_path: []const u8, size: u64, method: enum { create, create_or_open, open }, - ) !os.fd_t { + ) !posix.fd_t { assert(relative_path.len > 0); assert(size % sector_size == 0); @@ -1042,7 +1043,7 @@ pub const IO = struct { else => return err, }, }; - errdefer os.windows.CloseHandle(handle); + errdefer windows.CloseHandle(handle); // Obtain an advisory exclusive lock // even when we haven't given shared access to other processes. @@ -1064,7 +1065,7 @@ pub const IO = struct { const write_offset = size - sector.len; var written: usize = 0; while (written < sector.len) { - written += try os.pwrite(handle, sector[written..], write_offset + written); + written += try posix.pwrite(handle, sector[written..], write_offset + written); } }; } @@ -1073,21 +1074,21 @@ pub const IO = struct { // making decisions on data that was never durably written by a previously crashed process. 
// We therefore always fsync when we open the path, also to wait for any pending O_DSYNC. // Thanks to Alex Miller from FoundationDB for diving into our source and pointing this out. - try os.fsync(handle); + try posix.fsync(handle); // We cannot fsync the directory handle on Windows. // We have no way to open a directory with write access. // - // try os.fsync(dir_handle); + // try posix.fsync(dir_handle); _ = dir_handle; - const file_size = try os.windows.GetFileSizeEx(handle); + const file_size = try windows.GetFileSizeEx(handle); if (file_size < size) @panic("data file inode size was truncated or corrupted"); return handle; } - fn fs_lock(handle: os.fd_t, size: u64) !void { + fn fs_lock(handle: posix.fd_t, size: u64) !void { // TODO: Look into using SetFileIoOverlappedRange() for better unbuffered async IO perf // NOTE: Requires SeLockMemoryPrivilege. @@ -1096,21 +1097,21 @@ pub const IO = struct { const LOCKFILE_FAIL_IMMEDIATELY = 0x1; extern "kernel32" fn LockFileEx( - hFile: os.windows.HANDLE, - dwFlags: os.windows.DWORD, - dwReserved: os.windows.DWORD, - nNumberOfBytesToLockLow: os.windows.DWORD, - nNumberOfBytesToLockHigh: os.windows.DWORD, - lpOverlapped: ?*os.windows.OVERLAPPED, - ) callconv(os.windows.WINAPI) os.windows.BOOL; + hFile: windows.HANDLE, + dwFlags: windows.DWORD, + dwReserved: windows.DWORD, + nNumberOfBytesToLockLow: windows.DWORD, + nNumberOfBytesToLockHigh: windows.DWORD, + lpOverlapped: ?*windows.OVERLAPPED, + ) callconv(windows.WINAPI) windows.BOOL; }; // hEvent = null // Offset & OffsetHigh = 0 - var lock_overlapped = std.mem.zeroes(os.windows.OVERLAPPED); + var lock_overlapped = std.mem.zeroes(windows.OVERLAPPED); // LOCK_EX | LOCK_NB - var lock_flags: os.windows.DWORD = 0; + var lock_flags: windows.DWORD = 0; lock_flags |= kernel32.LOCKFILE_EXCLUSIVE_LOCK; lock_flags |= kernel32.LOCKFILE_FAIL_IMMEDIATELY; @@ -1123,64 +1124,64 @@ pub const IO = struct { &lock_overlapped, ); - if (locked == os.windows.FALSE) { - return switch 
(os.windows.kernel32.GetLastError()) { + if (locked == windows.FALSE) { + return switch (windows.kernel32.GetLastError()) { .IO_PENDING => error.WouldBlock, - else => |err| os.windows.unexpectedError(err), + else => |err| windows.unexpectedError(err), }; } } - fn fs_allocate(handle: os.fd_t, size: u64) !void { + fn fs_allocate(handle: posix.fd_t, size: u64) !void { // TODO: Look into using SetFileValidData() instead // NOTE: Requires SE_MANAGE_VOLUME_NAME privilege // Move the file pointer to the start + size - const seeked = os.windows.kernel32.SetFilePointerEx( + const seeked = windows.kernel32.SetFilePointerEx( handle, @as(i64, @intCast(size)), null, // no reference to new file pointer - os.windows.FILE_BEGIN, + windows.FILE_BEGIN, ); - if (seeked == os.windows.FALSE) { - return switch (os.windows.kernel32.GetLastError()) { + if (seeked == windows.FALSE) { + return switch (windows.kernel32.GetLastError()) { .INVALID_HANDLE => unreachable, .INVALID_PARAMETER => unreachable, - else => |err| os.windows.unexpectedError(err), + else => |err| windows.unexpectedError(err), }; } // Mark the moved file pointer (start + size) as the physical EOF. 
- const allocated = os.windows.kernel32.SetEndOfFile(handle); - if (allocated == os.windows.FALSE) { - const err = os.windows.kernel32.GetLastError(); - return os.windows.unexpectedError(err); + const allocated = windows.kernel32.SetEndOfFile(handle); + if (allocated == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + return windows.unexpectedError(err); } } }; -// TODO: use os.getsockoptError when fixed for windows in stdlib -fn getsockoptError(socket: os.socket_t) IO.ConnectError!void { +// TODO: use posix.getsockoptError when fixed for windows in stdlib +fn getsockoptError(socket: posix.socket_t) IO.ConnectError!void { var err_code: u32 = undefined; var size: i32 = @sizeOf(u32); - const rc = os.windows.ws2_32.getsockopt( + const rc = windows.ws2_32.getsockopt( socket, - os.SOL.SOCKET, - os.SO.ERROR, + posix.SOL.SOCKET, + posix.SO.ERROR, std.mem.asBytes(&err_code), &size, ); if (rc != 0) { - switch (os.windows.ws2_32.WSAGetLastError()) { + switch (windows.ws2_32.WSAGetLastError()) { .WSAENETDOWN => return error.NetworkUnreachable, .WSANOTINITIALISED => unreachable, // WSAStartup() was never called .WSAEFAULT => unreachable, // The address pointed to by optval or optlen is not in a valid part of the process address space. .WSAEINVAL => unreachable, // The level parameter is unknown or invalid .WSAENOPROTOOPT => unreachable, // The option is unknown at the level indicated. 
.WSAENOTSOCK => return error.FileDescriptorNotASocket, - else => |err| return os.windows.unexpectedWSAError(err), + else => |err| return windows.unexpectedWSAError(err), } } @@ -1188,7 +1189,7 @@ fn getsockoptError(socket: os.socket_t) IO.ConnectError!void { if (err_code == 0) return; - const ws_err = @as(os.windows.ws2_32.WinsockError, @enumFromInt(@as(u16, @intCast(err_code)))); + const ws_err = @as(windows.ws2_32.WinsockError, @enumFromInt(@as(u16, @intCast(err_code)))); return switch (ws_err) { .WSAEACCES => error.PermissionDenied, .WSAEADDRINUSE => error.AddressInUse, @@ -1204,6 +1205,6 @@ fn getsockoptError(socket: os.socket_t) IO.ConnectError!void { .WSAEPROTOTYPE => unreachable, .WSAETIMEDOUT => error.ConnectionTimedOut, .WSAECONNRESET => error.ConnectionResetByPeer, - else => |e| os.windows.unexpectedWSAError(e), + else => |e| windows.unexpectedWSAError(e), }; } diff --git a/src/sample_web_server.zig b/src/sample_web_server.zig index 3e8376d..22f2864 100644 --- a/src/sample_web_server.zig +++ b/src/sample_web_server.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const builtin = @import("builtin"); const Atomic = std.atomic.Value; const debug = std.debug; @@ -6,7 +7,7 @@ const fifo = std.fifo; const heap = std.heap; const log = std.log.scoped(.server); const mem = std.mem; -const os = std.os; +const posix = std.posix; const Thread = std.Thread; const IO = @import("io").IO; @@ -23,7 +24,7 @@ pub const Config = struct { pub const recv_buf_len = 1024; }; -const Queue = fifo.LinearFifo(os.socket_t, .{ .Static = Config.accept_buf_len }); +const Queue = fifo.LinearFifo(posix.socket_t, .{ .Static = Config.accept_buf_len }); var running = Atomic(bool).init(true); @@ -34,8 +35,10 @@ const Acceptor = struct { }; pub fn main() !void { - // Handle OS signals for graceful shutdown. - try addSignalHandlers(); + if (builtin.target.isBSD() or builtin.target.os.tag == .linux) { + // Handle OS signals for graceful shutdown. 
+ try addSignalHandlers(); + } // Cross-platform IO setup. var io = try IO.init(Config.io_entries, 0); @@ -43,11 +46,11 @@ pub fn main() !void { // Listener setup const address = try std.net.Address.parseIp4(Config.server_ip, Config.server_port); - const listener = try io.open_socket(address.any.family, os.SOCK.STREAM, os.IPPROTO.TCP); - defer os.closeSocket(listener); - try os.setsockopt(listener, os.SOL.SOCKET, os.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); - try os.bind(listener, &address.any, address.getOsSockLen()); - try os.listen(listener, Config.kernel_backlog); + const listener = try io.open_socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP); + defer posix.close(listener); + try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); + try posix.bind(listener, &address.any, address.getOsSockLen()); + try posix.listen(listener, Config.kernel_backlog); log.info("server listening on IP {s} port {}. CTRL+C to shutdown.", .{ Config.server_ip, Config.server_port }); @@ -67,7 +70,7 @@ pub fn main() !void { .queue = &queue, }; - while (running.load(.Monotonic)) { + while (running.load(.monotonic)) { // Start accepting. 
var acceptor_completion: IO.Completion = undefined; io.accept(*Acceptor, &acceptor, acceptCallback, &acceptor_completion, listener); @@ -83,7 +86,7 @@ pub fn main() !void { fn acceptCallback( acceptor_ptr: *Acceptor, completion: *IO.Completion, - result: IO.AcceptError!os.socket_t, + result: IO.AcceptError!posix.socket_t, ) void { _ = completion; const accepted_sock = result catch @panic("accept error"); @@ -111,7 +114,7 @@ const Client = struct { completion: IO.Completion, io: *IO, recv_buf: [Config.recv_buf_len]u8, - socket: os.socket_t, + socket: posix.socket_t, thread_id: Thread.Id, }; @@ -127,9 +130,9 @@ fn handleClient(queue_mutex: *Thread.Mutex, queue: *Queue) !void { var fba = heap.FixedBufferAllocator.init(&client_buf); const allocator = fba.allocator(); - while (running.load(.Monotonic)) { + while (running.load(.monotonic)) { // Get next accepted client socket. - const maybe_socket: ?os.socket_t = blk: { + const maybe_socket: ?posix.socket_t = blk: { queue_mutex.lock(); defer queue_mutex.unlock(); break :blk queue.readItem(); @@ -239,31 +242,31 @@ fn closeCallback( fn addSignalHandlers() !void { // Ignore broken pipes { - var act = os.Sigaction{ + var act = posix.Sigaction{ .handler = .{ - .handler = os.SIG.IGN, + .handler = posix.SIG.IGN, }, - .mask = os.empty_sigset, + .mask = posix.empty_sigset, .flags = 0, }; - try os.sigaction(os.SIG.PIPE, &act, null); + try posix.sigaction(posix.SIG.PIPE, &act, null); } // Catch SIGINT/SIGTERM for proper shutdown { - var act = os.Sigaction{ + var act = posix.Sigaction{ .handler = .{ .handler = struct { fn wrapper(sig: c_int) callconv(.C) void { log.info("Caught signal {d}; Shutting down...", .{sig}); - running.store(false, .Release); + running.store(false, .release); } }.wrapper, }, - .mask = os.empty_sigset, + .mask = posix.empty_sigset, .flags = 0, }; - try os.sigaction(os.SIG.TERM, &act, null); - try os.sigaction(os.SIG.INT, &act, null); + try posix.sigaction(posix.SIG.TERM, &act, null); + try 
posix.sigaction(posix.SIG.INT, &act, null); } } diff --git a/src/stdx.zig b/src/stdx.zig index 33fac5e..864f167 100644 --- a/src/stdx.zig +++ b/src/stdx.zig @@ -60,7 +60,7 @@ pub inline fn copy_left( if (!disjoint_slices(T, T, target, source)) { assert(@intFromPtr(target.ptr) < @intFromPtr(source.ptr)); } - std.mem.copy(T, target, source); + @memcpy(target[0..source.len], source); } test "copy_left" { @@ -291,8 +291,8 @@ pub fn no_padding(comptime T: type) bool { .Array => |info| return no_padding(info.child), .Struct => |info| { switch (info.layout) { - .Auto => return false, - .Extern => { + .auto => return false, + .@"extern" => { for (info.fields) |field| { if (!no_padding(field.type)) return false; } @@ -322,7 +322,7 @@ pub fn no_padding(comptime T: type) bool { } return offset == @sizeOf(T); }, - .Packed => return @bitSizeOf(T) == 8 * @sizeOf(T), + .@"packed" => return @bitSizeOf(T) == 8 * @sizeOf(T), } }, .Enum => |info| { diff --git a/src/test.zig b/src/test.zig index 8e5fd4b..ff0f4cd 100644 --- a/src/test.zig +++ b/src/test.zig @@ -1,6 +1,6 @@ const std = @import("std"); const builtin = @import("builtin"); -const os = std.os; +const posix = std.posix; const testing = std.testing; const assert = std.debug.assert; @@ -13,7 +13,7 @@ test "write/read/close" { io: IO, done: bool = false, - fd: os.fd_t, + fd: posix.fd_t, write_buf: [20]u8 = [_]u8{97} ** 20, read_buf: [20]u8 = [_]u8{98} ** 20, @@ -87,10 +87,10 @@ test "accept/connect/send/receive" { io: *IO, done: bool = false, - server: os.socket_t, - client: os.socket_t, + server: posix.socket_t, + client: posix.socket_t, - accepted_sock: os.socket_t = undefined, + accepted_sock: posix.socket_t = undefined, send_buf: [10]u8 = [_]u8{ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0 }, recv_buf: [5]u8 = [_]u8{ 0, 1, 0, 1, 0 }, @@ -104,24 +104,24 @@ test "accept/connect/send/receive" { const address = try std.net.Address.parseIp4("127.0.0.1", 0); const kernel_backlog = 1; - const server = try io.open_socket(address.any.family, 
os.SOCK.STREAM, os.IPPROTO.TCP); - defer os.closeSocket(server); + const server = try io.open_socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP); + defer posix.close(server); - const client = try io.open_socket(address.any.family, os.SOCK.STREAM, os.IPPROTO.TCP); - defer os.closeSocket(client); + const client = try io.open_socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP); + defer posix.close(client); - try os.setsockopt( + try posix.setsockopt( server, - os.SOL.SOCKET, - os.SO.REUSEADDR, + posix.SOL.SOCKET, + posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)), ); - try os.bind(server, &address.any, address.getOsSockLen()); - try os.listen(server, kernel_backlog); + try posix.bind(server, &address.any, address.getOsSockLen()); + try posix.listen(server, kernel_backlog); var client_address = std.net.Address.initIp4(undefined, undefined); var client_address_len = client_address.getOsSockLen(); - try os.getsockname(server, &client_address.any, &client_address_len); + try posix.getsockname(server, &client_address.any, &client_address_len); var self: Context = .{ .io = &io, @@ -180,7 +180,7 @@ test "accept/connect/send/receive" { fn accept_callback( self: *Context, completion: *IO.Completion, - result: IO.AcceptError!os.socket_t, + result: IO.AcceptError!posix.socket_t, ) void { self.accepted_sock = result catch @panic("accept error"); self.io.recv( @@ -314,7 +314,7 @@ test "tick to wait" { const Context = @This(); io: IO, - accepted: os.socket_t = IO.INVALID_SOCKET, + accepted: posix.socket_t = IO.INVALID_SOCKET, connected: bool = false, received: bool = false, @@ -325,24 +325,24 @@ test "tick to wait" { const address = try std.net.Address.parseIp4("127.0.0.1", 0); const kernel_backlog = 1; - const server = try self.io.open_socket(address.any.family, os.SOCK.STREAM, os.IPPROTO.TCP); - defer os.closeSocket(server); + const server = try self.io.open_socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP); + defer 
posix.close(server); - try os.setsockopt( + try posix.setsockopt( server, - os.SOL.SOCKET, - os.SO.REUSEADDR, + posix.SOL.SOCKET, + posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)), ); - try os.bind(server, &address.any, address.getOsSockLen()); - try os.listen(server, kernel_backlog); + try posix.bind(server, &address.any, address.getOsSockLen()); + try posix.listen(server, kernel_backlog); var client_address = std.net.Address.initIp4(undefined, undefined); var client_address_len = client_address.getOsSockLen(); - try os.getsockname(server, &client_address.any, &client_address_len); + try posix.getsockname(server, &client_address.any, &client_address_len); - const client = try self.io.open_socket(client_address.any.family, os.SOCK.STREAM, os.IPPROTO.TCP); - defer os.closeSocket(client); + const client = try self.io.open_socket(client_address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP); + defer posix.close(client); // Start the accept var server_completion: IO.Completion = undefined; @@ -368,7 +368,7 @@ test "tick to wait" { assert(self.connected); assert(self.accepted != IO.INVALID_SOCKET); - defer os.closeSocket(self.accepted); + defer posix.close(self.accepted); // Start receiving on the client var recv_completion: IO.Completion = undefined; @@ -410,7 +410,7 @@ test "tick to wait" { fn accept_callback( self: *Context, completion: *IO.Completion, - result: IO.AcceptError!os.socket_t, + result: IO.AcceptError!posix.socket_t, ) void { _ = completion; @@ -442,15 +442,15 @@ test "tick to wait" { self.received = true; } - // TODO: use os.send() instead when it gets fixed for windows - fn os_send(sock: os.socket_t, buf: []const u8, flags: u32) !usize { + // TODO: use posix.send() instead when it gets fixed for windows + fn os_send(sock: posix.socket_t, buf: []const u8, flags: u32) !usize { if (builtin.target.os.tag != .windows) { - return os.send(sock, buf, flags); + return posix.send(sock, buf, flags); } - const rc = os.windows.sendto(sock, buf.ptr, buf.len, 
flags, null, 0); - if (rc == os.windows.ws2_32.SOCKET_ERROR) { - switch (os.windows.ws2_32.WSAGetLastError()) { + const rc = posix.windows.sendto(sock, buf.ptr, buf.len, flags, null, 0); + if (rc == posix.windows.ws2_32.SOCKET_ERROR) { + switch (posix.windows.ws2_32.WSAGetLastError()) { .WSAEACCES => return error.AccessDenied, .WSAEADDRNOTAVAIL => return error.AddressNotAvailable, .WSAECONNRESET => return error.ConnectionResetByPeer, @@ -470,7 +470,7 @@ test "tick to wait" { .WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH. .WSAEWOULDBLOCK => return error.WouldBlock, .WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function. - else => |err| return os.windows.unexpectedWSAError(err), + else => |err| return posix.windows.unexpectedWSAError(err), } } else { return @as(usize, @intCast(rc)); @@ -490,7 +490,7 @@ test "pipe data over socket" { const Context = @This(); const Socket = struct { - fd: os.socket_t = IO.INVALID_SOCKET, + fd: posix.socket_t = IO.INVALID_SOCKET, completion: IO.Completion = undefined, }; const Pipe = struct { @@ -514,23 +514,23 @@ test "pipe data over socket" { }; defer self.io.deinit(); - self.server.fd = try self.io.open_socket(os.AF.INET, os.SOCK.STREAM, os.IPPROTO.TCP); - defer os.closeSocket(self.server.fd); + self.server.fd = try self.io.open_socket(posix.AF.INET, posix.SOCK.STREAM, posix.IPPROTO.TCP); + defer posix.close(self.server.fd); const address = try std.net.Address.parseIp4("127.0.0.1", 0); - try os.setsockopt( + try posix.setsockopt( self.server.fd, - os.SOL.SOCKET, - os.SO.REUSEADDR, + posix.SOL.SOCKET, + posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)), ); - try os.bind(self.server.fd, &address.any, address.getOsSockLen()); - try os.listen(self.server.fd, 1); + try posix.bind(self.server.fd, &address.any, address.getOsSockLen()); + try 
posix.listen(self.server.fd, 1); var client_address = std.net.Address.initIp4(undefined, undefined); var client_address_len = client_address.getOsSockLen(); - try os.getsockname(self.server.fd, &client_address.any, &client_address_len); + try posix.getsockname(self.server.fd, &client_address.any, &client_address_len); self.io.accept( *Context, @@ -540,8 +540,8 @@ test "pipe data over socket" { self.server.fd, ); - self.tx.socket.fd = try self.io.open_socket(os.AF.INET, os.SOCK.STREAM, os.IPPROTO.TCP); - defer os.closeSocket(self.tx.socket.fd); + self.tx.socket.fd = try self.io.open_socket(posix.AF.INET, posix.SOCK.STREAM, posix.IPPROTO.TCP); + defer posix.close(self.tx.socket.fd); self.io.connect( *Context, @@ -565,7 +565,7 @@ test "pipe data over socket" { try testing.expect(self.server.fd != IO.INVALID_SOCKET); try testing.expect(self.tx.socket.fd != IO.INVALID_SOCKET); try testing.expect(self.rx.socket.fd != IO.INVALID_SOCKET); - os.closeSocket(self.rx.socket.fd); + posix.close(self.rx.socket.fd); try testing.expectEqual(self.tx.transferred, buffer_size); try testing.expectEqual(self.rx.transferred, buffer_size); @@ -575,7 +575,7 @@ test "pipe data over socket" { fn on_accept( self: *Context, completion: *IO.Completion, - result: IO.AcceptError!os.socket_t, + result: IO.AcceptError!posix.socket_t, ) void { assert(self.rx.socket.fd == IO.INVALID_SOCKET); assert(&self.server.completion == completion); diff --git a/src/time.zig b/src/time.zig index e894b6c..11e921a 100644 --- a/src/time.zig +++ b/src/time.zig @@ -2,6 +2,7 @@ const std = @import("std"); const builtin = @import("builtin"); const os = std.os; +const posix = std.posix; const assert = std.debug.assert; const is_darwin = builtin.target.os.tag.isDarwin(); const is_windows = builtin.target.os.tag == .windows; @@ -48,8 +49,8 @@ pub const Time = struct { // https://opensource.apple.com/source/Libc/Libc-1158.1.2/gen/clock_gettime.c.auto.html if (is_darwin) { const darwin = struct { - const 
mach_timebase_info_t = os.darwin.mach_timebase_info_data; - extern "c" fn mach_timebase_info(info: *mach_timebase_info_t) os.darwin.kern_return_t; + const mach_timebase_info_t = std.c.mach_timebase_info_data; + extern "c" fn mach_timebase_info(info: *mach_timebase_info_t) std.c.kern_return_t; extern "c" fn mach_continuous_time() u64; }; @@ -67,8 +68,8 @@ pub const Time = struct { // CLOCK_BOOTTIME is the same as CLOCK_MONOTONIC but includes elapsed time during a suspend. // For more detail and why CLOCK_MONOTONIC_RAW is even worse than CLOCK_MONOTONIC, // see https://github.com/ziglang/zig/pull/933#discussion_r656021295. - var ts: os.timespec = undefined; - os.clock_gettime(os.CLOCK.BOOTTIME, &ts) catch @panic("CLOCK_BOOTTIME required"); + var ts: posix.timespec = undefined; + posix.clock_gettime(posix.CLOCK.BOOTTIME, &ts) catch @panic("CLOCK_BOOTTIME required"); break :blk @as(u64, @intCast(ts.tv_sec)) * std.time.ns_per_s + @as(u64, @intCast(ts.tv_nsec)); };