Servidor HTTP - Parte 5

Indo além

Enzo Soares

Publicado em 15 de Abril de 2025 às 14:48

Olá, colega programador!

Na semana passada, chegamos em um ponto em que nosso servidor está com uma performance aceitável: já passamos de um tempo de resposta médio no nosso cenário de testes de 119 ms para 5 ms, mas podemos chegar mais longe. Mas onde melhorar? O que melhorar? Para responder isso, vamos usar uma ferramenta chamada dtrace, que vai criar um perfil de uso da nossa aplicação: quais as chamadas mais caras e onde estão os gargalos.

Um arquivo gerado pelo dtrace se parece com isso:

  ntdll`NtDeviceIoControlFile+0x14
  mswsock`WSPSend+0x204
  WS2_32`WSASend+0x145
  azig`uv__tcp_write+0x982
  azig`uv_write+0x1cc
  azig`on_read+0xa6a
  azig`uv__process_tcp_read_req+0x1e85
  azig`uv__process_reqs+0x5de
  azig`uv_run+0x424
  azig`libuv_execute+0x4cc
  azig`start+0x24
  azig`main+0xcc
  azig`main+0x24a
  azig`__tmainCRTStartup+0x1cc
  azig`mainCRTStartup+0x1c
  KERNEL32`BaseThreadInitThunk+0x17
  ntdll`RtlUserThreadStart+0x2c
 1215

Onde, nesse exemplo, 1215 é a quantidade de vezes que dtrace achou a seguinte chamada sendo executada na stack. Quanto maior esse número, mais tempo essa função passou sendo executada.

Uma análise do arquivo dtrace gerado pelo código de semana passada nos dá alguns gargalos:

  • Operações de I/O
  • Abertura e fechamento de sockets
  • Alocação e liberação de memória

O tema geral dessa postagem de hoje é que chamadas de sistema são extremamente lentas e devemos evitá-las quando possível.

Operações de I/O

A velocidade de entrada e saída de dados sempre é uma das partes mais lentas de qualquer sistema, então não há muito o que fazer aqui além de ativar o algoritmo de Nagle, que tenta agrupar várias chamadas de I/O em uma só para reduzir o número de chamadas de sistema

 _ = uv.uv_tcp_nodelay(&server, @intCast(0));

Essa linha deve ser adicionada na função libuv_execute

Abertura e fechamento de sockets

Um problema da implementação atual do servidor é que utilizamos uma socket tcp para cada conexão. Isso é bem ineficiente, já que para cada conexão teremos duas chamadas de sistema diferentes, uma para abrir a socket e outra para fechar. O fluxo de reutilizar sockets é bem conhecido e é gerido pelo cabeçalho Connection.

Keep-Alive

A presença do valor Keep-Alive no cabeçalho Connection ou a não presença do cabeçalho indica que a conexão não deve ser encerrada pelo servidor quando uma resposta for enviada, devendo se manter aberta a solicitações do mesmo cliente. Para implementar esse fluxo, devemos chamar a função uvreadstart ao invés de uvclose na função onwrite_complete. Precisamos também definir uma condicional que vai determinar se a conexão vai ou não ser mantida:

        /// resto da função on_read
        const connection_header = request.*.getHeader(@constCast("Connection"));
        var is_keep_alive: bool = true;

        if (connection_header) |header| {
            if (!std.ascii.eqlIgnoreCase(header, "keep-alive")) {
                is_keep_alive = false;
            }
        }

        var buf_resp = uv.uv_buf_init(@ptrCast(@constCast(response_text.string)), @intCast(response_text.string.len));
        _ = uv.uv_write(
            write_req,
            stream,
            &buf_resp,
            1,
            if (is_keep_alive) on_write_complete_keep_alive else on_write_complete_no_keep_alive,
        );
    }
}

fn on_write_common(req: [*c]uv.uv_write_t, status: c_int) void {
    if (status < 0) {
        print("Write error: {s}\n", .{uv.uv_strerror(status)});
    }
}

fn on_write_complete_keep_alive(req: [*c]uv.uv_write_t, status: c_int) callconv(.C) void {
    on_write_common(req, status);
    _ = uv.uv_read_start(req.*.handle, alloc_buffer, on_read);
}

fn on_write_complete_no_keep_alive(req: [*c]uv.uv_write_t, status: c_int) callconv(.C) void {
    on_write_common(req, status);
    _ = uv.uv_close(@ptrCast(req.*.handle), on_close);
}

Se você executar esse código do jeito que está e tentar usar algum cliente http para fazer uma requisição, vai ver que a conexão não se encerra. Isso se dá porque, sem o encerramento explícito da socket, o cliente não entende que a resposta chegou completa. Já que não podemos encerrar a socket agora, vamos utilizar do cabeçalho Content-Length, que tem como valor um inteiro que indica o tamanho do corpo da resposta. Após ler essa quantidade especificada de caracteres, o cliente se dá por satisfeito. Vamos então alterar o método to_string da struct Response e adicionar as linhas:

const len = if (self.body) |b| b.len else 0;
try writer.print("Content-Length: {d}\r\n", .{len});

Agora, nosso código funciona!

Alocação e liberação de memória

Atualmente, para cada requisição http recebida, as seguintes alocações são feitas:

  • Um contexto
  • Uma arena
  • Um cliente uv.uvtcpt
  • Uma requisição + o hash map de cabeçalhos
  • Uma resposta + o hash map de cabeçalhos
  • Um buffer para receber a requisição
  • Um buffer para receber a resposta uvwritet
  • Um buffer com a resposta

São vários alloc/free numa mesma requisição. Para resolver isso, vamos começar a usar pools de memória.

Pools de memória

A ideia é mantermos uma reserva de algumas structs de cada tipo acima. Quando chamarmos destroy, apenas voltamos o objeto para a reserva. Quando chamarmos create, primeiro tentamos obter uma struct da reserva. Se não for possível, aí sim alocamos memória no heap. Isso é uma pool de memória;

Zig nos proporciona uma implementação simples de uma pool de memória, através de std.heap.MemoryPool. Fiz algumas alterações nesse tipo que estão fora do escopo desse post para podermos usá-lo com threads.

pub const buffer_pool_t = ThreadSafeMemoryPool([suggested_buffer_size]u8, .{
    .growable = true,
});

pub const request_pool_t = ThreadSafeMemoryPool(Request, .{
    .growable = true,
});

pub const response_pool_t = ThreadSafeMemoryPool(Response, .{
    .growable = true,
});

pub const client_pool_t = ThreadSafeMemoryPool(uv.uv_tcp_t, .{
    .growable = true,
});

pub const context_pool_t = ThreadSafeMemoryPool(Context, .{
    .growable = true,
});

pub const write_req_pool_t = ThreadSafeMemoryPool(uv.uv_write_t, .{
    .growable = true,
});

Agora, ao iniciarmos nosso servidor, devemos iniciar também as pools:

fn init_pools(allocator: mem.Allocator, ctx: *Context) !*const fn (allocator: mem.Allocator, ctx: *Context) void {
    const initial_size: usize = 1024;

    ctx.*.buffer_allocator = try allocator.create(buffer_pool_t);
    ctx.*.buffer_allocator.* = try buffer_pool_t.initPreheated(allocator, initial_size);

    ctx.*.request_allocator = try allocator.create(request_pool_t);
    ctx.*.request_allocator.* = try request_pool_t.initPreheated(allocator, initial_size);

    ctx.*.response_allocator = try allocator.create(response_pool_t);
    ctx.*.response_allocator.* = try response_pool_t.initPreheated(allocator, initial_size);

    ctx.*.client_allocator = try allocator.create(client_pool_t);
    ctx.*.client_allocator.* = try client_pool_t.initPreheated(allocator, initial_size);

    ctx.*.write_req_allocator = try allocator.create(write_req_pool_t);
    ctx.*.write_req_allocator.* = try write_req_pool_t.initPreheated(allocator, initial_size);

    return struct {
        fn clean(allocator_inner: mem.Allocator, ctx_inner: *Context) void {
            ctx_inner.*.buffer_allocator.deinit();
            allocator_inner.destroy(ctx_inner.*.buffer_allocator);

            ctx_inner.*.request_allocator.deinit();
            allocator_inner.destroy(ctx_inner.*.request_allocator);

            ctx_inner.*.response_allocator.deinit();
            allocator_inner.destroy(ctx_inner.*.response_allocator);

            ctx_inner.*.client_allocator.deinit();
            allocator_inner.destroy(ctx_inner.*.client_allocator);

            ctx_inner.*.write_req_allocator.deinit();
            allocator_inner.destroy(ctx_inner.*.write_req_allocator);
        }
    }.clean;
}

pub fn libuv_execute(allocator: *mem.Allocator, config: Config, callback: callback_t) !void {
    const ctx: *Context = try allocator.*.create(Context);
    ctx.*.allocator = allocator;
    ctx.*.callback = callback;

    const cleanup = try init_pools(allocator.*, ctx);
    defer cleanup(allocator.*, ctx);
    /// resto da implemtentação de libuv_execute
}

A alocação de memória agora não é mais gerida por uma arena e sim por pools. Como não utilizamos mais arenas, não há mais contexto específico para cada conexão. Então, agora, o contexto é criado apenas uma vez ao iniciar o servidor. Com isso, unificamos InitialContext e Context nesse último. Agora, mudamos todas as chamadas a create e destroy de alocador.create(tipo) para pool_tipo.create()

const std = @import("std");
const mem = std.mem;
const print = std.debug.print;
const request_lib = @import("request.zig");
const uv = @import("imports/libuv.zig");
const Config = @import("config.zig").Config;
const Request = @import("request.zig").Request;
const Response = @import("response.zig").Response;
const suggested_buffer_size = @import("constants.zig").suggested_buffer_size;
const actual_buffer_size = @import("constants.zig").actual_buffer_size;
const Context = @import("context.zig").Context;
const callback_t = @import("constants.zig").callback_t;
const buffer_pool_t = @import("constants.zig").buffer_pool_t;
const request_pool_t = @import("constants.zig").request_pool_t;
const response_pool_t = @import("constants.zig").response_pool_t;
const client_pool_t = @import("constants.zig").client_pool_t;
const context_pool_t = @import("constants.zig").context_pool_t;
const write_req_pool_t = @import("constants.zig").write_req_pool_t;

fn on_connection(server: [*c]uv.uv_stream_t, status: c_int) callconv(.c) void {
    const ctx: *Context = @ptrCast(@alignCast(server.*.data));
    const client_pool = ctx.client_allocator;

    if (status < 0) {
        print("New connection error {s}\n", .{uv.uv_strerror(status)});
        return;
    }

    const client: [*c]uv.uv_tcp_t = client_pool.create() catch {
        print("Failed to allocate client\n", .{});
        return;
    };

    _ = uv.uv_tcp_init(server.*.loop, client);
    client.*.data = ctx;

    if (uv.uv_accept(server, @ptrCast(client)) == 0) {
        _ = uv.uv_read_start(@ptrCast(client), alloc_buffer, on_read);
    }
}

fn on_read(client: *anyopaque, nread: isize, buf: [*c]const uv.uv_buf_t) callconv(.c) void {
    const stream: [*c]uv.uv_stream_t = @ptrCast(@alignCast(client));
    const ctx: *Context = @ptrCast(@alignCast(stream.*.data));
    const main_allocator = ctx.*.allocator;
    var buffer_pool = ctx.*.buffer_allocator;
    var request_pool = ctx.*.request_allocator;
    var response_pool = ctx.*.response_allocator;
    var write_req_pool = ctx.*.write_req_allocator;

    defer {
        if (buf.*.base != null) {
            buffer_pool.destroy(@ptrCast(@alignCast(buf.*.base)));
        }
    }

    if (nread < 0) {
        if (nread != uv.UV_EOF) {
            print("Read error: {s}\n", .{uv.uv_strerror(@intCast(nread))});
        }
        _ = uv.uv_close(@ptrCast(@alignCast(client)), on_close);
    }

    if (nread > 0) {
        const allocator = main_allocator.*;
        const request_raw = buf.*.base[0..@intCast(nread)];

        const request = request_pool.create() catch {
            print("Failed to allocate request\n", .{});
            return;
        };
        if (!request.*.is_init) {
            request.* = Request.init(allocator) catch {
                print("Failed to init request\n", .{});
                return;
            };
        }
        request.*.parseRequest(request_raw) catch {
            print("Failed to parse request\n", .{});
            return;
        };
        defer {
            request.*.deinit();
            request_pool.destroy(request);
        }

        const response = response_pool.create() catch {
            print("Failed to allocate response\n", .{});
            return;
        };
        response.* = Response.init(allocator) catch {
            print("Failed to create response\n", .{});
            return;
        };
        defer {
            response.*.deinit();
            response_pool.destroy(response);
        }

        ctx.*.callback(allocator, request, response) catch {
            return;
        };

        const response_text = response.to_string(allocator) catch {
            print("Failed to write response\n", .{});
            return;
        };
        defer allocator.free(response_text);

        const write_req = write_req_pool.create() catch {
            print("Failed to allocate write_req\n", .{});
            return;
        };

        const connection_header = request.*.getHeader(@constCast("Connection"));
        var is_keep_alive: bool = true;

        if (connection_header) |header| {
            if (!std.ascii.eqlIgnoreCase(header, "keep-alive")) {
                is_keep_alive = false;
            }
        }

        var buf_resp = uv.uv_buf_init(@ptrCast(@constCast(response_text)), @intCast(response_text.len));
        _ = uv.uv_write(
            write_req,
            stream,
            &buf_resp,
            1,
            if (is_keep_alive) on_write_complete_keep_alive else on_write_complete_no_keep_alive,
        );
    }
}

fn on_write_common(req: [*c]uv.uv_write_t, status: c_int) void {
    const ctx: *Context = @ptrCast(@alignCast(req.*.handle.*.data));
    var write_req_pool = ctx.*.write_req_allocator;
    defer write_req_pool.destroy(req);

    if (status < 0) {
        print("Write error: {s}\n", .{uv.uv_strerror(status)});
    }
}

fn on_write_complete_keep_alive(req: [*c]uv.uv_write_t, status: c_int) callconv(.C) void {
    on_write_common(req, status);
    _ = uv.uv_read_start(req.*.handle, alloc_buffer, on_read);
}

fn on_write_complete_no_keep_alive(req: [*c]uv.uv_write_t, status: c_int) callconv(.C) void {
    on_write_common(req, status);
    _ = uv.uv_close(@ptrCast(req.*.handle), on_close);
}

fn on_close(handle: [*c]uv.uv_handle_t) callconv(.C) void {
    const ctx: *Context = @ptrCast(@alignCast(handle.*.data));

    const client_pool = ctx.client_allocator;
    const client: *uv.uv_tcp_t = @alignCast(@ptrCast(handle));
    client_pool.*.destroy(client);
}

fn alloc_buffer(handle: [*c]uv.uv_handle_t, _: usize, buf: [*c]uv.uv_buf_t) callconv(.c) void {
    const ctx: *Context = @ptrCast(@alignCast(handle.*.data));
    var buffer_pool = ctx.*.buffer_allocator;

    const memory: *[suggested_buffer_size]u8 = buffer_pool.create() catch {
        buf.* = uv.uv_buf_init(null, 0);
        return;
    };

    buf.* = uv.uv_buf_init(memory.ptr, @intCast(suggested_buffer_size));
}

fn init_pools(allocator: mem.Allocator, ctx: *Context) !*const fn (allocator: mem.Allocator, ctx: *Context) void {
    const initial_size: usize = 1024;

    ctx.*.buffer_allocator = try allocator.create(buffer_pool_t);
    ctx.*.buffer_allocator.* = try buffer_pool_t.initPreheated(allocator, initial_size);

    ctx.*.request_allocator = try allocator.create(request_pool_t);
    ctx.*.request_allocator.* = try request_pool_t.initPreheated(allocator, initial_size);

    ctx.*.response_allocator = try allocator.create(response_pool_t);
    ctx.*.response_allocator.* = try response_pool_t.initPreheated(allocator, initial_size);

    ctx.*.client_allocator = try allocator.create(client_pool_t);
    ctx.*.client_allocator.* = try client_pool_t.initPreheated(allocator, initial_size);

    ctx.*.write_req_allocator = try allocator.create(write_req_pool_t);
    ctx.*.write_req_allocator.* = try write_req_pool_t.initPreheated(allocator, initial_size);

    return struct {
        fn clean(allocator_inner: mem.Allocator, ctx_inner: *Context) void {
            ctx_inner.*.buffer_allocator.deinit();
            allocator_inner.destroy(ctx_inner.*.buffer_allocator);

            ctx_inner.*.request_allocator.deinit();
            allocator_inner.destroy(ctx_inner.*.request_allocator);

            ctx_inner.*.response_allocator.deinit();
            allocator_inner.destroy(ctx_inner.*.response_allocator);

            ctx_inner.*.client_allocator.deinit();
            allocator_inner.destroy(ctx_inner.*.client_allocator);

            ctx_inner.*.write_req_allocator.deinit();
            allocator_inner.destroy(ctx_inner.*.write_req_allocator);
        }
    }.clean;
}

pub fn libuv_execute(allocator: *mem.Allocator, config: Config, callback: callback_t) !void {
    const ctx: *Context = try allocator.*.create(Context);
    ctx.*.allocator = allocator;
    ctx.*.callback = callback;

    const cleanup = try init_pools(allocator.*, ctx);
    defer cleanup(allocator.*, ctx);

    print("Started server\n", .{});

    const loop = uv.uv_default_loop();

    var server: uv.uv_tcp_t = undefined;
    _ = uv.uv_tcp_init(loop, &server);
    server.data = @constCast(ctx);
    _ = uv.uv_tcp_nodelay(&server, @intCast(0));

    var addr: uv.struct_sockaddr_in = undefined;
    _ = uv.uv_ip4_addr("0.0.0.0", @intCast(config.port), &addr);
    _ = uv.uv_tcp_bind(&server, @ptrCast(&addr), 0);

    _ = uv.uv_listen(@ptrCast(&server), 128, on_connection);

    print("Listening on http://0.0.0.0:{d}\n", .{config.port});

    _ = uv.uv_run(loop, uv.UV_RUN_DEFAULT);
}

Com isso, reduzimos drasticamente as alocações do heap. No entanto, ainda existem algumas: toda vez que paramos de usar uma requisição ou resposta, chamamos o método deinit, que libera os cabeçalhos. Para mudarmos isso, vamos criar um outro método chamado reset, que limpa as structs, mas não libera a memória.

/// Request:
pub const Request = struct {
    method: []const u8,
    path: []const u8,
    version: []const u8,
    headers: StringHashMap([]const u8),
    body: ?[]const u8,
    is_init: bool = false,

    const Self = @This();

    pub fn reset(self: *Self) void {
        self.*.body = null;
        self.*.method = undefined;
        self.*.path = undefined;
        self.*.version = undefined;
        self.*.headers.clearRetainingCapacity();
        self.*.is_init = true;
    }
    /// resto da implementação
}

/// Response
pub const Response = struct {
    status_code: u16,
    headers: StringHashMap([]const u8),
    body: ?[]const u8,
    is_init: bool = false,

    const Self = @This();

    pub fn reset(self: *Self) void {
        self.*.body = null;
        self.*.status_code = 200;
        self.*.headers.clearRetainingCapacity();
        self.*.is_init = true;
    }

    /// resto da implementação
}

A bandeira is_init apenas reflete se a struct for criada agora ou se está sendo reutilizada. Agora, vamos mudar a função on_read:

    const request = request_pool.create() catch {
        print("Failed to allocate request\n", .{});
        return;
    };
    if (!request.*.is_init) {
        request.* = Request.init(allocator) catch {
            print("Failed to init request\n", .{});
            return;
        };
    }
    request.*.parseRequest(request_raw) catch {
        print("Failed to parse request\n", .{});
        return;
    };
    defer {
        request.*.reset();
        request_pool.destroy(request);
    }

    const response = response_pool.create() catch {
        print("Failed to allocate response\n", .{});
        return;
    };
    if (!response.*.is_init) {
        response.* = Response.init(allocator) catch {
            print("Failed to create response\n", .{});
            return;
        };
    }
    defer {
        response.*.reset();
        response_pool.destroy(response);
    }

    ctx.*.callback(allocator, request, response) catch {
        return;
    };

    const response_text = response.to_string(allocator) catch {
        print("Failed to write response\n", .{});
        return;
    };
    defer allocator.free(response_text.string);

Muito melhor! Por último, temos uma última função lenta em nosso fluxo: response.to_string(). A melhor implementação desse método que eu implementei é a seguinte:

  1. Calcular o tamanho do texto
  2. Caso seja maior que uma constante, colocar o texto no heap
  3. Se não, colocar em um buffer estático

Ficou assim:

pub const ResponseString = struct {
    string: []const u8,
    in_heap: bool,
};

pub fn to_string(self: *Self, allocator: std.mem.Allocator) !ResponseString {
    const size = try self.calculate_serialized_size();

    if (size < 4096) {
        var buffer: [4096]u8 = undefined;
        return .{
            .string = try self.write_to_buffer(&buffer),
            .in_heap = false
        };
    } else {
        const buffer = try allocator.alloc(u8, size);
        errdefer allocator.free(buffer);

        var stream = std.io.fixedBufferStream(buffer);
        try self.write_to_stream(stream.writer());

        return .{
            .string = buffer,
            .in_heap = true
        };
    }
}

fn calculate_serialized_size(self: *Self) !usize {
    var size: usize = 0;

    const status_text = statusCodeToString(self.status_code) orelse "Unknown";
    size += "HTTP/1.1 ".len;
    size += std.fmt.count("{d}", .{self.status_code});
    size += " ".len;
    size += status_text.len;
    size += "\r\n".len;

    var headers_iter = self.headers.iterator();
    while (headers_iter.next()) |entry| {
        size += entry.key_ptr.len;
        size += ": ".len;
        size += entry.value_ptr.len;
        size += "\r\n".len;
    }

    const content_length = "Content-Length";
    const len = if (self.body) |b| b.len else 0;
    size += content_length.len;
    size += ": ".len;
    size += std.fmt.count("{d}", .{len});
    size += "\r\n".len;

    size += "\r\n".len;

    if (self.body) |b| {
        size += b.len;
    }

    return size;
}

fn write_to_stream(self: *Self, writer: anytype) !void {
    try writer.print("HTTP/1.1 {d} {s}\r\n", .{
        self.status_code,
        statusCodeToString(self.status_code) orelse "Unknown",
    });

    var headers_iter = self.headers.iterator();
    while (headers_iter.next()) |entry| {
        try writer.print("{s}: {s}\r\n", .{entry.key_ptr.*, entry.value_ptr.*});
    }

    const len = if (self.body) |b| b.len else 0;
    try writer.print("Content-Length: {d}\r\n", .{len});

    try writer.writeAll("\r\n");

    if (self.body) |b| {
        try writer.writeAll(b);
    }
}

fn write_to_buffer(self: *Self, buffer: []u8) ![]const u8 {
    var stream = std.io.fixedBufferStream(buffer);
    const writer = stream.writer();

    try write_to_stream(self, writer);

    return buffer[0..stream.pos];
}

fn statusCodeToString(code: u16) ?[]const u8 {
    return switch (code) {
        200 => "OK",
        201 => "Created",
        204 => "No Content",
        400 => "Bad Request",
        401 => "Unauthorized",
        403 => "Forbidden",
        404 => "Not Found",
        500 => "Internal Server Error",
        else => null,
    };
}

Essa implementação tem um detalhe: caso o retorno da função to_string() esteja no heap, quem chamou a função deve liberar a memória. No entanto, isso não deve acontecer caso o retorno esteja na stack. Por isso, retornamos da função não só o buffer, mas também uma bandeira que indica onde estão localizados os bytes. Agora só precisamos adicionar a condicional à função on_read:

const response_text = response.to_string(allocator) catch {
    print("Failed to write response\n", .{});
    return;
};
if (response_text.in_heap) {
    defer allocator.free(response_text.string);
}

Com tudo isso, chegamos à um tempo médio de resposta de 0.7 ms!!!!!

Obrigado!