wqueue: add work_cancel_sync() support

Signed-off-by: ligd <liguiding1@xiaomi.com>
This commit is contained in:
ligd
2023-08-28 19:29:54 +08:00
committed by Xiang Xiao
parent 978c8bd249
commit 61ef7eb3dc
4 changed files with 134 additions and 14 deletions
+23
View File
@@ -383,6 +383,29 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
int work_cancel(int qid, FAR struct work_s *work);
/****************************************************************************
* Name: work_cancel_sync
*
* Description:
* Blocked cancel previously queued user-mode work. This removes work
* from the user mode work queue. After work has been cancelled, it may
* be requeued by calling work_queue() again.
*
* Input Parameters:
* qid - The work queue ID (must be HPWORK or LPWORK)
* work - The previously queued work structure to cancel
*
* Returned Value:
* Zero (OK) on success, a negated errno on failure. This error may be
* reported:
*
* -ENOENT - There is no such work queued.
* -EINVAL - An invalid work queue was specified
*
****************************************************************************/
int work_cancel_sync(int qid, FAR struct work_s *work);
/****************************************************************************
* Name: work_foreach
*
+73 -5
View File
@@ -49,8 +49,11 @@
* work_queue() again.
*
* Input Parameters:
* qid - The work queue ID
* work - The previously queued work structure to cancel
* wqueue - The work queue to use. Must be HPWORK or LPWORK
* nthread - The number of threads in the work queue
* > 0 unsynchronous cancel
* < 0 synchronous cancel
* work - The previously queued work structure to cancel
*
* Returned Value:
* Zero (OK) on success, a negated errno on failure. This error may be
@@ -61,7 +64,7 @@
*
****************************************************************************/
static int work_qcancel(FAR struct kwork_wqueue_s *wqueue,
static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, int nthread,
FAR struct work_s *work)
{
irqstate_t flags;
@@ -93,6 +96,21 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue,
work->worker = NULL;
ret = OK;
}
else if (nthread > 0)
{
int wndx;
for (wndx = 0; wndx < nthread; wndx++)
{
if (wqueue->worker[wndx].work == work &&
wqueue->worker[wndx].pid != nxsched_gettid())
{
nxsem_wait_uninterruptible(&wqueue->worker[wndx].wait);
ret = OK;
break;
}
}
}
leave_critical_section(flags);
return ret;
@@ -130,7 +148,8 @@ int work_cancel(int qid, FAR struct work_s *work)
{
/* Cancel high priority work */
return work_qcancel((FAR struct kwork_wqueue_s *)&g_hpwork, work);
return work_qcancel((FAR struct kwork_wqueue_s *)&g_hpwork,
-1, work);
}
else
#endif
@@ -139,7 +158,56 @@ int work_cancel(int qid, FAR struct work_s *work)
{
/* Cancel low priority work */
return work_qcancel((FAR struct kwork_wqueue_s *)&g_lpwork, work);
return work_qcancel((FAR struct kwork_wqueue_s *)&g_lpwork,
-1, work);
}
else
#endif
{
return -EINVAL;
}
}
/****************************************************************************
* Name: work_cancel_sync
*
* Description:
* Blocked cancel previously queued user-mode work. This removes work
* from the user mode work queue. After work has been cancelled, it may
* be requeued by calling work_queue() again.
*
* Input Parameters:
* qid - The work queue ID (must be HPWORK or LPWORK)
* work - The previously queued work structure to cancel
*
* Returned Value:
* Zero (OK) on success, a negated errno on failure. This error may be
* reported:
*
* -ENOENT - There is no such work queued.
* -EINVAL - An invalid work queue was specified
*
****************************************************************************/
int work_cancel_sync(int qid, FAR struct work_s *work)
{
#ifdef CONFIG_SCHED_HPWORK
if (qid == HPWORK)
{
/* Cancel high priority work */
return work_qcancel((FAR struct kwork_wqueue_s *)&g_hpwork,
CONFIG_SCHED_HPNTHREADS, work);
}
else
#endif
#ifdef CONFIG_SCHED_LPWORK
if (qid == LPWORK)
{
/* Cancel low priority work */
return work_qcancel((FAR struct kwork_wqueue_s *)&g_lpwork,
CONFIG_SCHED_LPNTHREADS, work);
}
else
#endif
+36 -9
View File
@@ -126,13 +126,19 @@ struct lp_wqueue_s g_lpwork =
static int work_thread(int argc, FAR char *argv[])
{
FAR struct kwork_wqueue_s *wqueue;
FAR struct kworker_s *kworker;
FAR struct work_s *work;
worker_t worker;
irqstate_t flags;
FAR void *arg;
int semcount;
wqueue = (FAR struct kwork_wqueue_s *)
((uintptr_t)strtoul(argv[1], NULL, 16));
/* Get the handle from argv */
wqueue = (FAR struct kwork_wqueue_s *)
((uintptr_t)strtoul(argv[1], NULL, 0));
kworker = (FAR struct kworker_s *)
((uintptr_t)strtoul(argv[2], NULL, 0));
flags = enter_critical_section();
@@ -168,6 +174,10 @@ static int work_thread(int argc, FAR char *argv[])
work->worker = NULL;
/* Mark the thread busy */
kworker->work = work;
/* Do the work. Re-enable interrupts while the work is being
* performed... we don't have any idea how long this will take!
*/
@@ -175,6 +185,18 @@ static int work_thread(int argc, FAR char *argv[])
leave_critical_section(flags);
CALL_WORKER(worker, arg);
flags = enter_critical_section();
/* Mark the thread un-busy */
kworker->work = NULL;
/* Check if someone is waiting, if so, wakeup it */
nxsem_get_value(&kworker->wait, &semcount);
while (semcount++ < 0)
{
nxsem_post(&kworker->wait);
}
}
/* Then process queued work. work_process will not return until: (1)
@@ -213,15 +235,12 @@ static int work_thread_create(FAR const char *name, int priority,
int stack_size, int nthread,
FAR struct kwork_wqueue_s *wqueue)
{
FAR char *argv[2];
char args[32];
FAR char *argv[3];
char arg0[32];
char arg1[32];
int wndx;
int pid;
snprintf(args, sizeof(args), "%p", wqueue);
argv[0] = args;
argv[1] = NULL;
/* Don't permit any of the threads to run until we have fully initialized
* g_hpwork and g_lpwork.
*/
@@ -230,6 +249,14 @@ static int work_thread_create(FAR const char *name, int priority,
for (wndx = 0; wndx < nthread; wndx++)
{
nxsem_init(&wqueue->worker[wndx].wait, 0, 0);
snprintf(arg0, sizeof(arg0), "%p", wqueue);
snprintf(arg1, sizeof(arg1), "%p", &wqueue->worker[wndx]);
argv[0] = arg0;
argv[1] = arg1;
argv[2] = NULL;
pid = kthread_create(name, priority, stack_size,
work_thread, argv);
@@ -241,7 +268,7 @@ static int work_thread_create(FAR const char *name, int priority,
return pid;
}
wqueue->worker[wndx].pid = pid;
wqueue->worker[wndx].pid = pid;
}
sched_unlock();
+2
View File
@@ -53,6 +53,8 @@
struct kworker_s
{
pid_t pid; /* The task ID of the worker thread */
FAR struct work_s *work; /* The work structure */
sem_t wait; /* Sync waiting for worker done */
};
/* This structure defines the state of one kernel-mode work queue */