Files
iofthetiger/src/test.zig
2024-03-28 14:46:31 +03:30

655 lines
21 KiB
Zig

const std = @import("std");
const builtin = @import("builtin");
const posix = std.posix;
const testing = std.testing;
const assert = std.debug.assert;
const Time = @import("time.zig").Time;
const IO = @import("io.zig").IO;
// Chains write -> read -> close on a scratch file through the IO instance.
// A single completion is reused for the whole chain: each callback submits the
// next operation, and the close callback flips `done` to end the tick loop.
test "write/read/close" {
    try struct {
        const Context = @This();

        // Byte offset handed to both the write and the read submission.
        const file_offset = 10;

        io: IO,
        done: bool = false,
        fd: posix.fd_t,

        // The two buffers start with different fill bytes (97 vs 98), so they
        // only compare equal if the read overwrites `read_buf`.
        write_buf: [20]u8 = [_]u8{97} ** 20,
        read_buf: [20]u8 = [_]u8{98} ** 20,

        // Byte counts reported by the write and read callbacks.
        written: usize = 0,
        read: usize = 0,

        /// Creates the scratch file, submits the initial write and ticks the IO
        /// instance until the close callback has run, then checks the results.
        fn run_test() !void {
            const file_path = "test_io_write_read_close";
            const cwd = std.fs.cwd();

            const scratch = try cwd.createFile(file_path, .{ .read = true, .truncate = true });
            // Best-effort cleanup: a failed delete is deliberately ignored.
            defer cwd.deleteFile(file_path) catch {};

            var self: Context = .{
                .io = try IO.init(32, 0),
                .fd = scratch.handle,
            };
            defer self.io.deinit();

            // The file handle is closed through `self.io.close` in `read_callback`,
            // which is why there is no `scratch.close()` here.
            var shared_completion: IO.Completion = undefined;
            self.io.write(*Context, &self, write_callback, &shared_completion, self.fd, &self.write_buf, file_offset);

            while (!self.done) {
                try self.io.tick();
            }

            try testing.expectEqual(self.write_buf.len, self.written);
            try testing.expectEqual(self.read_buf.len, self.read);
            try testing.expectEqualSlices(u8, &self.write_buf, &self.read_buf);
        }

        /// Records the written byte count and submits the read on the same completion.
        fn write_callback(self: *Context, completion: *IO.Completion, result: IO.WriteError!usize) void {
            const byte_count = result catch @panic("write error");
            self.written = byte_count;
            self.io.read(*Context, self, read_callback, completion, self.fd, &self.read_buf, file_offset);
        }

        /// Records the read byte count and submits the close on the same completion.
        fn read_callback(self: *Context, completion: *IO.Completion, result: IO.ReadError!usize) void {
            const byte_count = result catch @panic("read error");
            self.read = byte_count;
            self.io.close(*Context, self, close_callback, completion, self.fd);
        }

        /// Last link of the chain: marks the test as finished.
        fn close_callback(self: *Context, completion: *IO.Completion, result: IO.CloseError!void) void {
            _ = completion;
            _ = result catch @panic("close error");
            self.done = true;
        }
    }.run_test();
}
// Loopback TCP exchange driven through the IO instance: a client connects and
// sends `send_buf` while the server accepts and receives into `recv_buf`.
// The test ends when the recv callback sets `done`.
test "accept/connect/send/receive" {
    try struct {
        const Context = @This();

        io: *IO,
        done: bool = false,
        server: posix.socket_t,
        client: posix.socket_t,
        // Written by `accept_callback`; must not be read before that callback ran.
        accepted_sock: posix.socket_t = undefined,
        send_buf: [10]u8 = [_]u8{ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0 },
        // Shorter than `send_buf` and initialised to the inverse pattern, so the
        // final comparison only passes if the receive overwrote the buffer.
        recv_buf: [5]u8 = [_]u8{ 0, 1, 0, 1, 0 },
        sent: usize = 0,
        received: usize = 0,

        /// Sets up a listening socket and a client socket, submits connect and
        /// accept, ticks until the receive completed and verifies the byte counts
        /// and the received prefix.
        fn run_test() !void {
            var io = try IO.init(32, 0);
            defer io.deinit();

            const address = try std.net.Address.parseIp4("127.0.0.1", 0);
            const kernel_backlog = 1;

            const server = try io.open_socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP);
            defer posix.close(server);

            const client = try io.open_socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP);
            defer posix.close(client);

            try posix.setsockopt(
                server,
                posix.SOL.SOCKET,
                posix.SO.REUSEADDR,
                &std.mem.toBytes(@as(c_int, 1)),
            );
            try posix.bind(server, &address.any, address.getOsSockLen());
            try posix.listen(server, kernel_backlog);

            // Read back the address the server socket is actually bound to (the
            // bind above asked for port 0); this is what the client connects to.
            var client_address = std.net.Address.initIp4(undefined, undefined);
            var client_address_len = client_address.getOsSockLen();
            try posix.getsockname(server, &client_address.any, &client_address_len);

            var self: Context = .{
                .io = &io,
                .server = server,
                .client = client,
            };

            var client_completion: IO.Completion = undefined;
            self.io.connect(
                *Context,
                &self,
                connect_callback,
                &client_completion,
                client,
                client_address,
            );

            var server_completion: IO.Completion = undefined;
            self.io.accept(*Context, &self, accept_callback, &server_completion, server);

            while (!self.done) try self.io.tick();

            // `done` is only set by `recv_callback`, which is only submitted from
            // `accept_callback`, so `accepted_sock` holds a real socket here.
            // Close it so the test does not leak the accepted descriptor.
            defer posix.close(self.accepted_sock);

            try testing.expectEqual(self.send_buf.len, self.sent);
            try testing.expectEqual(self.recv_buf.len, self.received);

            try testing.expectEqualSlices(u8, self.send_buf[0..self.received], &self.recv_buf);
        }

        /// Once connected, sends the whole `send_buf` reusing the connect completion.
        fn connect_callback(
            self: *Context,
            completion: *IO.Completion,
            result: IO.ConnectError!void,
        ) void {
            _ = result catch @panic("connect error");
            self.io.send(
                *Context,
                self,
                send_callback,
                completion,
                self.client,
                &self.send_buf,
            );
        }

        /// Records how many bytes the send reported.
        fn send_callback(
            self: *Context,
            completion: *IO.Completion,
            result: IO.SendError!usize,
        ) void {
            _ = completion;
            self.sent = result catch @panic("send error");
        }

        /// Stores the accepted socket and starts receiving on it, reusing the
        /// accept completion.
        fn accept_callback(
            self: *Context,
            completion: *IO.Completion,
            result: IO.AcceptError!posix.socket_t,
        ) void {
            self.accepted_sock = result catch @panic("accept error");
            self.io.recv(
                *Context,
                self,
                recv_callback,
                completion,
                self.accepted_sock,
                &self.recv_buf,
            );
        }

        /// Records the received byte count and ends the tick loop.
        fn recv_callback(
            self: *Context,
            completion: *IO.Completion,
            result: IO.RecvError!usize,
        ) void {
            _ = completion;
            self.received = result catch @panic("recv error");
            self.done = true;
        }
    }.run_test();
}
// Submits `count` timeouts of `ms` milliseconds each and checks that all of
// them fire and that the first one fires roughly `ms` milliseconds after start.
test "timeout" {
    // Timeout duration in milliseconds (converted to nanoseconds on submission).
    const ms = 20;
    // Tolerance, in milliseconds, for the measured time of the first timeout.
    const margin = 5;
    // Number of timeouts submitted.
    const count = 10;

    try struct {
        const Context = @This();

        io: IO,
        timer: *Time,
        // Number of timeout callbacks that have run so far.
        count: u32 = 0,
        // Monotonic timestamp of the first timeout callback; 0 means "not yet".
        stop_time: u64 = 0,

        /// Submits all timeouts, ticks until every callback has run, then checks
        /// the callback count and the elapsed time of the first completion.
        fn run_test() !void {
            var timer = Time{};
            const start_time = timer.monotonic();

            var self: Context = .{
                .timer = &timer,
                .io = try IO.init(32, 0),
            };
            defer self.io.deinit();

            const timeout_ns = ms * std.time.ns_per_ms;

            var completions: [count]IO.Completion = undefined;
            var index: usize = 0;
            while (index < completions.len) : (index += 1) {
                self.io.timeout(*Context, &self, timeout_callback, &completions[index], timeout_ns);
            }

            while (self.count < count) {
                try self.io.tick();
            }

            // One extra tick; the count assertion below then verifies that no
            // additional callback ran.
            try self.io.tick();
            try testing.expectEqual(@as(u32, count), self.count);

            const elapsed_ns = self.stop_time - start_time;
            const elapsed_ms = elapsed_ns / std.time.ns_per_ms;
            try testing.expectApproxEqAbs(
                @as(f64, ms),
                @as(f64, @floatFromInt(elapsed_ms)),
                margin,
            );
        }

        /// Counts the completion and timestamps the first one.
        fn timeout_callback(self: *Context, completion: *IO.Completion, result: IO.TimeoutError!void) void {
            _ = completion;
            _ = result catch @panic("timeout error");

            if (self.stop_time == 0) {
                self.stop_time = self.timer.monotonic();
            }
            self.count += 1;
        }
    }.run_test();
}
// Submits `count` timeouts to an IO instance created with `IO.init(1, 0)` and
// checks that every callback still runs exactly once.
// NOTE(review): the first `IO.init` argument is presumably the submission queue
// size, which is what the test name refers to; confirm against `IO.init`.
test "submission queue full" {
    // Timeout duration in milliseconds (converted to nanoseconds on submission).
    const ms = 20;
    // Number of timeouts submitted.
    const count = 10;

    try struct {
        const Context = @This();

        io: IO,
        // Number of timeout callbacks that have run so far.
        count: u32 = 0,

        /// Submits all timeouts, ticks until every callback has run, then ticks
        /// once more and verifies the final callback count.
        fn run_test() !void {
            var self: Context = .{ .io = try IO.init(1, 0) };
            defer self.io.deinit();

            const timeout_ns = ms * std.time.ns_per_ms;

            var completions: [count]IO.Completion = undefined;
            var index: usize = 0;
            while (index < completions.len) : (index += 1) {
                self.io.timeout(*Context, &self, timeout_callback, &completions[index], timeout_ns);
            }

            while (self.count < count) {
                try self.io.tick();
            }

            // One extra tick; the assertion then verifies no extra callback ran.
            try self.io.tick();
            try testing.expectEqual(@as(u32, count), self.count);
        }

        /// Counts one completed timeout.
        fn timeout_callback(self: *Context, completion: *IO.Completion, result: IO.TimeoutError!void) void {
            _ = completion;
            _ = result catch @panic("timeout error");
            self.count += 1;
        }
    }.run_test();
}
// Verifies that IO.tick() alone makes progress on a pending recv whose data is
// produced outside of the IO instance (a plain OS-level send on the peer socket).
test "tick to wait" {
    // Use only IO.tick() to see if pending IO is actually processed
    try struct {
        const Context = @This();

        io: IO,
        // Socket returned by accept; INVALID_SOCKET until `accept_callback` ran.
        accepted: posix.socket_t = IO.INVALID_SOCKET,
        connected: bool = false,
        received: bool = false,

        /// Connects a client to a loopback listener through the IO instance,
        /// submits a recv on the client, sends data to it with `os_send` and
        /// ticks until the recv callback has fired.
        fn run_test() !void {
            var self: Context = .{ .io = try IO.init(1, 0) };
            defer self.io.deinit();

            const address = try std.net.Address.parseIp4("127.0.0.1", 0);
            const kernel_backlog = 1;

            const server = try self.io.open_socket(address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP);
            defer posix.close(server);

            try posix.setsockopt(
                server,
                posix.SOL.SOCKET,
                posix.SO.REUSEADDR,
                &std.mem.toBytes(@as(c_int, 1)),
            );
            try posix.bind(server, &address.any, address.getOsSockLen());
            try posix.listen(server, kernel_backlog);

            // Read back the address the server socket is actually bound to (the
            // bind above asked for port 0); this is what the client connects to.
            var client_address = std.net.Address.initIp4(undefined, undefined);
            var client_address_len = client_address.getOsSockLen();
            try posix.getsockname(server, &client_address.any, &client_address_len);

            const client = try self.io.open_socket(client_address.any.family, posix.SOCK.STREAM, posix.IPPROTO.TCP);
            defer posix.close(client);

            // Start the accept
            var server_completion: IO.Completion = undefined;
            self.io.accept(*Context, &self, accept_callback, &server_completion, server);

            // Start the connect
            var client_completion: IO.Completion = undefined;
            self.io.connect(
                *Context,
                &self,
                connect_callback,
                &client_completion,
                client,
                client_address,
            );

            // Tick the IO to drain the accept & connect completions
            assert(!self.connected);
            assert(self.accepted == IO.INVALID_SOCKET);

            while (self.accepted == IO.INVALID_SOCKET or !self.connected)
                try self.io.tick();

            assert(self.connected);
            assert(self.accepted != IO.INVALID_SOCKET);
            defer posix.close(self.accepted);

            // Start receiving on the client
            var recv_completion: IO.Completion = undefined;
            var recv_buffer: [64]u8 = undefined;
            // Fill with a non-zero pattern so the final comparison against the
            // all-zero send buffer proves the data was really received.
            @memset(&recv_buffer, 0xaa);
            self.io.recv(
                *Context,
                &self,
                recv_callback,
                &recv_completion,
                client,
                &recv_buffer,
            );

            // Drain out the recv completion from any internal IO queues
            try self.io.tick();
            try self.io.tick();
            try self.io.tick();

            // Complete the recv() *outside* of the IO instance.
            // Other tests already check .tick() with IO based completions.
            // This simulates IO being completed by an external system
            const send_buf = std.mem.zeroes([64]u8);
            const wrote = try os_send(self.accepted, &send_buf, 0);
            // Expected value first, so a failure message labels the values correctly.
            try testing.expectEqual(send_buf.len, wrote);

            // Wait for the recv() to complete using only IO.tick().
            // If tick is broken, then this will deadlock
            assert(!self.received);
            while (!self.received) {
                try self.io.tick();
            }

            // Make sure the receive actually happened
            assert(self.received);
            try testing.expect(std.mem.eql(u8, &recv_buffer, &send_buf));
        }

        /// Stores the accepted socket; asserts accept completes only once.
        fn accept_callback(
            self: *Context,
            completion: *IO.Completion,
            result: IO.AcceptError!posix.socket_t,
        ) void {
            _ = completion;

            assert(self.accepted == IO.INVALID_SOCKET);
            self.accepted = result catch @panic("accept error");
        }

        /// Marks the client as connected; asserts connect completes only once.
        fn connect_callback(
            self: *Context,
            completion: *IO.Completion,
            result: IO.ConnectError!void,
        ) void {
            _ = completion;
            _ = result catch @panic("connect error");

            assert(!self.connected);
            self.connected = true;
        }

        /// Marks the recv as completed; the received byte count is not inspected
        /// here, the caller compares the buffers instead.
        fn recv_callback(
            self: *Context,
            completion: *IO.Completion,
            result: IO.RecvError!usize,
        ) void {
            _ = completion;
            _ = result catch |err| std.debug.panic("recv error: {}", .{err});

            assert(!self.received);
            self.received = true;
        }

        /// Sends `buf` on `sock` directly through the OS, bypassing the IO
        /// instance. Non-Windows targets forward to `posix.send`; on Windows
        /// `sendto` is called and WSA error codes are mapped to Zig errors.
        /// Returns the number of bytes sent.
        // TODO: use posix.send() instead when it gets fixed for windows
        fn os_send(sock: posix.socket_t, buf: []const u8, flags: u32) !usize {
            if (builtin.target.os.tag != .windows) {
                return posix.send(sock, buf, flags);
            }

            const rc = posix.windows.sendto(sock, buf.ptr, buf.len, flags, null, 0);
            if (rc == posix.windows.ws2_32.SOCKET_ERROR) {
                switch (posix.windows.ws2_32.WSAGetLastError()) {
                    .WSAEACCES => return error.AccessDenied,
                    .WSAEADDRNOTAVAIL => return error.AddressNotAvailable,
                    .WSAECONNRESET => return error.ConnectionResetByPeer,
                    .WSAEMSGSIZE => return error.MessageTooBig,
                    .WSAENOBUFS => return error.SystemResources,
                    .WSAENOTSOCK => return error.FileDescriptorNotASocket,
                    .WSAEAFNOSUPPORT => return error.AddressFamilyNotSupported,
                    .WSAEDESTADDRREQ => unreachable, // A destination address is required.
                    .WSAEFAULT => unreachable, // The lpBuffers, lpTo, lpOverlapped, lpNumberOfBytesSent, or lpCompletionRoutine parameters are not part of the user address space, or the lpTo parameter is too small.
                    .WSAEHOSTUNREACH => return error.NetworkUnreachable,
                    // TODO: WSAEINPROGRESS, WSAEINTR
                    .WSAEINVAL => unreachable,
                    .WSAENETDOWN => return error.NetworkSubsystemFailed,
                    .WSAENETRESET => return error.ConnectionResetByPeer,
                    .WSAENETUNREACH => return error.NetworkUnreachable,
                    .WSAENOTCONN => return error.SocketNotConnected,
                    .WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH.
                    .WSAEWOULDBLOCK => return error.WouldBlock,
                    .WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function.
                    else => |err| return posix.windows.unexpectedWSAError(err),
                }
            } else {
                return @as(usize, @intCast(rc));
            }
        }
    }.run_test();
}
// Streams `buffer_size` bytes from a sender socket to an accepted receiver
// socket over loopback. Each send/recv callback resubmits itself for the
// remaining bytes until the whole buffer has been transferred.
test "pipe data over socket" {
    try struct {
        io: IO,
        tx: Pipe,
        rx: Pipe,
        server: Socket = .{},

        const buffer_size = 1 * 1024 * 1024;

        const Context = @This();

        // A socket paired with the completion used for operations on it.
        const Socket = struct {
            fd: posix.socket_t = IO.INVALID_SOCKET,
            completion: IO.Completion = undefined,
        };

        // One direction of the transfer: its socket, its buffer and how many
        // bytes of that buffer have been sent or received so far.
        const Pipe = struct {
            socket: Socket = .{},
            buffer: []u8,
            transferred: usize = 0,
        };

        /// Sets up listener and sender, drives the IO instance until the receiver
        /// has the whole buffer, then verifies byte counts and buffer contents.
        fn run() !void {
            const tx_buf = try testing.allocator.alloc(u8, buffer_size);
            defer testing.allocator.free(tx_buf);
            const rx_buf = try testing.allocator.alloc(u8, buffer_size);
            defer testing.allocator.free(rx_buf);

            // Different fill bytes, so equality at the end proves the transfer.
            @memset(tx_buf, 1);
            @memset(rx_buf, 0);

            var self = Context{
                .io = try IO.init(32, 0),
                .tx = .{ .buffer = tx_buf },
                .rx = .{ .buffer = rx_buf },
            };
            defer self.io.deinit();

            self.server.fd = try self.io.open_socket(posix.AF.INET, posix.SOCK.STREAM, posix.IPPROTO.TCP);
            defer posix.close(self.server.fd);

            const address = try std.net.Address.parseIp4("127.0.0.1", 0);
            try posix.setsockopt(
                self.server.fd,
                posix.SOL.SOCKET,
                posix.SO.REUSEADDR,
                &std.mem.toBytes(@as(c_int, 1)),
            );

            try posix.bind(self.server.fd, &address.any, address.getOsSockLen());
            try posix.listen(self.server.fd, 1);

            // Read back the address the server socket is actually bound to (the
            // bind above asked for port 0); this is what the sender connects to.
            var client_address = std.net.Address.initIp4(undefined, undefined);
            var client_address_len = client_address.getOsSockLen();
            try posix.getsockname(self.server.fd, &client_address.any, &client_address_len);

            self.io.accept(
                *Context,
                &self,
                on_accept,
                &self.server.completion,
                self.server.fd,
            );

            self.tx.socket.fd = try self.io.open_socket(posix.AF.INET, posix.SOCK.STREAM, posix.IPPROTO.TCP);
            defer posix.close(self.tx.socket.fd);

            self.io.connect(
                *Context,
                &self,
                on_connect,
                &self.tx.socket.completion,
                self.tx.socket.fd,
                client_address,
            );

            // Mostly tick(), but every 61st iteration use run_for_ns() with a
            // duration derived from the counter (always below 10ms), so both
            // entry points are exercised during the transfer.
            var tick: usize = 0xdeadbeef;
            while (self.rx.transferred != self.rx.buffer.len) : (tick +%= 1) {
                if (tick % 61 == 0) {
                    const timeout_ns = tick % (10 * std.time.ns_per_ms);
                    try self.io.run_for_ns(@as(u63, @intCast(timeout_ns)));
                } else {
                    try self.io.tick();
                }
            }

            try testing.expect(self.server.fd != IO.INVALID_SOCKET);
            try testing.expect(self.tx.socket.fd != IO.INVALID_SOCKET);
            try testing.expect(self.rx.socket.fd != IO.INVALID_SOCKET);
            // Deferred so the accepted socket is closed even if one of the
            // expectations below fails.
            defer posix.close(self.rx.socket.fd);

            // Expected value first, so failure messages label the values correctly.
            try testing.expectEqual(@as(usize, buffer_size), self.tx.transferred);
            try testing.expectEqual(@as(usize, buffer_size), self.rx.transferred);
            try testing.expect(std.mem.eql(u8, self.tx.buffer, self.rx.buffer));
        }

        /// Stores the accepted socket as the receiver and starts receiving.
        fn on_accept(
            self: *Context,
            completion: *IO.Completion,
            result: IO.AcceptError!posix.socket_t,
        ) void {
            assert(self.rx.socket.fd == IO.INVALID_SOCKET);
            assert(&self.server.completion == completion);
            self.rx.socket.fd = result catch |err| std.debug.panic("accept error {}", .{err});

            assert(self.rx.transferred == 0);
            self.do_receiver(0);
        }

        /// Starts sending once the sender socket is connected.
        fn on_connect(
            self: *Context,
            completion: *IO.Completion,
            result: IO.ConnectError!void,
        ) void {
            // A connect can genuinely fail; panic with the error instead of
            // `catch unreachable`, which is undefined behaviour in ReleaseFast.
            _ = result catch |err| std.debug.panic("connect error: {}", .{err});

            assert(self.tx.socket.fd != IO.INVALID_SOCKET);
            assert(&self.tx.socket.completion == completion);

            assert(self.tx.transferred == 0);
            self.do_sender(0);
        }

        /// Accounts for `bytes` just sent and, if data remains, submits a send
        /// for the rest of the tx buffer.
        fn do_sender(self: *Context, bytes: usize) void {
            self.tx.transferred += bytes;
            assert(self.tx.transferred <= self.tx.buffer.len);

            if (self.tx.transferred < self.tx.buffer.len) {
                self.io.send(
                    *Context,
                    self,
                    on_send,
                    &self.tx.socket.completion,
                    self.tx.socket.fd,
                    self.tx.buffer[self.tx.transferred..],
                );
            }
        }

        /// Send completion: forwards the sent byte count to `do_sender`.
        fn on_send(
            self: *Context,
            completion: *IO.Completion,
            result: IO.SendError!usize,
        ) void {
            const bytes = result catch |err| std.debug.panic("send error: {}", .{err});
            assert(&self.tx.socket.completion == completion);
            self.do_sender(bytes);
        }

        /// Accounts for `bytes` just received and, if the rx buffer is not yet
        /// full, submits a recv for the remaining space.
        fn do_receiver(self: *Context, bytes: usize) void {
            self.rx.transferred += bytes;
            assert(self.rx.transferred <= self.rx.buffer.len);

            if (self.rx.transferred < self.rx.buffer.len) {
                self.io.recv(
                    *Context,
                    self,
                    on_recv,
                    &self.rx.socket.completion,
                    self.rx.socket.fd,
                    self.rx.buffer[self.rx.transferred..],
                );
            }
        }

        /// Recv completion: forwards the received byte count to `do_receiver`.
        fn on_recv(
            self: *Context,
            completion: *IO.Completion,
            result: IO.RecvError!usize,
        ) void {
            const bytes = result catch |err| std.debug.panic("recv error: {}", .{err});
            assert(&self.rx.socket.completion == completion);
            self.do_receiver(bytes);
        }
    }.run();
}