Servidor HTTP - Parte 8

Usando o loop de eventos

Enzo Soares

Publicado em 4 de Maio de 2025 às 19:39

Alô!

Faz um tempo que não escrevo aqui. Após analisar o código até o momento, temos um problema grave: a lógica definida pelo usuário através de um callback é executada na mesma thread que o loop de eventos. Caso esse callback demore para executar, seja por uma computação pesada ou por um ação de entrada ou saída de dados, o servidor se tornará irresponsivo. A solução é simples: criar uma ação na fila de eventos que será executada em outra thread, executando um callback quando for finalizado.

A função on_read, é executada na thread principal e é nela mesmo que faremos alterações. Ela se encontra dessa forma agora:

fn on_read(client: *anyopaque, nread: isize, buf: [*c]const uv.uv_buf_t) callconv(.c) void {
    const stream: [*c]uv.uv_stream_t = @ptrCast(@alignCast(client));
    const ctx: *Context = @ptrCast(@alignCast(stream.*.data));
    const main_allocator = ctx.*.allocator;
    var buffer_pool = ctx.*.buffer_allocator;
    const request_pool = ctx.*.request_allocator;
    const response_pool = ctx.*.response_allocator;
    var write_req_pool = ctx.*.write_req_allocator;
    const header_pool = ctx.*.header_allocator;
    const app_context = ctx.*.app_context;

    defer {
        if (buf.*.base != null) {
            buffer_pool.destroy(@ptrCast(@alignCast(buf.*.base)));
        }
    }

    if (nread < 0) {
        if (nread != uv.UV_EOF) {
            print("Read error: {s}\n", .{uv.uv_strerror(@intCast(nread))});
        }
        _ = uv.uv_close(@ptrCast(@alignCast(client)), on_close);
    }

    if (nread > 0) {
        const allocator = main_allocator.*;
        const request_raw = buf.*.base[0..@intCast(nread)];

        const request_struct = start_request(request_raw, header_pool, request_pool) catch {
            print("There was an error initializing the request", .{});
            return;
        };
        const request = request_struct.request;
        defer request_struct.cleanup(request, header_pool, request_pool) catch {
            print("Failed to deinit request\n", .{});
        };

        const response_struct = start_response(header_pool, response_pool) catch {
            print("There was an error initializing the response", .{});
            return;
        };
        const response = response_struct.response;
        defer response_struct.cleanup(response, header_pool, response_pool) catch {
            print("Failed to deinit response\n", .{});
        };

        if (AppContext == void) {
            ctx.*.callback(allocator, request, response) catch {
                return;
            };
        } else {
            ctx.*.callback(allocator, app_context, request, response) catch {
                return;
            };
        }

        const response_text = response.to_string(allocator) catch {
            print("Failed to write response\n", .{});
            return;
        };
        if (response_text.in_heap) {
            defer allocator.free(response_text.string);
        }

        const write_req = write_req_pool.create() catch {
            print("Failed to allocate write_req\n", .{});
            return;
        };

        var buf_resp = uv.uv_buf_init(@ptrCast(@constCast(response_text.string)), @intCast(response_text.string.len));
        _ = uv.uv_write(
            write_req,
            stream,
            &buf_resp,
            1,
            if (is_keep_alive(request)) on_write_complete_keep_alive else on_write_complete_no_keep_alive,
        );
    }
}

Para criar um trabalho na fila de eventos, devemos chamar a função uv_queue_work e devemos colocar todos os dados relevantes para que o trabalho seja executado numa struct bastão, que vai tomar a seguinte forma:

pub const Baton = struct {
    req: uv.uv_work_t,
    stream: [*c]uv.uv_stream_t,
    buf: []u8,
};

Nessa struct, req é um objeto a ser registrado na loop de eventos, stream é onde a resposta será escrita e buf é um ponteiro para um vetor de caracteres que contém a requisição como ela foi recebida. A nova implementação de on_read fica assim agora:

fn on_read(client: *anyopaque, nread: isize, buf: [*c]const uv.uv_buf_t) callconv(.c) void {
    const stream: [*c]uv.uv_stream_t = @ptrCast(@alignCast(client));
    const ctx: *Context = @ptrCast(@alignCast(stream.*.data));
    const baton_pool = ctx.*.baton_allocator;
    const buffer_pool = ctx.*.buffer_allocator;

    defer {
        if (buf.*.base != null) {
            buffer_pool.destroy(@ptrCast(@alignCast(buf.*.base)));
        }
    }

    if (nread < 0 or buf.*.base == null) {
        if (nread != uv.UV_EOF) {
            print("Read error: {s}\n", .{uv.uv_strerror(@intCast(nread))});
        }
        _ = uv.uv_close(@ptrCast(@alignCast(stream)), on_close);
    }

    if (nread > 0) {
        const baton = baton_pool.create() catch {
            print("Failed to allocate baton\n", .{});
            return;
        };
        const request_raw = ctx.*.allocator.*.dupe(u8, buf.*.base[0..@intCast(nread)]) catch {
            baton_pool.destroy(baton);
            std.debug.print("Failed to copy request data\n", .{});
            return;
        };

        baton.*.req.data = @ptrCast(@alignCast(baton));
        baton.*.stream = stream;
        baton.*.buf = request_raw;

        const res = uv.uv_queue_work(stream.*.loop, @ptrCast(@alignCast(&baton.*.req)), process, after_process);
        if (res != 0) {
            _ = uv.uv_close(@ptrCast(@alignCast(stream)), on_close);
        }
    }
}

A função process executa o callback e a função after_process limpa a memória utilizada e escreve a resposta na saída. Para que essa ação seja executada, precisamos adicionar mais três campos à struct Baton, a serem preenchidos em process:

pub const Baton = struct {
    req: uv.uv_work_t,
    stream: [*c]uv.uv_stream_t,
    buf: []u8,
    response: *Response,
    cleanup_response_cb: *const fn (*Response, *header_t, *response_pool_t) anyerror!void,
    on_write_cb: *const fn([*c]uv.uv_write_t, c_int) callconv(.c) void,
};

response é um ponteiro para a resposta, para que ela possa ser convertida em texto, cleanupresponsecb é um callback para limpar essa mesma resposta e onwritecb é a função que deve ser executada após a escrita finalizar.

A implementação final das duas funções é:

fn after_process(req: [*c]uv.uv_work_t, status: c_int) callconv(.c) void {
    const baton: *Baton = @ptrCast(@alignCast(req.*.data));
    const stream = baton.*.stream;
    const ctx: *Context = @ptrCast(@alignCast(stream.*.data));

    if (status < 0) {
        print("Operation cancelled\n", .{});
        return;
    }

    const allocator = ctx.*.allocator.*;
    var write_req_pool = ctx.*.write_req_allocator;
    const response_pool = ctx.*.response_allocator;
    const header_pool = ctx.*.header_allocator;
    const baton_pool = ctx.*.baton_allocator;

    const response = baton.*.response;
    const on_write_cb = baton.*.on_write_cb;
    const cleanup_response_cb = baton.*.cleanup_response_cb;
    const buf = baton.*.buf;

    defer {
        cleanup_response_cb(response, header_pool, response_pool) catch {
            print("Failed to deinit response\n", .{});
        };
        baton_pool.destroy(baton);
        allocator.free(buf);
    }

    const response_text = response.*.to_string(allocator) catch {
        print("Failed to write response\n", .{});
        return;
    };
    if (response_text.in_heap) {
        defer allocator.free(response_text.string);
    }

    const write_req = write_req_pool.create() catch {
        print("Failed to allocate write_req\n", .{});
        return;
    };

    var buf_resp = uv.uv_buf_init(@ptrCast(@constCast(response_text.string)), @intCast(response_text.string.len));
    _ = uv.uv_write(
        write_req,
        stream,
        &buf_resp,
        1,
        on_write_cb,
    );
}

fn process(req: [*c]uv.uv_work_t) callconv(.c) void {
    const baton: *Baton = @ptrCast(@alignCast(req.*.data));
    const stream = baton.*.stream;
    const ctx: *Context = @ptrCast(@alignCast(stream.*.data));

    const request_pool = ctx.*.request_allocator;
    const response_pool = ctx.*.response_allocator;
    const header_pool = ctx.*.header_allocator;
    const app_context = ctx.*.app_context;

    const allocator = ctx.*.allocator.*;
    const request_raw = baton.*.buf;

    const request_struct = start_request(request_raw, header_pool, request_pool) catch {
        print("There was an error initializing the request\n", .{});
        return;
    };
    const request = request_struct.request;
    defer request_struct.cleanup(request, header_pool, request_pool) catch {
        print("Failed to deinit request\n", .{});
    };

    const response_struct = start_response(header_pool, response_pool) catch {
        print("There was an error initializing the response\n", .{});
        return;
    };
    const response = response_struct.response;

    if (AppContext == void) {
        ctx.*.callback(allocator, request, response) catch {
            return;
        };
    } else {
        ctx.*.callback(allocator, app_context, request, response) catch {
            return;
        };
    }
    baton.*.response = response;
    baton.*.on_write_cb = if (is_keep_alive(request)) on_write_complete_keep_alive else on_write_complete_no_keep_alive;
    baton.*.cleanup_response_cb = response_struct.cleanup;
}

A função uv_queue_work recebe dois callbacks como parâmetros: o primeiro é executado fora da thread principal e o segundo é executado logo após a conclusão do primeiro, na thread principal.

Um detalhe dessa implementação que eu não tenho muita certeza se está certa é que eu passo tanto response quanto cleanupresponsecb dentro do Baton para serem utilizados por after_process, quando talvez eu só precisasse informar um ponteiro para responsetext e deixar after_process liberá-lo. O motivo de ter feito da forma como está agora é que, na maioria dos casos, responsetext vai viver na stack e não pode ser passado entre funções num loop de eventos. Pelos meus testes, é mais rápido travar o loop de eventos por um pouco durante response.*.to_string do que fazer uma alocação dinâmica em todo on_process.

Isso é tudo por hoje. Gratidão!