Servidor HTTP - Parte 4

Organizando

Enzo Soares

Publicado em 8 de Abril de 2025 às 13:19

Saudações!

Continuando da semana passada, agora vamos trabalhar em outras otimizações de performance, além de facilitar o uso da nossa ferramenta. Primeiro, vamos mudar a nossa implementação de parseRequest para usarmos menos fatias:

pub fn parseRequest(allocator: mem.Allocator, request_raw: []u8) !Request {
    var request_iter = mem.tokenizeSequence(u8, request_raw, "\r\n\r\n");
    const raw_headers = request_iter.next() orelse "";
    var request = try parseHeaders(allocator, raw_headers);
    request.body = request_iter.next() orelse null;

    return request;
}

fn parseHeaders(allocator: mem.Allocator, raw_headers: []const u8) !Request {
    var request = Request.init(allocator);

    const first_line_end = mem.indexOfScalar(u8, raw_headers, '\n') orelse raw_headers.len;
    const first_line = mem.trim(u8, raw_headers[0..first_line_end], "\r");

    {
        var iter = mem.splitScalar(u8, first_line, ' ');
        request.method = iter.next() orelse "";
        request.path = iter.next() orelse "";
        request.version = iter.next() orelse "";
    }

    var headers = raw_headers[first_line_end + 1 ..];
    while (headers.len > 0) {
        const line_end = mem.indexOfScalar(u8, headers, '\n') orelse headers.len;
        const line = mem.trim(u8, headers[0..line_end], "\r");

        if (line.len > 0) {
            const colon = mem.indexOfScalar(u8, line, ':') orelse {
                headers = headers[line_end + 1 ..];
                continue;
            };

            const name = mem.trim(u8, line[0..colon], " ");
            const value = if (colon + 1 < line.len)
                mem.trim(u8, line[colon + 1 ..], " ")
            else
                "";

            try request.headers.put(name, value);
        }

        if (line_end + 1 > headers.len) {
            break;
        }
        headers = headers[line_end + 1 ..];
    }

    return request;
}

Depois, vamos remover a chamada de printSelf de on_read. Essas alterações nos dão um tempo médio de resposta de 5 ms e uma taxa de erro de apenas 2%. Ótimo progresso!

No momento, um usuário que deseje escrever uma resposta com status diferente ou fazer algo com a requisição está impossibilitado. Vamos mudar isso. Primeiro, vamos criar um objeto que representa a nossa resposta:

pub const Response = struct {
    status_code: u16,
    headers: StringHashMap([]const u8),
    body: ?[]const u8,

    const Self = @This();

    pub fn init(allocator: mem.Allocator) Self {
        const map = StringHashMap([]const u8).init(allocator);
        return Self{
            .body = null,
            .headers = map,
            .status_code = 200,
        };
    }

    pub fn deinit(self: *Self) void {
        self.headers.deinit();
        self.* = undefined;
    }
}

De forma bem similar à requisição, temos um hashmap para armazenar os cabeçalhos e um array para armazenar o corpo da resposta, além de um inteiro para representar o status. Um cliente HTTP não está preparado para receber essa struct como resposta: ele espera um string. Vamos criar um método então para transformar uma instância dessa struct em algo printável:

pub fn to_string(self: Self, allocator: std.mem.Allocator) ![]const u8 {
    var response_string = std.ArrayList(u8).init(allocator);
    defer response_string.deinit();

    const writer = response_string.writer();

    try writer.print("HTTP/1.1 {d} {s}\r\n", .{
        self.status_code,
        statusCodeToString(self.status_code) orelse "Unknown",
    });

    var headers_iter = self.headers.iterator();
    while (headers_iter.next()) |entry| {
        try writer.print("{s}: {s}\r\n", .{entry.key_ptr.*, entry.value_ptr.*});
    }

    try writer.writeAll("\r\n");

    if (self.body) |body| {
        try writer.writeAll(body);
    }

    return response_string.toOwnedSlice();
}

fn statusCodeToString(code: u16) ?[]const u8 {
    return switch (code) {
        200 => "OK",
        201 => "Created",
        204 => "No Content",
        400 => "Bad Request",
        401 => "Unauthorized",
        403 => "Forbidden",
        404 => "Not Found",
        500 => "Internal Server Error",
        else => null,
    };
}

Em seguida, precisamos criar um abstração para um servidor, que vai iniciar o loop de eventos e chamar um callback para cada conexão:

const constants = @import("constants.zig");

pub const Config = struct {
    port: u16 = constants.DEFAULT_PORT,
};

pub const Server = struct {
    allocator: *mem.Allocator,
    config: Config = .{},
    callback: callback_t,

    const Self = @This();

    pub fn init(allocator: *mem.Allocator, config: Config, callback: callback_t) Self {
        return Self{
            .allocator = allocator,
            .config = config,
            .callback = callback,
        };
    }

    pub fn start(self: Self) !void {
        try libuv_execute(self.allocator, self.config, self.callback);
    }
};

O servidor recebe um ponteiro para um alocador, já que a referência ao alocador deve se manter através de várias threads, uma struct de config onde a porta de funcionamento pode ser definida e um callback. O tipo callback_t é definido como:

pub const callback_t = * const fn (mem.Allocator, *Request, *Response) anyerror!void;

Para usarmos tudo isso junto, vamos mudar a nossa função main para ficar da seguinte forma:

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    var allocator = gpa.allocator();
    defer _ = gpa.deinit();

    const server = Server.init(&allocator, Config{}, on_response);
    try server.start();
}

fn on_response(_: mem.Allocator, _: *Request, _: *Response) !void {
}

Agora, como vamos chamar o callback dentro do nosso loop de eventos e como vamos passar os objetos de requisição e de resposta para o callback?

O loop de eventos

O loop de eventos roda em uma thread específica e tem a função de verificar constantemente se operações em outras threads foram concluídas. Por isso, é importante minimizar o trabalho na thread principal – se ela estiver ocupada com processamento ou operações de I/O, não poderá verificar o status das outras threads. Por essa razão, o callback deve ser executado em uma tarefa separada, fora do loop de eventos.

Primeiro, vamos definir duas structs: InitialContext, que vai passar informações do libuv_execute para cada conexão, e Context, que vai armazenar o contexto de cada conexão específica. Ambos esses contextos serão vinculados ao campo data do libuv.

const Context = struct {
    req: uv.uv_work_t,
    stream: [*c]uv.uv_stream_t,
    callback: callback_t,
    arena: *std.heap.ArenaAllocator,
    request: *Request,
    response: *Response,
};

const InitialContext = struct {
    allocator: *mem.Allocator,
    callback: callback_t,
};

// outras funções

pub fn libuv_execute(allocator: *mem.Allocator, config: Config, callback: callback_t) !void {
    const ctx: *InitialContext = try allocator.*.create(InitialContext);
    ctx.*.allocator = allocator;
    ctx.*.callback = callback;

    print("Started server\n", .{});

    const loop = uv.uv_default_loop();

    var server: uv.uv_tcp_t = undefined;
    _ = uv.uv_tcp_init(loop, &server);
    server.data = @constCast(ctx);

    var addr: uv.struct_sockaddr_in = undefined;
    _ = uv.uv_ip4_addr("0.0.0.0", @intCast(config.port), &addr);
    _ = uv.uv_tcp_bind(&server, @ptrCast(&addr), 0);

    _ = uv.uv_listen(@ptrCast(&server), 128, on_connection);

    print("Listening on http://0.0.0.0:{d}\n", .{config.port});

    _ = uv.uv_run(loop, uv.UV_RUN_DEFAULT);
}

A função on_connection agora deve obter o InitialContext e depois preencher o seu próprio Context e passar para frente:

fn on_connection(server: [*c]uv.uv_stream_t, status: c_int) callconv(.c) void {
    const initial_ctx: *InitialContext = @ptrCast(@alignCast(server.*.data));
    const main_allocator = initial_ctx.*.allocator;

    const ctx = main_allocator.*.create(Context) catch {
        print("Failed to create context\n", .{});
        return;
    };

    ctx.*.callback = initial_ctx.*.callback;

    const arena = main_allocator.*.create(std.heap.ArenaAllocator) catch {
        print("Failed to create arena allocator\n", .{});
        return;
    };
    arena.* = std.heap.ArenaAllocator.init(main_allocator.*);
    const allocator = arena.allocator();

    if (status < 0) {
        print("New connection error {s}\n", .{uv.uv_strerror(status)});
        arena.deinit();
        main_allocator.*.destroy(arena);
        return;
    }

    const client: [*c]uv.uv_tcp_t = allocator.create(uv.uv_tcp_t) catch {
        print("Failed to allocate client\n", .{});
        arena.deinit();
        main_allocator.*.destroy(arena);
        return;
    };

    _ = uv.uv_tcp_init(server.*.loop, client);
    ctx.*.arena = arena;
    client.*.data = ctx;

    if (uv.uv_accept(server, @ptrCast(client)) == 0) {
        _ = uv.uv_read_start(@ptrCast(client), alloc_buffer, on_read);
    } else {
        arena.deinit();
        main_allocator.destroy(arena);
    }
}

A função de callback é só transferida de InitalContext para Context, enquanto o alocador é usado para alocar uma arena, que será usada daqui para frente para acessar o heap.

O método on_read agora tem funções diferentes: processar a requisição, criar instâncias de Request e de Response, colocá-las no contexto e designar um novo trabalho asíncrono a ser realizado:

fn on_read(client: *anyopaque, nread: isize, buf: [*c]const uv.uv_buf_t) callconv(.c) void {
    const stream: [*c]uv.uv_stream_t = @ptrCast(@alignCast(client));
    const ctx: *Context = @ptrCast(@alignCast(stream.*.data));
    const arena_ptr: *std.heap.ArenaAllocator = ctx.*.arena;
    const allocator = arena_ptr.*.allocator();

    if (nread < 0) {
        if (nread != uv.UV_EOF) {
            print("Read error: {s}\n", .{uv.uv_strerror(@intCast(nread))});
        }
        _ = uv.uv_close(@ptrCast(@alignCast(client)), null);
        return;
    }

    if (nread > 0) {
        const request_raw = buf.*.base[0..@intCast(nread)];
        const request = allocator.create(Request) catch {
            print("Failed to allocate request\n", .{});
            return;
        };
        const response = allocator.create(Response) catch {
            print("Failed to allocate response\n", .{});
            return;
        };

        request.* = request_lib.parseRequest(allocator, request_raw) catch {
            print("Failed to parse request\n", .{});
            return;
        };
        response.* = Response.init(allocator);

        ctx.*.request = request;
        ctx.*.response = response;
        ctx.*.req.data = @ptrCast(@alignCast(ctx));
        ctx.*.stream = stream;

        _ = uv.uv_queue_work(stream.*.loop, @ptrCast(@alignCast(&ctx.*.req)), handle_action, write);
    }
}

A função uv_queue_work recebe como argumento o loop de eventos, um ponteiro para um stream da libuv e dois callbacks, a serem executados um depois do outro. handle_action é definido como:

fn handle_action(req: [*c]uv.uv_work_t) callconv(.c) void {
    const ctx: *Context = @ptrCast(@alignCast(req.*.data));
    const arena_ptr = ctx.*.arena;
    const allocator = arena_ptr.*.allocator();
    ctx.*.callback(allocator, ctx.*.request, ctx.*.response) catch {
        return;
    };
}

Nós simplesmente chamamos o callback com as informações relevantes.

write, por sua vez, cria uma sequência de eventos que retorna uma Response para o cliente e limpa a memória alocada:

fn write(req: [*c]uv.uv_work_t, status: c_int) callconv(.c) void {
    if (status < 0) {
        print("Operation cancelled\n", .{});
        return;
    }
    const ctx: *Context = @ptrCast(@alignCast(req.*.data));
    const arena_ptr = ctx.*.arena;
    const allocator = arena_ptr.*.allocator();

    const response = ctx.*.response.to_string(allocator) catch {
        print("Failed to write response\n", .{});
        return;
    };

    const write_req = allocator.create(uv.uv_write_t) catch {
        print("Failed to allocate write_req\n", .{});
        return;
    };

    var buf_resp = uv.uv_buf_init(@ptrCast(@constCast(response)), @intCast(response.len));
    _ = uv.uv_write(
        write_req,
        ctx.*.stream,
        &buf_resp,
        1,
        on_write_complete,
    );
}

fn on_write_complete(req: [*c]uv.uv_write_t, status: c_int) callconv(.C) void {
    if (status < 0) {
        print("Write error: {s}\n", .{uv.uv_strerror(status)});
    }
    _ = uv.uv_close(@ptrCast(req.*.handle), on_close);
}

fn on_close(handle: [*c]uv.uv_handle_t) callconv(.C) void {
    const ctx: *Context = @ptrCast(@alignCast(handle.*.data));
    const arena_ptr: *std.heap.ArenaAllocator = ctx.*.arena;
    _ = arena_ptr.*.deinit();
}

O código completo fica assim:

const std = @import("std");
const mem = std.mem;
const print = std.debug.print;
const request_lib = @import("request.zig");
const uv = @import("imports/libuv.zig");
const Config = @import("config.zig").Config;
const Request = @import("request.zig").Request;
const Response = @import("response.zig").Response;
const callback_t = @import("constants.zig").callback_t;

const Context = struct {
    req: uv.uv_work_t,
    stream: [*c]uv.uv_stream_t,
    callback: callback_t,
    arena: *std.heap.ArenaAllocator,
    request: *Request,
    response: *Response,
};

const InitialContext = struct {
    allocator: *mem.Allocator,
    callback: callback_t,
};

fn on_connection(server: [*c]uv.uv_stream_t, status: c_int) callconv(.c) void {
    const initial_ctx: *InitialContext = @ptrCast(@alignCast(server.*.data));
    const main_allocator = initial_ctx.*.allocator;

    const ctx = main_allocator.*.create(Context) catch {
        print("Failed to create context\n", .{});
        return;
    };

    ctx.*.callback = initial_ctx.*.callback;

    const arena = main_allocator.*.create(std.heap.ArenaAllocator) catch {
        print("Failed to create arena allocator\n", .{});
        return;
    };
    arena.* = std.heap.ArenaAllocator.init(main_allocator.*);
    const allocator = arena.allocator();

    if (status < 0) {
        print("New connection error {s}\n", .{uv.uv_strerror(status)});
        arena.deinit();
        main_allocator.*.destroy(arena);
        return;
    }

    const client: [*c]uv.uv_tcp_t = allocator.create(uv.uv_tcp_t) catch {
        print("Failed to allocate client\n", .{});
        arena.deinit();
        main_allocator.*.destroy(arena);
        return;
    };

    _ = uv.uv_tcp_init(server.*.loop, client);
    ctx.*.arena = arena;
    client.*.data = ctx;

    if (uv.uv_accept(server, @ptrCast(client)) == 0) {
        _ = uv.uv_read_start(@ptrCast(client), alloc_buffer, on_read);
    } else {
        arena.deinit();
        main_allocator.destroy(arena);
    }
}

fn on_read(client: *anyopaque, nread: isize, buf: [*c]const uv.uv_buf_t) callconv(.c) void {
    const stream: [*c]uv.uv_stream_t = @ptrCast(@alignCast(client));
    const ctx: *Context = @ptrCast(@alignCast(stream.*.data));
    const arena_ptr: *std.heap.ArenaAllocator = ctx.*.arena;
    const allocator = arena_ptr.*.allocator();

    if (nread < 0) {
        if (nread != uv.UV_EOF) {
            print("Read error: {s}\n", .{uv.uv_strerror(@intCast(nread))});
        }
        _ = uv.uv_close(@ptrCast(@alignCast(client)), null);
        return;
    }

    if (nread > 0) {
        const request_raw = buf.*.base[0..@intCast(nread)];
        const request = allocator.create(Request) catch {
            print("Failed to allocate request\n", .{});
            return;
        };
        const response = allocator.create(Response) catch {
            print("Failed to allocate response\n", .{});
            return;
        };

        request.* = request_lib.parseRequest(allocator, request_raw) catch {
            print("Failed to parse request\n", .{});
            return;
        };
        response.* = Response.init(allocator);

        ctx.*.request = request;
        ctx.*.response = response;
        ctx.*.req.data = @ptrCast(@alignCast(ctx));
        ctx.*.stream = stream;

        _ = uv.uv_queue_work(stream.*.loop, @ptrCast(@alignCast(&ctx.*.req)), handle_action, write);
    }
}

fn handle_action(req: [*c]uv.uv_work_t) callconv(.c) void {
    const ctx: *Context = @ptrCast(@alignCast(req.*.data));
    const arena_ptr = ctx.*.arena;
    const allocator = arena_ptr.*.allocator();
    ctx.*.callback(allocator, ctx.*.request, ctx.*.response) catch {
        return;
    };
}

fn write(req: [*c]uv.uv_work_t, status: c_int) callconv(.c) void {
    if (status < 0) {
        print("Operation cancelled\n", .{});
        return;
    }
    const ctx: *Context = @ptrCast(@alignCast(req.*.data));
    const arena_ptr = ctx.*.arena;
    const allocator = arena_ptr.*.allocator();

    const response = ctx.*.response.to_string(allocator) catch {
        print("Failed to write response\n", .{});
        return;
    };

    const write_req = allocator.create(uv.uv_write_t) catch {
        print("Failed to allocate write_req\n", .{});
        return;
    };

    var buf_resp = uv.uv_buf_init(@ptrCast(@constCast(response)), @intCast(response.len));
    _ = uv.uv_write(
        write_req,
        ctx.*.stream,
        &buf_resp,
        1,
        on_write_complete,
    );
}

fn on_write_complete(req: [*c]uv.uv_write_t, status: c_int) callconv(.C) void {
    if (status < 0) {
        print("Write error: {s}\n", .{uv.uv_strerror(status)});
    }
    _ = uv.uv_close(@ptrCast(req.*.handle), on_close);
}

fn on_close(handle: [*c]uv.uv_handle_t) callconv(.C) void {
    const ctx: *Context = @ptrCast(@alignCast(handle.*.data));
    const arena_ptr: *std.heap.ArenaAllocator = ctx.*.arena;
    _ = arena_ptr.*.deinit();
}

fn alloc_buffer(handle: [*c]uv.uv_handle_t, suggested_size: usize, buf: [*c]uv.uv_buf_t) callconv(.c) void {
    const ctx: *Context = @ptrCast(@alignCast(handle.*.data));
    const arena_ptr: *std.heap.ArenaAllocator = ctx.*.arena;
    const allocator = arena_ptr.*.allocator();
    const actual_size = suggested_size + @sizeOf(usize);

    const memory = allocator.alloc(u8, actual_size) catch {
        buf.* = uv.uv_buf_init(null, 0);
        return;
    };

    @as(*usize, @ptrCast(@alignCast(memory.ptr))).* = actual_size;

    buf.* = uv.uv_buf_init(
        @ptrCast(@alignCast(memory.ptr + @sizeOf(usize))),
        @intCast(suggested_size),
    );
}

pub fn libuv_execute(allocator: *mem.Allocator, config: Config, callback: callback_t) !void {
    const ctx: *InitialContext = try allocator.*.create(InitialContext);
    ctx.*.allocator = allocator;
    ctx.*.callback = callback;

    print("Started server\n", .{});

    const loop = uv.uv_default_loop();

    var server: uv.uv_tcp_t = undefined;
    _ = uv.uv_tcp_init(loop, &server);
    server.data = @constCast(ctx);

    var addr: uv.struct_sockaddr_in = undefined;
    _ = uv.uv_ip4_addr("0.0.0.0", @intCast(config.port), &addr);
    _ = uv.uv_tcp_bind(&server, @ptrCast(&addr), 0);

    _ = uv.uv_listen(@ptrCast(&server), 128, on_connection);

    print("Listening on http://0.0.0.0:{d}\n", .{config.port});

    _ = uv.uv_run(loop, uv.UV_RUN_DEFAULT);
}

Com isso, nosso servidor fica tanto mais rápido quanto mais organizado. ;)