Enzo Soares
Publicado em 4 de Maio de 2025 às 19:39
Alô!
Faz um tempo que não escrevo aqui. Após analisar o código até o momento, temos um problema grave: a lógica definida pelo usuário através de um callback é executada na mesma thread que o loop de eventos. Caso esse callback demore para executar, seja por uma computação pesada ou por um ação de entrada ou saída de dados, o servidor se tornará irresponsivo. A solução é simples: criar uma ação na fila de eventos que será executada em outra thread, executando um callback quando for finalizado.
A função on_read, é executada na thread principal e é nela mesmo que faremos alterações. Ela se encontra dessa forma agora:
fn on_read(client: *anyopaque, nread: isize, buf: [*c]const uv.uv_buf_t) callconv(.c) void {
const stream: [*c]uv.uv_stream_t = @ptrCast(@alignCast(client));
const ctx: *Context = @ptrCast(@alignCast(stream.*.data));
const main_allocator = ctx.*.allocator;
var buffer_pool = ctx.*.buffer_allocator;
const request_pool = ctx.*.request_allocator;
const response_pool = ctx.*.response_allocator;
var write_req_pool = ctx.*.write_req_allocator;
const header_pool = ctx.*.header_allocator;
const app_context = ctx.*.app_context;
defer {
if (buf.*.base != null) {
buffer_pool.destroy(@ptrCast(@alignCast(buf.*.base)));
}
}
if (nread < 0) {
if (nread != uv.UV_EOF) {
print("Read error: {s}\n", .{uv.uv_strerror(@intCast(nread))});
}
_ = uv.uv_close(@ptrCast(@alignCast(client)), on_close);
}
if (nread > 0) {
const allocator = main_allocator.*;
const request_raw = buf.*.base[0..@intCast(nread)];
const request_struct = start_request(request_raw, header_pool, request_pool) catch {
print("There was an error initializing the request", .{});
return;
};
const request = request_struct.request;
defer request_struct.cleanup(request, header_pool, request_pool) catch {
print("Failed to deinit request\n", .{});
};
const response_struct = start_response(header_pool, response_pool) catch {
print("There was an error initializing the response", .{});
return;
};
const response = response_struct.response;
defer response_struct.cleanup(response, header_pool, response_pool) catch {
print("Failed to deinit response\n", .{});
};
if (AppContext == void) {
ctx.*.callback(allocator, request, response) catch {
return;
};
} else {
ctx.*.callback(allocator, app_context, request, response) catch {
return;
};
}
const response_text = response.to_string(allocator) catch {
print("Failed to write response\n", .{});
return;
};
if (response_text.in_heap) {
defer allocator.free(response_text.string);
}
const write_req = write_req_pool.create() catch {
print("Failed to allocate write_req\n", .{});
return;
};
var buf_resp = uv.uv_buf_init(@ptrCast(@constCast(response_text.string)), @intCast(response_text.string.len));
_ = uv.uv_write(
write_req,
stream,
&buf_resp,
1,
if (is_keep_alive(request)) on_write_complete_keep_alive else on_write_complete_no_keep_alive,
);
}
}
Para criar um trabalho na fila de eventos, devemos chamar a função uv_queue_work e devemos colocar todos os dados relevantes para que o trabalho seja executado numa struct bastão, que vai tomar a seguinte forma:
pub const Baton = struct {
req: uv.uv_work_t,
stream: [*c]uv.uv_stream_t,
buf: []u8,
};
Nessa struct, req é um objeto a ser registrado na loop de eventos, stream é onde a resposta será escrita e buf é um ponteiro para um vetor de caracteres que contém a requisição como ela foi recebida. A nova implementação de on_read fica assim agora:
fn on_read(client: *anyopaque, nread: isize, buf: [*c]const uv.uv_buf_t) callconv(.c) void {
const stream: [*c]uv.uv_stream_t = @ptrCast(@alignCast(client));
const ctx: *Context = @ptrCast(@alignCast(stream.*.data));
const baton_pool = ctx.*.baton_allocator;
const buffer_pool = ctx.*.buffer_allocator;
defer {
if (buf.*.base != null) {
buffer_pool.destroy(@ptrCast(@alignCast(buf.*.base)));
}
}
if (nread < 0 or buf.*.base == null) {
if (nread != uv.UV_EOF) {
print("Read error: {s}\n", .{uv.uv_strerror(@intCast(nread))});
}
_ = uv.uv_close(@ptrCast(@alignCast(stream)), on_close);
}
if (nread > 0) {
const baton = baton_pool.create() catch {
print("Failed to allocate baton\n", .{});
return;
};
const request_raw = ctx.*.allocator.*.dupe(u8, buf.*.base[0..@intCast(nread)]) catch {
baton_pool.destroy(baton);
std.debug.print("Failed to copy request data\n", .{});
return;
};
baton.*.req.data = @ptrCast(@alignCast(baton));
baton.*.stream = stream;
baton.*.buf = request_raw;
const res = uv.uv_queue_work(stream.*.loop, @ptrCast(@alignCast(&baton.*.req)), process, after_process);
if (res != 0) {
_ = uv.uv_close(@ptrCast(@alignCast(stream)), on_close);
}
}
}
A função process executa o callback e a função after_process limpa a memória utilizada e escreve a resposta na saída. Para que essa ação seja executada, precisamos adicionar mais três campos à struct Baton, a serem preenchidos em process:
pub const Baton = struct {
req: uv.uv_work_t,
stream: [*c]uv.uv_stream_t,
buf: []u8,
response: *Response,
cleanup_response_cb: *const fn (*Response, *header_t, *response_pool_t) anyerror!void,
on_write_cb: *const fn([*c]uv.uv_write_t, c_int) callconv(.c) void,
};
response é um ponteiro para a resposta, para que ela possa ser convertida em texto, cleanupresponsecb é um callback para limpar essa mesma resposta e onwritecb é a função que deve ser executada após a escrita finalizar.
A implementação final das duas funções é:
fn after_process(req: [*c]uv.uv_work_t, status: c_int) callconv(.c) void {
const baton: *Baton = @ptrCast(@alignCast(req.*.data));
const stream = baton.*.stream;
const ctx: *Context = @ptrCast(@alignCast(stream.*.data));
if (status < 0) {
print("Operation cancelled\n", .{});
return;
}
const allocator = ctx.*.allocator.*;
var write_req_pool = ctx.*.write_req_allocator;
const response_pool = ctx.*.response_allocator;
const header_pool = ctx.*.header_allocator;
const baton_pool = ctx.*.baton_allocator;
const response = baton.*.response;
const on_write_cb = baton.*.on_write_cb;
const cleanup_response_cb = baton.*.cleanup_response_cb;
const buf = baton.*.buf;
defer {
cleanup_response_cb(response, header_pool, response_pool) catch {
print("Failed to deinit response\n", .{});
};
baton_pool.destroy(baton);
allocator.free(buf);
}
const response_text = response.*.to_string(allocator) catch {
print("Failed to write response\n", .{});
return;
};
if (response_text.in_heap) {
defer allocator.free(response_text.string);
}
const write_req = write_req_pool.create() catch {
print("Failed to allocate write_req\n", .{});
return;
};
var buf_resp = uv.uv_buf_init(@ptrCast(@constCast(response_text.string)), @intCast(response_text.string.len));
_ = uv.uv_write(
write_req,
stream,
&buf_resp,
1,
on_write_cb,
);
}
fn process(req: [*c]uv.uv_work_t) callconv(.c) void {
const baton: *Baton = @ptrCast(@alignCast(req.*.data));
const stream = baton.*.stream;
const ctx: *Context = @ptrCast(@alignCast(stream.*.data));
const request_pool = ctx.*.request_allocator;
const response_pool = ctx.*.response_allocator;
const header_pool = ctx.*.header_allocator;
const app_context = ctx.*.app_context;
const allocator = ctx.*.allocator.*;
const request_raw = baton.*.buf;
const request_struct = start_request(request_raw, header_pool, request_pool) catch {
print("There was an error initializing the request\n", .{});
return;
};
const request = request_struct.request;
defer request_struct.cleanup(request, header_pool, request_pool) catch {
print("Failed to deinit request\n", .{});
};
const response_struct = start_response(header_pool, response_pool) catch {
print("There was an error initializing the response\n", .{});
return;
};
const response = response_struct.response;
if (AppContext == void) {
ctx.*.callback(allocator, request, response) catch {
return;
};
} else {
ctx.*.callback(allocator, app_context, request, response) catch {
return;
};
}
baton.*.response = response;
baton.*.on_write_cb = if (is_keep_alive(request)) on_write_complete_keep_alive else on_write_complete_no_keep_alive;
baton.*.cleanup_response_cb = response_struct.cleanup;
}
A função uv_queue_work recebe dois callbacks como parâmetros: o primeiro é executado fora da thread principal e o segundo é executado logo após a conclusão do primeiro, na thread principal.
Um detalhe dessa implementação que eu não tenho muita certeza se está certa é que eu passo tanto response quanto cleanupresponsecb dentro do Baton para serem utilizados por after_process, quando talvez eu só precisasse informar um ponteiro para responsetext e deixar after_process liberá-lo. O motivo de ter feito da forma como está agora é que, na maioria dos casos, responsetext vai viver na stack e não pode ser passado entre funções num loop de eventos. Pelos meus testes, é mais rápido travar o loop de eventos por um pouco durante response.*.to_string do que fazer uma alocação dinâmica em todo on_process.
Isso é tudo por hoje. Gratidão!