From d762108ceaf09f67da8b27e2445f4d31f6f9b9dc Mon Sep 17 00:00:00 2001 From: Serhiy Katsyuba Date: Wed, 17 Jun 2026 13:00:13 +0200 Subject: [PATCH 1/3] userspace: proxy: use a separate worker per core Previously a single userspace worker (work queue) served requests for all userspace modules, regardless of the core they ran on, and the worker thread was re-pinned to the requesting module's core on every invocation. This only worked in single-core tests; on multi-core systems the pinning failed, because a thread can only be pinned while it is not running. Replace the single worker with a per-core worker array. Each worker thread is pinned to its core once, at creation time. Signed-off-by: Serhiy Katsyuba --- .../module_adapter/library/userspace_proxy.c | 107 +++++++++++------- 1 file changed, 65 insertions(+), 42 deletions(-) diff --git a/src/audio/module_adapter/library/userspace_proxy.c b/src/audio/module_adapter/library/userspace_proxy.c index 79e55bd4124e..d9ae3f1622cc 100644 --- a/src/audio/module_adapter/library/userspace_proxy.c +++ b/src/audio/module_adapter/library/userspace_proxy.c @@ -54,12 +54,16 @@ static const struct module_interface userspace_proxy_interface; #include #include -static inline int user_worker_get(void) +static inline int user_worker_get(int cpu) { + ARG_UNUSED(cpu); return 0; } -static inline void user_worker_put(void) { } +static inline void user_worker_put(int cpu) +{ + ARG_UNUSED(cpu); +} struct k_work_user *userspace_proxy_register_ipc_handler(struct processing_module *mod, struct k_event *event) @@ -86,9 +90,10 @@ struct k_work_user *userspace_proxy_register_ipc_handler(struct processing_modul * It invokes the appropriate module function in userspace context and writes the operation * result back into the work item. * - * There is only a single work queue, which is shared by all userspace modules. It is created - * dynamically when needed. 
Because SOF uses a single dedicated thread for handling IPC, there - * is no need to perform any additional serialization when accessing the worker. + * There is a separate work queue per core. Each core's work queue is shared by all + * userspace modules running on that core and is created dynamically when needed. A given + * core's worker is only accessed from that same core's IPC handling context, so there is no + * need to perform any additional serialization when accessing it. */ struct user_worker { k_tid_t thread_id; /* ipc worker thread ID */ @@ -98,70 +103,96 @@ struct user_worker { struct k_event event; }; -static struct user_worker worker; +static struct user_worker worker[CONFIG_CORE_COUNT]; -static int user_worker_get(void) +static int user_worker_get(int cpu) { - if (worker.reference_count) { - worker.reference_count++; + int ret = 0; + + assert(cpu >= 0 && cpu < (int)ARRAY_SIZE(worker)); + + if (worker[cpu].reference_count) { + worker[cpu].reference_count++; return 0; } - worker.stack_ptr = user_stack_allocate(CONFIG_SOF_USERSPACE_PROXY_WORKER_STACK_SIZE, - K_USER); - if (!worker.stack_ptr) { + worker[cpu].stack_ptr = user_stack_allocate(CONFIG_SOF_USERSPACE_PROXY_WORKER_STACK_SIZE, + K_USER); + if (!worker[cpu].stack_ptr) { tr_err(&userspace_proxy_tr, "Userspace worker stack allocation failed."); return -ENOMEM; } - k_event_init(&worker.event); - k_work_user_queue_start(&worker.work_queue, worker.stack_ptr, + k_event_init(&worker[cpu].event); + k_work_user_queue_start(&worker[cpu].work_queue, worker[cpu].stack_ptr, CONFIG_SOF_USERSPACE_PROXY_WORKER_STACK_SIZE, 0, NULL); - worker.thread_id = k_work_user_queue_thread_get(&worker.work_queue); + worker[cpu].thread_id = k_work_user_queue_thread_get(&worker[cpu].work_queue); - k_thread_access_grant(worker.thread_id, &worker.event); +#ifdef CONFIG_SCHED_CPU_MASK + /* + * k_work_user_queue_start() starts the worker thread immediately. 
+ * We need to make sure it is not running when pinning it to a specific core. + */ + k_thread_suspend(worker[cpu].thread_id); + + /* Pin worker thread to the same core as the module */ + ret = k_thread_cpu_pin(worker[cpu].thread_id, cpu); + if (ret) { + tr_err(&userspace_proxy_tr, "Failed to pin worker to core %d, error: %d", + cpu, ret); + k_panic(); + } + + k_thread_resume(worker[cpu].thread_id); + +#elif CONFIG_CORE_COUNT > 1 +#error "CONFIG_SCHED_CPU_MASK is not enabled" +#endif + + k_thread_access_grant(worker[cpu].thread_id, &worker[cpu].event); + + worker[cpu].reference_count++; - worker.reference_count++; return 0; } -static void user_worker_put(void) +static void user_worker_put(int cpu) { + assert(cpu >= 0 && cpu < (int)ARRAY_SIZE(worker)); + /* Module removed so decrement counter */ - worker.reference_count--; + worker[cpu].reference_count--; /* Free worker resources if no more active user space modules */ - if (worker.reference_count == 0) { - k_thread_abort(worker.thread_id); - user_stack_free(worker.stack_ptr); + if (worker[cpu].reference_count == 0) { + k_thread_abort(worker[cpu].thread_id); + user_stack_free(worker[cpu].stack_ptr); } } #endif static int user_work_item_init(struct userspace_context *user_ctx, struct k_heap *user_heap) { + int cpu = cpu_get_id(); struct user_work_item *work_item = NULL; int ret; - ret = user_worker_get(); + ret = user_worker_get(cpu); if (ret) return ret; - /* We have only a single userspace IPC worker. It handles requests for all userspace - * modules, which may run on different cores. Because the worker processes work items - * coming from any core, the work item must be allocated in coherent memory. - */ + /* TODO: this can probably be allocated as cached? 
*/ work_item = sof_heap_alloc(user_heap, SOF_MEM_FLAG_COHERENT, sizeof(*work_item), 0); if (!work_item) { - user_worker_put(); + user_worker_put(cpu); return -ENOMEM; } k_work_user_init(&work_item->work_item, userspace_proxy_worker_handler); #if !IS_ENABLED(CONFIG_SOF_USERSPACE_MOD_IPC_BY_DP_THREAD) - work_item->event = &worker.event; + work_item->event = &worker[cpu].event; #endif work_item->params.context = user_ctx; work_item->params.mod = NULL; @@ -173,7 +204,7 @@ static int user_work_item_init(struct userspace_context *user_ctx, struct k_heap static void user_work_item_free(struct userspace_context *user_ctx, struct k_heap *user_heap) { sof_heap_free(user_heap, user_ctx->work_item); - user_worker_put(); + user_worker_put(cpu_get_id()); } static inline struct module_params *user_work_get_params(struct userspace_context *user_ctx) @@ -193,7 +224,8 @@ static int userspace_proxy_invoke(struct userspace_context *user_ctx, uint32_t c #if IS_ENABLED(CONFIG_SOF_USERSPACE_MOD_IPC_BY_DP_THREAD) struct k_event * const event = user_ctx->dp_event; #else - struct k_event * const event = &worker.event; + int cpu = cpu_get_id(); + struct k_event * const event = &worker[cpu].event; #endif struct module_params *params = user_work_get_params(user_ctx); const uintptr_t ipc_req_buf = (uintptr_t)MAILBOX_HOSTBOX_BASE; @@ -216,22 +248,13 @@ static int userspace_proxy_invoke(struct userspace_context *user_ctx, uint32_t c #if !IS_ENABLED(CONFIG_SOF_USERSPACE_MOD_IPC_BY_DP_THREAD) /* Switch worker thread to module memory domain */ - ret = k_mem_domain_add_thread(user_ctx->comp_dom, worker.thread_id); + ret = k_mem_domain_add_thread(user_ctx->comp_dom, worker[cpu].thread_id); if (ret < 0) { tr_err(&userspace_proxy_tr, "Failed to switch memory domain, error: %d", ret); goto done; } -#ifdef CONFIG_SCHED_CPU_MASK - /* Pin worker thread to the same core as the module */ - ret = k_thread_cpu_pin(worker.thread_id, cpu_get_id()); - if (ret < 0) { - tr_err(&userspace_proxy_tr, "Failed to pin 
cpu, error: %d", ret); - goto done; - } -#endif - - ret = k_work_user_submit_to_queue(&worker.work_queue, &user_ctx->work_item->work_item); + ret = k_work_user_submit_to_queue(&worker[cpu].work_queue, &user_ctx->work_item->work_item); if (ret < 0) { tr_err(&userspace_proxy_tr, "Submit to queue error: %d", ret); goto done; From 5d3ee22578caa77f2d9d37c622ac8098d1e26320 Mon Sep 17 00:00:00 2001 From: Serhiy Katsyuba Date: Thu, 18 Jun 2026 17:18:59 +0200 Subject: [PATCH 2/3] userspace: proxy: allocate work item as cached The proxy now uses a separate worker per core. Each work item is allocated, submitted and processed on the same core, so cross-core coherency is no longer required. Signed-off-by: Serhiy Katsyuba --- src/audio/module_adapter/library/userspace_proxy.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/audio/module_adapter/library/userspace_proxy.c b/src/audio/module_adapter/library/userspace_proxy.c index d9ae3f1622cc..3cec54a19747 100644 --- a/src/audio/module_adapter/library/userspace_proxy.c +++ b/src/audio/module_adapter/library/userspace_proxy.c @@ -182,8 +182,7 @@ static int user_work_item_init(struct userspace_context *user_ctx, struct k_heap if (ret) return ret; - /* TODO: this can probably be allocated as cached? 
*/ - work_item = sof_heap_alloc(user_heap, SOF_MEM_FLAG_COHERENT, sizeof(*work_item), 0); + work_item = sof_heap_alloc(user_heap, 0, sizeof(*work_item), 0); if (!work_item) { user_worker_put(cpu); return -ENOMEM; From 3fcc3fbb9c7c12fbc652dc363b040ccda0ed8f11 Mon Sep 17 00:00:00 2001 From: Serhiy Katsyuba Date: Tue, 23 Jun 2026 17:42:45 +0200 Subject: [PATCH 3/3] userspace: proxy: create per-core workers from core 0 Signed-off-by: Serhiy Katsyuba --- .../module_adapter/library/userspace_proxy.c | 50 +++++++++---------- .../module_adapter/library/userspace_proxy.h | 34 +++++++++++++ src/init/init.c | 7 +++ zephyr/lib/cpu.c | 22 ++++++-- 4 files changed, 81 insertions(+), 32 deletions(-) diff --git a/src/audio/module_adapter/library/userspace_proxy.c b/src/audio/module_adapter/library/userspace_proxy.c index 3cec54a19747..c335f25bdfbd 100644 --- a/src/audio/module_adapter/library/userspace_proxy.c +++ b/src/audio/module_adapter/library/userspace_proxy.c @@ -103,16 +103,15 @@ struct user_worker { struct k_event event; }; -static struct user_worker worker[CONFIG_CORE_COUNT]; +static struct user_worker worker[CONFIG_CORE_COUNT] = { { 0 } }; -static int user_worker_get(int cpu) +int user_worker_create(int cpu) { - int ret = 0; - + assert(cpu_is_me(0)); assert(cpu >= 0 && cpu < (int)ARRAY_SIZE(worker)); - if (worker[cpu].reference_count) { - worker[cpu].reference_count++; + if (worker[cpu].stack_ptr) { + tr_err(&userspace_proxy_tr, "Userspace worker already created for core %d.", cpu); return 0; } @@ -130,45 +129,42 @@ static int user_worker_get(int cpu) worker[cpu].thread_id = k_work_user_queue_thread_get(&worker[cpu].work_queue); #ifdef CONFIG_SCHED_CPU_MASK - /* - * k_work_user_queue_start() starts the worker thread immediately. 
- */ - k_thread_suspend(worker[cpu].thread_id); - - /* Pin worker thread to the same core as the module */ - ret = k_thread_cpu_pin(worker[cpu].thread_id, cpu); + int ret = k_thread_cpu_pin(worker[cpu].thread_id, cpu); if (ret) { tr_err(&userspace_proxy_tr, "Failed to pin worker to core %d, error: %d", cpu, ret); k_panic(); } - - k_thread_resume(worker[cpu].thread_id); - #elif CONFIG_CORE_COUNT > 1 #error "CONFIG_SCHED_CPU_MASK is not enabled" #endif k_thread_access_grant(worker[cpu].thread_id, &worker[cpu].event); - worker[cpu].reference_count++; - return 0; } -static void user_worker_put(int cpu) +void user_worker_free(int cpu) { assert(cpu >= 0 && cpu < (int)ARRAY_SIZE(worker)); + assert(worker[cpu].stack_ptr); - /* Module removed so decrement counter */ - worker[cpu].reference_count--; + k_thread_abort(worker[cpu].thread_id); + user_stack_free(worker[cpu].stack_ptr); + worker[cpu].stack_ptr = NULL; +} - /* Free worker resources if no more active user space modules */ - if (worker[cpu].reference_count == 0) { - k_thread_abort(worker[cpu].thread_id); - user_stack_free(worker[cpu].stack_ptr); - } +static int user_worker_get(int cpu) +{ + assert(cpu >= 0 && cpu < (int)ARRAY_SIZE(worker)); + assert(worker[cpu].stack_ptr); + + return 0; +} + +static void user_worker_put(int cpu) +{ + ARG_UNUSED(cpu); } #endif diff --git a/src/include/sof/audio/module_adapter/library/userspace_proxy.h b/src/include/sof/audio/module_adapter/library/userspace_proxy.h index 482590caf67f..d61e92f20e3c 100644 --- a/src/include/sof/audio/module_adapter/library/userspace_proxy.h +++ b/src/include/sof/audio/module_adapter/library/userspace_proxy.h @@ -81,4 +81,38 @@ struct k_work_user *userspace_proxy_register_ipc_handler(struct processing_modul #endif /* CONFIG_SOF_USERSPACE_PROXY */ +/* + * Per-core userspace IPC worker lifecycle. 
+ * + * A userspace IPC worker is a user-mode work queue (and its worker thread) that + * services IPC requests targeting userspace modules running on a given core. The + * worker is created on, and managed from, the primary core. Configurations that do + * not use a dedicated per-core worker resolve these calls to no-ops. + */ +#if CONFIG_SOF_USERSPACE_PROXY && !CONFIG_SOF_USERSPACE_MOD_IPC_BY_DP_THREAD +/** + * Create the userspace IPC worker for the given core. + * + * Must be called while running on the primary core. The worker thread is pinned to + * the target core. Calling it again for a core that already has a worker is a no-op. + * + * @param cpu - target core id the worker is created for + * @return 0 on success, negative error code otherwise. + */ +int user_worker_create(int cpu); + +/** + * Free the userspace IPC worker for the given core. + * + * Must be called from the primary core while the target core is still running, as it + * aborts the worker thread. + * + * @param cpu - target core id whose worker is freed + */ +void user_worker_free(int cpu); +#else +static inline int user_worker_create(int cpu) { return 0; } +static inline void user_worker_free(int cpu) { } +#endif + #endif /* __SOF_AUDIO_USERSPACE_PROXY_H__ */ diff --git a/src/init/init.c b/src/init/init.c index 9a99c2d9c27b..b4f6b1463ed5 100644 --- a/src/init/init.c +++ b/src/init/init.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #if CONFIG_IPC_MAJOR_4 #include @@ -230,6 +231,12 @@ __cold static int primary_core_init(int argc, char *argv[], struct sof *sof) zephyr_ll_user_resources_init(); #endif + /* Create the userspace IPC worker for the primary core. Secondary cores get + * theirs from cpu_enable_core(). Resolves to a no-op when unused. 
+ */ + if (user_worker_create(cpu_get_id()) < 0) + sof_panic(SOF_IPC_PANIC_MEM); + /* init the platform */ if (platform_init(sof) < 0) sof_panic(SOF_IPC_PANIC_PLATFORM); diff --git a/zephyr/lib/cpu.c b/zephyr/lib/cpu.c index 94f004bd7356..5b48304b29ba 100644 --- a/zephyr/lib/cpu.c +++ b/zephyr/lib/cpu.c @@ -22,6 +22,7 @@ static uint32_t mic_disable_status; #include #include #include +#include #include #include "../audio/copier/copier.h" @@ -230,6 +231,8 @@ void cpu_notify_state_exit(enum pm_state state) int cpu_enable_core(int id) { + int ret = 0; + /* only called from single core, no RMW lock */ __ASSERT_NO_MSG(cpu_is_primary(arch_proc_id())); /* @@ -247,15 +250,18 @@ int cpu_enable_core(int id) * initialization. By reinitializing the idle thread, we would overwrite the kernel structs * and the idle thread stack. */ - if (pm_state_next_get(id)->state == PM_STATE_ACTIVE) { + if (pm_state_next_get(id)->state == PM_STATE_ACTIVE) k_smp_cpu_start(id, secondary_init, NULL); - return 0; - } + else + k_smp_cpu_resume(id, secondary_init, NULL, true, false); - k_smp_cpu_resume(id, secondary_init, NULL, true, false); + /* The core is up now; create its userspace IPC worker on the primary core. + * Resolves to a no-op when no per-core userspace worker is used. + */ + ret = user_worker_create(id); #endif /* CONFIG_PM */ - return 0; + return ret; } void cpu_disable_core(int id) @@ -267,6 +273,12 @@ void cpu_disable_core(int id) tr_warn(&zephyr_tr, "core %d is already disabled", id); return; } + + /* Free the core's userspace IPC worker while the core is still running; the + * worker thread is aborted cross-core. Resolves to a no-op when unused. + */ + user_worker_free(id); + #if defined(CONFIG_PM) /* TODO: before requesting core shut down check if it's not actively used */ if (!pm_state_force(id, &(struct pm_state_info){PM_STATE_SOFT_OFF, 0, 0})) {