/* * Unix SMB/CIFS implementation. * cmocka tests for pthreadpool implementation * Copyright (C) 2025 * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "pthreadpool_tevent.h" struct mutex_int { int num; /* protect num */ pthread_mutex_t mutex; }; static void safe_increment(struct mutex_int *counter) { int ret; ret = pthread_mutex_lock(&counter->mutex); assert_int_equal(ret, 0); counter->num++; ret = pthread_mutex_unlock(&counter->mutex); assert_int_equal(ret, 0); } /* Test fixture structure */ struct test_context { TALLOC_CTX *mem_ctx; struct tevent_context *ev; struct pthreadpool_tevent *pool; int g_job_executed; }; /* Global state for tracking callbacks */ /* Reset global test state */ static void reset_test_state(struct test_context *state) { state->g_job_executed = 0; } /* Setup function - called before each test */ static int setup(void **state) { struct test_context *ctx; ctx = talloc_zero(NULL, struct test_context); assert_non_null(ctx); ctx->ev = tevent_context_init(ctx); assert_non_null(ctx->ev); reset_test_state(ctx); *state = ctx; return 0; } /* Teardown function - called after each test */ static int teardown(void **state) { struct test_context *ctx = talloc_get_type_abort(*state, struct test_context); TALLOC_FREE(ctx->pool); TALLOC_FREE(ctx->ev); TALLOC_FREE(ctx); return 0; } /* Mock job function for testing */ static void mock_job_fn(void *private_data) { struct test_context *state = talloc_get_type_abort(private_data, struct test_context); /* Simulate some work */ usleep(10000); /* 10ms */ state->g_job_executed++; } /* Quick job function */ static void quick_job_fn(void *private_data) { struct mutex_int *counter = (struct mutex_int *)private_data; safe_increment(counter); } /* Slow job function */ static void slow_job_fn(void *private_data) { struct mutex_int *counter = (struct mutex_int *)private_data; /* Simulate some work */ usleep(10000); /* 10ms */ safe_increment(counter); } /* Slower job function */ static void wait_fn(void *private_data) { int *timeout = private_data; poll(NULL, 0, *timeout); } struct job_completion_state { int num_jobs; int status; struct test_context *ctx; }; /* Tevent request callback */ static void job_completion_callback(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct job_completion_state *state = tevent_req_data(req, struct job_completion_state); state->status = pthreadpool_tevent_job_recv(subreq); TALLOC_FREE(subreq); state->num_jobs = state->num_jobs - 1; if (state->num_jobs == 0) { tevent_req_done(req); } } /* * Test: pthreadpool_tevent_init with valid parameters */ static void test_pthreadpool_tevent_init_valid(void **state) { struct test_context *ctx = talloc_get_type_abort(*state, struct test_context); int ret; ret = pthreadpool_tevent_init(ctx, 4, &ctx->pool); assert_int_equal(ret, 0); assert_non_null(ctx->pool); } /* * Test: pthreadpool_tevent_init with zero max_threads (sync mode) */ static void test_pthreadpool_tevent_init_unlimited(void **state) { struct test_context *ctx = talloc_get_type_abort(*state, struct test_context); int ret; ret = pthreadpool_tevent_init(ctx, 0, &ctx->pool); assert_int_equal(ret, 0); assert_non_null(ctx->pool); } /* * Test: pthreadpool_tevent_init with large thread count */ static void test_pthreadpool_tevent_init_large_threads(void **state) { struct test_context *ctx = talloc_get_type_abort(*state, struct test_context); int ret; ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool); /* Should handle large values gracefully */ if (ret == 0) { assert_non_null(ctx->pool); } } /* * Test: pthreadpool_tevent_max_threads returns correct value */ static void test_pthreadpool_tevent_max_threads(void **state) { struct test_context *ctx = talloc_get_type_abort(*state, struct test_context); size_t max_threads; int ret; ret = pthreadpool_tevent_init(ctx, 8, &ctx->pool); assert_int_equal(ret, 0); max_threads = pthreadpool_tevent_max_threads(ctx->pool); assert_int_equal(max_threads, 8); } /* * Test: pthreadpool_tevent_max_threads with sync mode */ static void test_pthreadpool_tevent_max_threads_unlimited(void **state) { struct test_context *ctx = talloc_get_type_abort(*state, struct test_context); size_t max_threads; int ret; ret = pthreadpool_tevent_init(ctx, 0, &ctx->pool); assert_int_equal(ret, 0); max_threads = pthreadpool_tevent_max_threads(ctx->pool); assert_int_equal(max_threads, 0); } /* * Test: pthreadpool_tevent_queued_jobs initially returns zero */ static void test_pthreadpool_tevent_queued_jobs_empty(void **state) { struct test_context *ctx = talloc_get_type_abort(*state, struct test_context); size_t queued; int ret; ret = pthreadpool_tevent_init(ctx, 4, &ctx->pool); assert_int_equal(ret, 0); queued = pthreadpool_tevent_queued_jobs(ctx->pool); assert_int_equal(queued, 0); } /* * Test: pthreadpool_tevent_job_send with valid parameters */ static void test_pthreadpool_tevent_job_send_valid(void **ppstate) { struct test_context *ctx = talloc_get_type_abort(*ppstate, struct test_context); struct tevent_req *req = NULL; struct tevent_req *subreq = NULL; struct job_completion_state *state = NULL; int ret; req = tevent_req_create(ctx, &state, struct job_completion_state); assert_non_null(req); state->ctx = ctx; state->num_jobs = 1; ret = pthreadpool_tevent_init(ctx, 4, &ctx->pool); assert_int_equal(ret, 0); subreq = pthreadpool_tevent_job_send( ctx->mem_ctx, ctx->ev, ctx->pool, mock_job_fn, ctx); assert_non_null(subreq); /* Set callback */ tevent_req_set_callback(subreq, job_completion_callback, req); /* wait for event to complete*/ assert_true(tevent_req_poll(req,ctx->ev)); TALLOC_FREE(req); } /* * Test: pthreadpool_tevent_job_send with private data */ static void test_pthreadpool_tevent_job_send_with_private_data(void **ppstate) { struct test_context *ctx = talloc_get_type_abort(*ppstate, struct test_context); struct tevent_req *req = NULL; struct tevent_req *subreq = NULL; struct mutex_int test_data = {0}; struct job_completion_state *state = NULL; int ret; ret = pthreadpool_tevent_init(ctx, 4, &ctx->pool); assert_int_equal(ret, 0); ret = pthread_mutex_init(&test_data.mutex, NULL); assert_int_equal(ret, 0); test_data.num = 42; req = tevent_req_create(ctx, &state, struct job_completion_state); assert_non_null(req); state->ctx = ctx; state->num_jobs = 1; subreq = pthreadpool_tevent_job_send( ctx->mem_ctx, ctx->ev, ctx->pool, quick_job_fn, &test_data); assert_non_null(subreq); tevent_req_set_callback(subreq, job_completion_callback, req); /* wait for event to complete*/ assert_true(tevent_req_poll(req, ctx->ev)); /* Job should have incremented test_data */ assert_int_equal(test_data.num, 43); pthread_mutex_destroy(&test_data.mutex); } /* * Test: pthreadpool_tevent_job_send multiple jobs */ static void test_pthreadpool_tevent_job_send_multiple(void **ppstate) { struct test_context *ctx = talloc_get_type_abort(*ppstate, struct test_context); struct tevent_req *req = NULL; struct tevent_req *subreq = NULL; struct mutex_int counter = {0}; struct job_completion_state *state = NULL; int num_jobs = 5; int ret; int num; int i; req = tevent_req_create(ctx, &state, struct job_completion_state); assert_non_null(req); state->ctx = ctx; state->num_jobs = num_jobs; ret = pthreadpool_tevent_init(ctx, 4, &ctx->pool); assert_int_equal(ret, 0); ret = pthread_mutex_init(&counter.mutex, NULL); assert_int_equal(ret, 0); /* Submit multiple jobs */ for (i = 0; i < num_jobs; i++) { subreq = pthreadpool_tevent_job_send(ctx->mem_ctx, ctx->ev, ctx->pool, slow_job_fn, &counter); assert_non_null(subreq); tevent_req_set_callback(subreq, job_completion_callback, req); } /* wait for events to complete*/ assert_true(tevent_req_poll(req,ctx->ev)); /* All jobs should have completed */ assert_int_equal(state->num_jobs, 0); num = counter.num; assert_int_equal(num, 5); pthread_mutex_destroy(&counter.mutex); TALLOC_FREE(req); } /* * Test: pthreadpool_tevent_job_send multiple jobs, mixing * sync and async. */ static void test_pthreadpool_tevent_job_send_multiple_2(void **ppstate) { struct test_context *ctx = talloc_get_type_abort(*ppstate, struct test_context); struct tevent_req *req = NULL; struct tevent_req *subreq = NULL; struct job_completion_state *state = NULL; int num_jobs = 10; int timeout10 = 10; int timeout100 = 100; int i; int ret; req = tevent_req_create(ctx, &state, struct job_completion_state); assert_non_null(req); state->ctx = ctx; state->num_jobs = num_jobs; ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool); assert_int_equal(ret, 0); /* * Intersperse pthreadpool_tevent jobs processed synchronously * (with temporary sub-event context) and pthreadpool_tevent * processed asynchronously. * This is analogous to smb_vfs_fsync_sync calls happening * concurrently with other asynchronous io calls in smbd */ for (i = 0; i < num_jobs; i++) { if (i % 2) { subreq = pthreadpool_tevent_job_send(ctx->mem_ctx, ctx->ev, ctx->pool, wait_fn, &timeout100); assert_non_null(subreq); tevent_req_set_callback(subreq, job_completion_callback, req); } else { TALLOC_CTX *mem_ctx = talloc_new(NULL); bool ok; struct tevent_context *tmp_ev = tevent_context_init(mem_ctx); subreq = pthreadpool_tevent_job_send(tmp_ev, tmp_ev, ctx->pool, wait_fn, &timeout10); assert_non_null(subreq); ok = tevent_req_poll(subreq, tmp_ev); assert_true(ok); ret = pthreadpool_tevent_job_recv(subreq); assert_int_equal(ret, 0); state->num_jobs -= 1; if (state->num_jobs == 0) { tevent_req_done(req); } TALLOC_FREE(mem_ctx); } } /* wait for events to complete*/ assert_true(tevent_req_poll(req,ctx->ev)); /* All jobs should have completed */ assert_int_equal(state->num_jobs, 0); TALLOC_FREE(req); } struct nested_state { struct pthreadpool_tevent *pool; int timeout; }; static void do_nested_pthread_job(void *private_data) { struct nested_state *state = private_data; TALLOC_CTX *ctx = talloc_new(NULL); bool ok; struct tevent_context *tmp_ev = tevent_context_init(ctx); struct tevent_req *subreq = NULL; int ret; assert_non_null(tmp_ev); subreq = pthreadpool_tevent_job_send( tmp_ev, tmp_ev, state->pool, wait_fn, &state->timeout); assert_non_null(subreq); ok = tevent_req_poll(subreq, tmp_ev); assert_true(ok); ret = pthreadpool_tevent_job_recv(subreq); assert_int_equal(ret,0); TALLOC_FREE(ctx); } /* * Test: pthreadpool_tevent_job_send multiple jobs, * where jobs can themselves initiate a nested job */ static void test_pthreadpool_tevent_job_send_multiple_3(void **ppstate) { struct test_context *ctx = talloc_get_type_abort(*ppstate, struct test_context); struct tevent_req *req = NULL; struct tevent_req *subreq = NULL; struct job_completion_state *state = NULL; int num_jobs = 10; int timeout100 = 100; int i; int ret; req = tevent_req_create(ctx, &state, struct job_completion_state); assert_non_null(req); state->ctx = ctx; state->num_jobs = num_jobs; ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool); assert_int_equal(ret, 0); /* * Intersperse pthreadpool_tevent jobs processed synchronously * (with temporary sub-event context) and pthreadpool_tevent * processed asynchronously. * This is analogous to smb_vfs_fsync_sync calls happening * concurrently with other asynchronous io calls in smbd */ for (i = 0; i < num_jobs; i++) { struct nested_state *nested_state = talloc_zero(ctx->mem_ctx, struct nested_state); assert_non_null(nested_state); nested_state->pool = ctx->pool; nested_state->timeout = timeout100; subreq = pthreadpool_tevent_job_send(ctx->mem_ctx, ctx->ev, ctx->pool, do_nested_pthread_job, nested_state); assert_non_null(subreq); tevent_req_set_callback(subreq, job_completion_callback, req); } /* wait for events to complete*/ assert_true(tevent_req_poll(req,ctx->ev)); /* All jobs should have completed */ assert_int_equal(state->num_jobs, 0); TALLOC_FREE(req); } /* * Test: pthreadpool_tevent_job_recv with valid request */ static void test_pthreadpool_tevent_job_recv_valid(void **state) { struct test_context *ctx = talloc_get_type_abort(*state, struct test_context); struct tevent_req *req; int ret; bool ok; ret = pthreadpool_tevent_init(ctx, 4, &ctx->pool); assert_int_equal(ret, 0); req = pthreadpool_tevent_job_send( ctx->mem_ctx, ctx->ev, ctx->pool, mock_job_fn, ctx); assert_non_null(req); ok = tevent_req_poll(req, ctx->ev); assert_true(ok); /* Receive result */ ret = pthreadpool_tevent_job_recv(req); assert_int_equal(ret, 0); TALLOC_FREE(req); } /* * Test: pthreadpool_tevent_queued_jobs tracking */ static void test_pthreadpool_tevent_queued_jobs_tracking(void **ppstate) { struct test_context *ctx = talloc_get_type_abort(*ppstate, struct test_context); struct tevent_req *req = NULL; struct job_completion_state *state = NULL; int ret; int i; size_t queued; struct mutex_int counter = {0}; req = tevent_req_create(ctx, &state, struct job_completion_state); assert_non_null(req); state->ctx = ctx; state->num_jobs = 3; ret = pthreadpool_tevent_init(ctx, 1, &ctx->pool); assert_int_equal(ret, 0); ret = pthread_mutex_init(&counter.mutex, NULL); assert_int_equal(ret, 0); /* Submit jobs faster than they can be processed */ for (i = 0; i < state->num_jobs; i++) { struct tevent_req *subreq = NULL; subreq = pthreadpool_tevent_job_send(ctx->mem_ctx, ctx->ev, ctx->pool, slow_job_fn, &counter); assert_non_null(subreq); tevent_req_set_callback(subreq, job_completion_callback, req); } /* Check queued jobs (some may be queued) */ queued = pthreadpool_tevent_queued_jobs(ctx->pool); assert_true(queued > 0); /* Should have at least some jobs queued or processing */ /* Exact number depends on timing */ /* wait for events to complete*/ assert_true(tevent_req_poll(req,ctx->ev)); /* Clean up */ assert_int_equal(state->num_jobs, 0); TALLOC_FREE(req); pthread_mutex_destroy(&counter.mutex); } /* * Test: Memory cleanup with talloc */ static void test_memory_cleanup(void **state) { struct test_context *ctx = talloc_get_type_abort(*state, struct test_context); TALLOC_CTX *tmp_ctx; struct tevent_req *req; int ret; int i; struct mutex_int counter = {0}; ret = pthreadpool_tevent_init(ctx, 4, &ctx->pool); assert_int_equal(ret, 0); ret = pthread_mutex_init(&counter.mutex, NULL); assert_int_equal(ret, 0); /* Create temporary context */ tmp_ctx = talloc_new(ctx->mem_ctx); assert_non_null(tmp_ctx); /* Allocate request in temporary context */ req = pthreadpool_tevent_job_send( tmp_ctx, ctx->ev, ctx->pool, quick_job_fn, &counter); assert_non_null(req); /* * wait for work to be done, but don't interact with tevent * e.g. don't call any tevent poll etc. */ for (i = 0; i < 100; i++) { int num; usleep(10000); ret = pthread_mutex_lock(&counter.mutex); assert_int_equal(ret, 0); num = counter.num; ret = pthread_mutex_unlock(&counter.mutex); assert_int_equal(ret, 0); if (num == 1) { break; } } /* Free temporary context - should clean up request */ TALLOC_FREE(tmp_ctx); /* Pool should still be valid */ assert_non_null(ctx->pool); pthread_mutex_destroy(&counter.mutex); } /* * Test: Callback execution */ static void test_callback_execution(void **ppstate) { struct test_context *ctx = talloc_get_type_abort(*ppstate, struct test_context); struct tevent_req *req = NULL; struct tevent_req *subreq = NULL; int ret; struct job_completion_state *state = NULL; reset_test_state(ctx); req = tevent_req_create(ctx, &state, struct job_completion_state); assert_non_null(req); state->ctx = ctx; state->num_jobs = 1; ret = pthreadpool_tevent_init(ctx, 4, &ctx->pool); assert_int_equal(ret, 0); subreq = pthreadpool_tevent_job_send( ctx->mem_ctx, ctx->ev, ctx->pool, mock_job_fn, ctx); assert_non_null(subreq); tevent_req_set_callback(subreq, job_completion_callback, req); /* wait for event to complete*/ assert_true(tevent_req_poll(req,ctx->ev)); /* Callback should have been executed */ assert_int_equal(state->num_jobs, 0); assert_int_equal(state->status, 0); TALLOC_FREE(req); } /* * Test: Job execution verification */ static void test_job_execution(void **state) { struct test_context *ctx = talloc_get_type_abort(*state, struct test_context); struct tevent_req *req; int ret; bool ok; reset_test_state(ctx); ret = pthreadpool_tevent_init(ctx, 4, &ctx->pool); assert_int_equal(ret, 0); req = pthreadpool_tevent_job_send( ctx->mem_ctx, ctx->ev, ctx->pool, mock_job_fn, ctx); assert_non_null(req); ok = tevent_req_poll(req, ctx->ev); assert_true(ok); /* Job should have been executed */ assert_int_equal(ctx->g_job_executed, 1); TALLOC_FREE(req); } int main(void) { const struct CMUnitTest tests[] = { cmocka_unit_test_setup_teardown(test_pthreadpool_tevent_init_valid, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_tevent_init_unlimited, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_tevent_init_large_threads, setup, teardown), cmocka_unit_test_setup_teardown(test_pthreadpool_tevent_max_threads, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_tevent_max_threads_unlimited, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_tevent_queued_jobs_empty, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_tevent_job_send_valid, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_tevent_job_send_with_private_data, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_tevent_job_send_multiple, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_tevent_job_send_multiple_2, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_tevent_job_send_multiple_3, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_tevent_job_recv_valid, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_tevent_queued_jobs_tracking, setup, teardown), cmocka_unit_test_setup_teardown(test_memory_cleanup, setup, teardown), cmocka_unit_test_setup_teardown(test_callback_execution, setup, teardown), cmocka_unit_test_setup_teardown(test_job_execution, setup, teardown), }; cmocka_set_message_output(CM_OUTPUT_SUBUNIT); return cmocka_run_group_tests(tests, NULL, NULL); }