/* * Unix SMB/CIFS implementation. * cmocka tests for pthreadpool implementation * Copyright (C) 2025 * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include #include #include #include #include #include #include #include #include "pthreadpool.h" /* Test state structure */ struct test_state { struct pthreadpool *pool; int signal_received; int signal_job_id; void (*signal_job_fn)(void *); void *signal_job_fn_data; /* protect test_state */ pthread_mutex_t mutex; }; struct mutex_int { int num; /* protect num */ pthread_mutex_t mutex; }; /* Signal function for testing */ static int test_signal_fn(int jobid, void (*job_fn)(void *private_data), void *job_fn_private_data, void *private_data) { int ret; struct test_state *state = talloc_get_type_abort(private_data, struct test_state); ret = pthread_mutex_lock(&state->mutex); assert_int_equal(ret, 0); state->signal_received++; state->signal_job_id = jobid; state->signal_job_fn = job_fn; state->signal_job_fn_data = job_fn_private_data; ret = pthread_mutex_unlock(&state->mutex); assert_int_equal(ret, 0); return 0; } static void safe_increment(struct mutex_int *counter) { int ret; ret = pthread_mutex_lock(&counter->mutex); assert_int_equal(ret, 0); counter->num++; ret = pthread_mutex_unlock(&counter->mutex); assert_int_equal(ret, 0); } /* Simple job function that increments a counter (in a thread safe way)*/ static void increment_job(void *private_data) { struct mutex_int *num = (struct mutex_int*)private_data; safe_increment(num); } /* Job function that sleeps briefly */ static void sleep_job(void *private_data) { int *duration = (int *)private_data; usleep(*duration * 1000); /* Convert ms to microseconds */ } /* Setup function */ static int setup(void **state) { struct test_state *test_state = NULL; int ret; test_state = talloc_zero(NULL, struct test_state); assert_non_null(test_state); ret = pthread_mutex_init(&test_state->mutex, NULL); assert_int_equal(ret, 0); *state = test_state; return 0; } /* Teardown function */ static int teardown(void **state) { struct test_state *test_state = talloc_get_type_abort( *state, struct test_state); if (test_state->pool != NULL) { pthreadpool_destroy(test_state->pool); test_state->pool = NULL; } pthread_mutex_destroy(&test_state->mutex); TALLOC_FREE(test_state); return 0; } /* Test: Initialize pool with different max_threads values */ static void test_pthreadpool_init(void **state) { struct test_state *test_state = talloc_get_type_abort( *state, struct test_state); int ret; /* Test with unlimited threads (0) */ ret = pthreadpool_init(0, &test_state->pool, test_signal_fn, test_state); assert_int_equal(ret, 0); assert_non_null(test_state->pool); assert_int_equal(pthreadpool_max_threads(test_state->pool), 0); pthreadpool_destroy(test_state->pool); test_state->pool = NULL; /* Test with limited threads */ ret = pthreadpool_init(4, &test_state->pool, test_signal_fn, test_state); assert_int_equal(ret, 0); assert_non_null(test_state->pool); assert_int_equal(pthreadpool_max_threads(test_state->pool), 4); pthreadpool_destroy(test_state->pool); test_state->pool = NULL; /* Test with 1 thread */ ret = pthreadpool_init(1, &test_state->pool, test_signal_fn, test_state); assert_int_equal(ret, 0); assert_non_null(test_state->pool); assert_int_equal(pthreadpool_max_threads(test_state->pool), 1); } /* Test: Add and execute a simple job */ static void test_pthreadpool_add_job_simple(void **state) { struct test_state *test_state = talloc_get_type_abort( *state, struct test_state); int ret; struct mutex_int counter = {0}; int timeout; int signal_received = 0; ret = pthreadpool_init(2, &test_state->pool, test_signal_fn, test_state); assert_int_equal(ret, 0); ret = pthread_mutex_init(&counter.mutex, NULL); assert_int_equal(ret, 0); /* Add a job */ ret = pthreadpool_add_job(test_state->pool, 1, increment_job, &counter); assert_int_equal(ret, 0); /* Wait for job completion (with timeout) */ timeout = 0; do { ret = pthread_mutex_lock(&test_state->mutex); assert_int_equal(ret, 0); signal_received = test_state->signal_received; ret = pthread_mutex_unlock(&test_state->mutex); assert_int_equal(ret, 0); usleep(10000); /* 10ms */ timeout++; } while (signal_received == 0 && timeout < 100); /* Verify job was executed */ assert_int_equal(counter.num, 1); assert_int_equal(test_state->signal_received, 1); assert_int_equal(test_state->signal_job_id, 1); assert_ptr_equal(test_state->signal_job_fn, increment_job); assert_ptr_equal(test_state->signal_job_fn_data, &counter); pthread_mutex_destroy(&counter.mutex); } /* Test: Add multiple jobs */ static void test_pthreadpool_add_multiple_jobs(void **state) { struct test_state *test_state = talloc_get_type_abort( *state, struct test_state); int ret; struct mutex_int counter = {0}; int i; int timeout; int signal_received = 0; ret = pthreadpool_init(4, &test_state->pool, test_signal_fn, test_state); assert_int_equal(ret, 0); ret = pthread_mutex_init(&counter.mutex, NULL); assert_int_equal(ret, 0); /* Add multiple jobs */ for (i = 0; i < 10; i++) { ret = pthreadpool_add_job(test_state->pool, i, increment_job, &counter); assert_int_equal(ret, 0); } /* Wait for all jobs to complete */ timeout = 0; do { ret = pthread_mutex_lock(&test_state->mutex); assert_int_equal(ret, 0); signal_received = test_state->signal_received; ret = pthread_mutex_unlock(&test_state->mutex); assert_int_equal(ret, 0); usleep(10000); /* 10ms */ timeout++; } while (signal_received < 10 && timeout < 100); /* Verify all jobs were executed */ assert_int_equal(counter.num, 10); assert_int_equal(test_state->signal_received, 10); pthread_mutex_destroy(&counter.mutex); } /* Test: Query queued jobs */ static void test_pthreadpool_queued_jobs(void **state) { struct test_state *test_state = talloc_get_type_abort( *state, struct test_state); int ret; int sleep_duration = 100; /* 100ms */ size_t queued; int timeout; int signal_received; ret = pthreadpool_init(1, &test_state->pool, test_signal_fn, test_state); assert_int_equal(ret, 0); /* Initially no jobs */ queued = pthreadpool_queued_jobs(test_state->pool); assert_int_equal(queued, 0); /* Add a long-running job to occupy the thread */ ret = pthreadpool_add_job(test_state->pool, 1, sleep_job, &sleep_duration); assert_int_equal(ret, 0); /* Give the job a moment to start */ usleep(10000); /* 10ms */ /* Add more jobs that will be queued */ ret = pthreadpool_add_job(test_state->pool, 2, sleep_job, &sleep_duration); assert_int_equal(ret, 0); ret = pthreadpool_add_job(test_state->pool, 3, sleep_job, &sleep_duration); assert_int_equal(ret, 0); /* Check queued jobs count */ queued = pthreadpool_queued_jobs(test_state->pool); assert_true(queued >= 1); /* Wait for job completion (with timeout) */ timeout = 0; do { ret = pthread_mutex_lock(&test_state->mutex); assert_int_equal(ret, 0); signal_received = test_state->signal_received; ret = pthread_mutex_unlock(&test_state->mutex); assert_int_equal(ret, 0); usleep(10000); /* 10ms */ timeout++; queued = pthreadpool_queued_jobs(test_state->pool); } while (signal_received < 3 && timeout < 100); } /* Test: Cancel a job */ static void test_pthreadpool_cancel_job(void **state) { struct test_state *test_state = talloc_get_type_abort( *state, struct test_state); int ret; struct mutex_int counter = {0}; int sleep_duration = 100; /* 100ms */ size_t cancelled; int timeout; int signal_received; ret = pthreadpool_init(1, &test_state->pool, test_signal_fn, test_state); assert_int_equal(ret, 0); ret = pthread_mutex_init(&counter.mutex, NULL); assert_int_equal(ret, 0); /* Add a long-running job to occupy the thread */ ret = pthreadpool_add_job(test_state->pool, 1, sleep_job, &sleep_duration); assert_int_equal(ret, 0); /* Give the job a moment to start */ usleep(10000); /* 10ms */ /* Add jobs that will be queued */ ret = pthreadpool_add_job(test_state->pool, 2, increment_job, &counter); assert_int_equal(ret, 0); ret = pthreadpool_add_job(test_state->pool, 3, increment_job, &counter); assert_int_equal(ret, 0); /* Cancel the queued job */ cancelled = pthreadpool_cancel_job(test_state->pool, 2, increment_job, &counter); assert_true(cancelled >= 1); timeout = 0; do { ret = pthread_mutex_lock(&test_state->mutex); assert_int_equal(ret, 0); signal_received = test_state->signal_received; ret = pthread_mutex_unlock(&test_state->mutex); assert_int_equal(ret, 0); usleep(10000); /* 10ms */ } while (timeout < 100 && signal_received != 2); /* The cancelled job should not have executed */ assert_true(counter.num < 2); pthread_mutex_destroy(&counter.mutex); } /* Test: Cancel multiple jobs */ static void test_pthreadpool_cancel_multiple_jobs(void **state) { struct test_state *test_state = talloc_get_type_abort( *state, struct test_state); int ret; struct mutex_int counter = {0}; int sleep_duration = 100; /* 100ms */ size_t cancelled; int i; int timeout; int signal_received; size_t jobs; ret = pthreadpool_init(1, &test_state->pool, test_signal_fn, test_state); assert_int_equal(ret, 0); ret = pthread_mutex_init(&counter.mutex, NULL); assert_int_equal(ret, 0); /* Add a long-running job to occupy the thread */ ret = pthreadpool_add_job(test_state->pool, 1, sleep_job, &sleep_duration); assert_int_equal(ret, 0); /* Give the job a moment to start */ usleep(10000); /* 10ms */ /* Add multiple jobs with the same signature */ for (i = 0; i < 5; i++) { ret = pthreadpool_add_job(test_state->pool, 100, increment_job, &counter); assert_int_equal(ret, 0); } /* Cancel all jobs with the same signature */ cancelled = pthreadpool_cancel_job(test_state->pool, 100, increment_job, &counter); assert_true(cancelled >= 1); assert_true(cancelled <= 5); jobs = 6; /* long living job + 5 jobs with same signature */ jobs -= cancelled; /* adjust num jobs actually cancelled */ timeout = 0; /* wait until all jobs have completed (or timeout reached) */ do { ret = pthread_mutex_lock(&test_state->mutex); assert_int_equal(ret, 0); signal_received = test_state->signal_received; ret = pthread_mutex_unlock(&test_state->mutex); assert_int_equal(ret, 0); usleep(10000); /* 10ms */ timeout++; } while (signal_received != jobs && timeout < 100); assert_true(signal_received == jobs); /* Some jobs should have been cancelled */ assert_true(counter.num < 5); pthread_mutex_destroy(&counter.mutex); } /* Test: Stop a pool */ static void test_pthreadpool_stop(void **state) { struct test_state *test_state = talloc_get_type_abort( *state, struct test_state); int ret; struct mutex_int counter = {0}; int sleep_duration = 100; /* 100ms */ int initial_signals; int signal_received; ret = pthreadpool_init(1, &test_state->pool, test_signal_fn, test_state); assert_int_equal(ret, 0); ret = pthread_mutex_init(&counter.mutex, NULL); assert_int_equal(ret, 0); /* Add a job that's currently running */ ret = pthreadpool_add_job(test_state->pool, 1, sleep_job, &sleep_duration); assert_int_equal(ret, 0); /* Give the job a moment to start */ usleep(10000); /* 10ms */ /* Add more jobs that will be queued */ ret = pthreadpool_add_job(test_state->pool, 2, increment_job, &counter); assert_int_equal(ret, 0); ret = pthreadpool_add_job(test_state->pool, 3, increment_job, &counter); assert_int_equal(ret, 0); /* Stop the pool */ ret = pthreadpool_stop(test_state->pool); assert_int_equal(ret, 0); /* Wait a bit */ usleep(50000); /* 50ms */ ret = pthread_mutex_lock(&test_state->mutex); assert_int_equal(ret, 0); initial_signals = test_state->signal_received; ret = pthread_mutex_unlock(&test_state->mutex); assert_int_equal(ret, 0); /* Add another job - should fail or be ignored */ ret = pthreadpool_add_job(test_state->pool, 4, increment_job, &counter); /* Wait to see if any more signals arrive */ usleep(50000); /* 50ms */ /* No new signals should arrive for queued jobs after stop */ /* (but the running job may complete) */ ret = pthread_mutex_lock(&test_state->mutex); assert_int_equal(ret, 0); signal_received = test_state->signal_received; ret = pthread_mutex_unlock(&test_state->mutex); assert_int_equal(ret, 0); assert_true(signal_received <= initial_signals + 1); pthread_mutex_destroy(&counter.mutex); } /* Test: Destroy a pool */ static void test_pthreadpool_destroy(void **state) { struct test_state *test_state = talloc_get_type_abort( *state, struct test_state); int ret; struct mutex_int counter = {0}; int sleep_duration = 10; /* 10ms */ int timeout; int signal_received; ret = pthreadpool_init(2, &test_state->pool, test_signal_fn, test_state); assert_int_equal(ret, 0); ret = pthread_mutex_init(&counter.mutex, NULL); assert_int_equal(ret, 0); /* Add a job */ ret = pthreadpool_add_job(test_state->pool, 1, sleep_job, &sleep_duration); assert_int_equal(ret, 0); /* Destroy the pool */ ret = pthreadpool_destroy(test_state->pool); assert_int_equal(ret, 0); pthread_mutex_destroy(&counter.mutex); /* ensure job is complete before exiting */ timeout = 0; do { ret = pthread_mutex_lock(&test_state->mutex); assert_int_equal(ret, 0); signal_received = test_state->signal_received; ret = pthread_mutex_unlock(&test_state->mutex); assert_int_equal(ret, 0); usleep(10000); /* 10ms */ timeout++; } while (signal_received == 0 && timeout < 100); test_state->pool = NULL; } /* Test: Pool with max_threads=0 (sync mode) */ static void test_pthreadpool_sync_mode(void **state) { struct test_state *test_state = talloc_get_type_abort( *state, struct test_state); int ret; struct mutex_int counter = {0}; /* Initialize with max_threads=0 for sync processing */ ret = pthreadpool_init(0, &test_state->pool, test_signal_fn, test_state); assert_int_equal(ret, 0); assert_int_equal(pthreadpool_max_threads(test_state->pool), 0); ret = pthread_mutex_init(&counter.mutex, NULL); assert_int_equal(ret, 0); /* Add a job - should be processed synchronously */ ret = pthreadpool_add_job(test_state->pool, 1, increment_job, &counter); assert_int_equal(ret, 0); /* In sync mode, the job might be executed immediately */ /* Wait a bit to allow signal to be processed */ usleep(50000); /* 50ms */ /* Verify job was processed */ assert_true(counter.num >= 0); pthread_mutex_destroy(&counter.mutex); } /* Main test runner */ int main(void) { const struct CMUnitTest tests[] = { cmocka_unit_test_setup_teardown(test_pthreadpool_init, setup, teardown), cmocka_unit_test_setup_teardown(test_pthreadpool_add_job_simple, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_add_multiple_jobs, setup, teardown), cmocka_unit_test_setup_teardown(test_pthreadpool_queued_jobs, setup, teardown), cmocka_unit_test_setup_teardown(test_pthreadpool_cancel_job, setup, teardown), cmocka_unit_test_setup_teardown( test_pthreadpool_cancel_multiple_jobs, setup, teardown), cmocka_unit_test_setup_teardown(test_pthreadpool_stop, setup, teardown), cmocka_unit_test_setup_teardown(test_pthreadpool_destroy, setup, teardown), cmocka_unit_test_setup_teardown(test_pthreadpool_sync_mode, setup, teardown), }; cmocka_set_message_output(CM_OUTPUT_SUBUNIT); return cmocka_run_group_tests(tests, NULL, NULL); }