Servidor HTTP - Parte 3

Mudando a arquitetura

Enzo Soares

Publicado em 2 de Abril de 2025 às 23:57

Na última entrada deste blog, focamos na análise da solicitação de entrada. Esta é a funcionalidade básica necessária de um servidor web. Agora, vamos explorar diferentes implementações para fazer seu servidor ser capaz de lidar com mais solicitações. Apertem os cintos, esta vai ser uma grande.

Primeiro, vamos dividir seu código em arquivos menores. serve manipula as chamadas http e parse_data analisa a solicitação:

const std = @import("std");
const parse_data = @import("parse_data.zig").parse_data;
const serve = @import("server.zig").serve;
const print = std.debug.print;
const mem = std.mem;
const net = std.net;
const StringHashMap = std.StringHashMap;

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();
    defer _ = gpa.deinit();

    try run_naive(allocator);
}

fn run_naive(allocator: mem.Allocator) !void {
    var server = try serve();

    while (server.accept()) |connection| {
        print("Accepted connection from: {}\n\n", .{connection.address});
        try parse_data(allocator, connection);
    } else |err| {
        return err;
    }
}

Se você estiver curioso sobre como exatamente eu refatorei o código, você pode conferir no repo.

Agora, temos que medir o quão rápida é sua implementação atual se quisermos tentar melhorar. Vamos usar a incrível ferramenta k6 para fazer isso. Simplificando, vamos testar o estresse do nosso servidor para ver quantas solicitações simultâneas podemos gerenciar. Este é o seu script k6:

import http from 'k6/http';
import { sleep, check } from 'k6';

export const options = {
  vus: 1000,
  duration: '30s',
};

export default function() {
  let res = http.get('http://0.0.0.0:2500');
  check(res, { "status is 200": (res) => res.status === 200 });
  sleep(1);
}

Este código simplesmente afirma que haverá 2000 usuários simultâneos acessando nosso servidor por 30 segundos. Agora, se executarmos este teste em nosso servidor, obtemos que o tempo médio para processar uma solicitação e enviar uma resposta é 119 ms, o que é bastante. Mas, pior ainda, é a % de solicitações que falham: 49% de todas as solicitações recebidas falham. Vamos melhorar isso.

Usando threads

O código que escrevemos roda, por padrão, em uma única thread do processador. Se quisermos aumentar o desempenho do nosso servidor, precisamos usar todos os núcleos disponíveis. Para isso, usaremos um thread pool, uma ferramenta que simplifica a criação e destruição de threads:

fn run_threaded(allocator: mem.Allocator) !void {
    var pool: std.Thread.Pool = undefined;
    try pool.init(.{
        .allocator = allocator,
        .n_jobs = try std.Thread.getCpuCount() / 2,
    });
    defer pool.deinit();
    var wg: std.Thread.WaitGroup = .{};

    var server = try serve();

    while (server.accept()) |connection| {
        print("Accepted connection from: {}\n\n", .{connection.address});

        pool.spawnWg(&wg, struct {
            fn f(local_allocator: std.mem.Allocator, conn: net.Server.Connection) void {
                parse_data(local_allocator, conn) catch |err| {
                    std.log.err("Thread failed: {}", .{err});
                };
            }
        }.f, .{ allocator, connection });
    } else |err| {
        return err;
    }
}

Este código tem como objetivo substituir nossa função run_naive. Para cada conexão, o loop tenta criar um thread em um núcleo disponível. Se estiver disponível, analise a solicitação. Se não, espere um thread ficar disponível.

Ele tem um desempenho marginalmente melhor, tendo um tempo médio de resposta de 111 ms* e uma taxa de falha de **45%. Essa abordagem é provavelmente a melhor se nossos tempos de resposta forem limitados à CPU, o que significa que eles dependem principalmente do processamento de dados e de cálculos matemáticos para produzir a resposta. Mas, quando falamos sobre servidores web, esse raramente é o caso. A maioria das tarefas do servidor são limitadas à E/S, o que significa que passamos a maior parte do tempo esperando os dados chegarem pela rede. Nesses casos, nossos threads passarão a maior parte do tempo ociosos. Isso não é bom. E se houvesse uma maneira de deixar uma conexão em espera enquanto os dados ainda não estão completos e só então começar a processá-los?

Event pooling

Em linguagens como JavaScript ou Python, a lógica async/await já está implementada e faz exatamente isso: espera que uma tarefa I/O bound termine e libera o thread de execução enquanto isso não acontece. O Zig ainda não implementa async/await, então faremos isso nós mesmos.

A ideia por trás do async/await é simples: assinar um evento de nível de SO que será resolvido em algum momento no futuro. Quando o evento terminar, notifique o thread principal e continue executando. Esse padrão é chamado de pooling de eventos. A dificuldade de implementar um algoritmo que suporte pooling de eventos é que cada SO tem uma API diferente para interagir com eventos. Então o plano é usar libuv, uma biblioteca de baixo nível que cria uma interface estável entre SOs.

Para começar a usar a biblioteca, primeiro execute zig fetch --save git+https://github.com/allyourcodebase/libuv.git e adicione o seguinte ao seu build.zig:

    const libuv_dep = b.dependency("libuv", .{
        .target = target,
        .optimize = optimize,
    });
    exe.linkLibrary(libuv_dep.artifact("uv"));

    if (target.result.os.tag == .windows and optimize == .Debug) {
        exe.linkSystemLibrary("ucrtbased");
    }

Então, temos que criar um loop de eventos, que irá agrupar nossos manipuladores de eventos:

pub fn libuv_execute() void {
    print("Started server\n", .{});

    const loop = uv.uv_default_loop();

    var server: uv.uv_tcp_t = undefined;
    _ = uv.uv_tcp_init(loop, &server);

    var addr: uv.struct_sockaddr_in = undefined;
    _ = uv.uv_ip4_addr("0.0.0.0", 2500, &addr);
    _ = uv.uv_tcp_bind(&server, @ptrCast(&addr), 0);

    _ = uv.uv_listen(@ptrCast(&server), 128, on_connection);

    print("Listening on http://0.0.0.0:2500\n", .{});

    _ = uv.uv_run(loop, uv.UV_RUN_DEFAULT);
}

Isso é bem parecido com como implementamos um servidor anteriormente. Mas agora, temos um callback on_connection que manipula conexões: Vamos ver como funciona:

fn on_connection(server: [*c]uv.uv_stream_t, status: c_int) callconv(.c) void {
    const arena = std.heap.page_allocator.create(std.heap.ArenaAllocator) catch return;
    arena.* = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    const allocator = arena.allocator();

    if (status < 0) {
        print("New connection error {s}\n", .{uv.uv_strerror(status)});
        return;
    }

    const client: [*c]uv.uv_tcp_t = allocator.create(uv.uv_tcp_t) catch return;

    _ = uv.uv_tcp_init(server.*.loop, client);
    client.*.data = @constCast(arena);

    if (uv.uv_accept(server, @ptrCast(client)) == 0) {
        _ = uv.uv_read_start(@ptrCast(client), alloc_buffer, on_read);
    }
}

Há muito o que falar aqui. Primeiro, libuv é uma biblioteca C, então precisamos adicionar callconv(.c) à assinatura da função para que a função possa ser chamada de dentro de um programa C. Além disso, o [*c] indica que o ponteiro é um ponteiro C. Segundo, criamos um alocador no heap, para que ele possa ser usado fora do escopo da função. Precisamos que ele seja um Arena Allocator, para que possamos limpar eficientemente toda a memória usada quando a resposta for enviada. Acrescentamos o ponteiro a este alocador no campo data da estrutura client, para que possamos recuperá-lo mais tarde. Por fim, passamos dois outros callbacks para uv_read_start, um para alocar um buffer que irá armazenar a conexão o e outro para ser chamado assim que os dados chegarem pela socket.

alloc_buffer é bem simples. Enquanto arrays em C são simplesmente uma sequência de bytes na memória, arrays em Zig são uma sequência de bytes na memória, precedidos pelo tamanho. Então, simplesmente recuperamos nosso alocador de data e o usamos para alocar um buffer de tamanho suficiente para caber suggested_size mais @sizeOf(usize):

fn alloc_buffer(handle: [*c]uv.uv_handle_t, suggested_size: usize, buf: [*c]uv.uv_buf_t) callconv(.c) void {
    const arena_ptr: *std.heap.ArenaAllocator = @ptrCast(@alignCast(handle.*.data));
    const allocator = arena_ptr.*.allocator();
    const actual_size = suggested_size + @sizeOf(usize);

    const memory = allocator.alloc(u8, actual_size) catch {
        buf.* = uv.uv_buf_init(null, 0);
        return;
    };

    @as(*usize, @ptrCast(@alignCast(memory.ptr))).* = actual_size;

    buf.* = uv.uv_buf_init(
        @ptrCast(@alignCast(memory.ptr + @sizeOf(usize))),
        @intCast(suggested_size),
    );
}

on_read é chamado enquanto os dados do socket estão sendo lidos. Ele recupera novamente o alocador do cliente, tenta ler os dados, analisa a solicitação e faz referência a outro callback a ser chamado quando a resposta for enviada, chamado on_write_complete:

fn on_read(client: *anyopaque, nread: isize, buf: [*c]const uv.uv_buf_t) callconv(.c) void {
    const stream: [*c]uv.uv_stream_t = @ptrCast(@alignCast(client));
    const arena_ptr: *std.heap.ArenaAllocator = @ptrCast(@alignCast(stream.*.data));
    const allocator = arena_ptr.*.allocator();

    if (nread < 0) {
        if (nread != uv.UV_EOF) {
            print("Read error: {s}\n", .{uv.uv_strerror(@intCast(nread))});
        }
        _ = uv.uv_close(@ptrCast(@alignCast(client)), null);
        return;
    }

    if (nread > 0) {
        const request_raw = buf.*.base[0..@intCast(nread)];

        var request: request_lib.Request = request_lib.parseRequest(allocator, request_raw) catch {
            print("Failed to parse request\n", .{});
            return;
        };
        request.printSelf();
        request.deinit();

        const response =
            "HTTP/1.1 200 OK \r\n" ++
            "Connection: close\r\n" ++
            "\r\n";

        const write_req = allocator.create(uv.uv_write_t) catch {
            print("Failed to allocate write_req\n", .{});
            return;
        };

        var buf_resp = uv.uv_buf_init(@ptrCast(@constCast(response)), @intCast(response.len));
        _ = uv.uv_write(
            write_req,
            stream,
            &buf_resp,
            1,
            on_write_complete,
        );
    }
}

on_write_complete é definido da seguinte forma:

fn on_write_complete(req: [*c]uv.uv_write_t, status: c_int) callconv(.C) void {
    if (status < 0) {
        print("Write error: {s}\n", .{uv.uv_strerror(status)});
    }
    _ = uv.uv_close(@ptrCast(req.*.handle), on_close);
}

fn on_close(handle: [*c]uv.uv_handle_t) callconv(.C) void {
    const arena_ptr: *std.heap.ArenaAllocator = @ptrCast(@alignCast(handle.*.data));
    _ = arena_ptr.*.deinit();
}

on_close é outro retorno de chamada, projetado para limpar o heap.

Agora, nosso servidor tem um tempo médio de resposta de 110 ms, o que não mostra nenhuma melhora significativa. Dito isso, a taxa de falha agora é de 23%. Muito melhor, certo? Em nossa próxima entrada de blog, vamos reduzir esses números.

Obrigado por ler até aqui :)