diff --git a/components/drivers/include/ipc/workqueue.h b/components/drivers/include/ipc/workqueue.h index 23e256c420..11fee489bd 100644 --- a/components/drivers/include/ipc/workqueue.h +++ b/components/drivers/include/ipc/workqueue.h @@ -29,6 +29,7 @@ enum struct rt_workqueue { rt_list_t work_list; + rt_list_t delayed_list; struct rt_work *work_current; /* current work */ struct rt_semaphore sem; @@ -62,6 +63,7 @@ rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work); rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t time); rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work); rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work); +rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue); #ifdef RT_USING_SYSTEM_WORKQUEUE rt_err_t rt_work_submit(struct rt_work *work, rt_tick_t time); diff --git a/components/drivers/src/workqueue.c b/components/drivers/src/workqueue.c index 2b5630d8d0..7aea647703 100644 --- a/components/drivers/src/workqueue.c +++ b/components/drivers/src/workqueue.c @@ -58,15 +58,17 @@ static void _workqueue_thread_entry(void *parameter) while (1) { + level = rt_hw_interrupt_disable(); if (rt_list_isempty(&(queue->work_list))) { /* no software timer exist, suspend self. */ rt_thread_suspend(rt_thread_self()); + rt_hw_interrupt_enable(level); rt_schedule(); + continue; } /* we have work to do with. */ - level = rt_hw_interrupt_disable(); work = rt_list_entry(queue->work_list.next, struct rt_work, list); rt_list_remove(&(work->list)); queue->work_current = work; @@ -76,175 +78,134 @@ static void _workqueue_thread_entry(void *parameter) /* do work */ work->work_func(work, work->work_data); - level = rt_hw_interrupt_disable(); /* clean current work */ queue->work_current = RT_NULL; - rt_hw_interrupt_enable(level); /* ack work completion */ _workqueue_work_completion(queue); } } -static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work) +static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue, + struct rt_work *work, rt_tick_t ticks) { rt_base_t level; + rt_err_t err; level = rt_hw_interrupt_disable(); - if (work->flags & RT_WORK_STATE_PENDING) - { - rt_hw_interrupt_enable(level); - return -RT_EBUSY; - } - - if (queue->work_current == work) - { - rt_hw_interrupt_enable(level); - return -RT_EBUSY; - } - - /* NOTE: the work MUST be initialized firstly */ + /* remove list */ rt_list_remove(&(work->list)); - - rt_list_insert_after(queue->work_list.prev, &(work->list)); - work->flags |= RT_WORK_STATE_PENDING; - - /* whether the workqueue is doing work */ - if (queue->work_current == RT_NULL) + work->flags &= ~RT_WORK_STATE_PENDING; + /* */ + if (ticks == 0) { + if (queue->work_current != work) + { + rt_list_insert_after(queue->work_list.prev, &(work->list)); + work->flags |= RT_WORK_STATE_PENDING; + work->workqueue = queue; + err = RT_EOK; + } + else + { + err = -RT_EBUSY; + } + + /* whether the workqueue is doing work */ + if (queue->work_current == RT_NULL && + ((queue->work_thread->stat & RT_THREAD_STAT_MASK) == RT_THREAD_SUSPEND)) + { + /* resume work thread */ + rt_thread_resume(queue->work_thread); + rt_hw_interrupt_enable(level); + rt_schedule(); + } + else + { + rt_hw_interrupt_enable(level); + } + return err; + } + else if (ticks < RT_TICK_MAX / 2) + { + /* Timer started */ + if (work->flags & RT_WORK_STATE_SUBMITTING) + { + rt_timer_stop(&work->timer); + rt_timer_control(&work->timer, RT_TIMER_CTRL_SET_TIME, &ticks); + } + else + { + rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler, + work, ticks, RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER); + work->flags |= RT_WORK_STATE_SUBMITTING; + } + work->workqueue = queue; + /* insert delay work list */ + rt_list_insert_after(queue->delayed_list.prev, &(work->list)); rt_hw_interrupt_enable(level); + rt_timer_start(&(work->timer)); + return RT_EOK; + } + rt_hw_interrupt_enable(level); + return -RT_ERROR; +} + +static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work) +{ + rt_base_t level; + rt_err_t err; + + level = rt_hw_interrupt_disable(); + rt_list_remove(&(work->list)); + work->flags &= ~RT_WORK_STATE_PENDING; + /* Timer started */ + if (work->flags & RT_WORK_STATE_SUBMITTING) + { + rt_timer_stop(&(work->timer)); + rt_timer_detach(&(work->timer)); + work->flags &= ~RT_WORK_STATE_SUBMITTING; + } + err = queue->work_current != work ? RT_EOK : -RT_EBUSY; + work->workqueue = RT_NULL; + rt_hw_interrupt_enable(level); + return err; +} + +static void _delayed_work_timeout_handler(void *parameter) +{ + struct rt_work *work; + struct rt_workqueue *queue; + rt_base_t level; + + work = (struct rt_work *)parameter; + queue = work->workqueue; + RT_ASSERT(queue != RT_NULL); + + level = rt_hw_interrupt_disable(); + rt_timer_detach(&(work->timer)); + work->flags &= ~RT_WORK_STATE_SUBMITTING; + /* remove delay list */ + rt_list_remove(&(work->list)); + /* insert work queue */ + if (queue->work_current != work) + { + rt_list_insert_after(queue->work_list.prev, &(work->list)); + work->flags |= RT_WORK_STATE_PENDING; + } + /* whether the workqueue is doing work */ + if (queue->work_current == RT_NULL && + ((queue->work_thread->stat & RT_THREAD_STAT_MASK) == RT_THREAD_SUSPEND)) + { /* resume work thread */ rt_thread_resume(queue->work_thread); + rt_hw_interrupt_enable(level); rt_schedule(); } else { rt_hw_interrupt_enable(level); } - - return RT_EOK; -} - -static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work) -{ - rt_base_t level; - - level = rt_hw_interrupt_disable(); - if (queue->work_current == work) - { - rt_hw_interrupt_enable(level); - return -RT_EBUSY; - } - rt_list_remove(&(work->list)); - work->flags &= ~RT_WORK_STATE_PENDING; - rt_hw_interrupt_enable(level); - - return RT_EOK; -} - -static rt_err_t _workqueue_cancel_delayed_work(struct rt_work *work) -{ - rt_base_t level; - int ret = RT_EOK; - - if (!work->workqueue) - { - ret = -EINVAL; - goto __exit; - } - - if (work->flags & RT_WORK_STATE_PENDING) - { - /* Remove from the queue if already submitted */ - ret = _workqueue_cancel_work(work->workqueue, work); - if (ret) - { - goto __exit; - } - } - else - { - if (work->flags & RT_WORK_STATE_SUBMITTING) - { - level = rt_hw_interrupt_disable(); - rt_timer_stop(&(work->timer)); - rt_timer_detach(&(work->timer)); - work->flags &= ~RT_WORK_STATE_SUBMITTING; - rt_hw_interrupt_enable(level); - } - } - - level = rt_hw_interrupt_disable(); - /* Detach from workqueue */ - work->workqueue = RT_NULL; - work->flags &= ~(RT_WORK_STATE_PENDING); - rt_hw_interrupt_enable(level); - -__exit: - return ret; -} - -static rt_err_t _workqueue_submit_delayed_work(struct rt_workqueue *queue, - struct rt_work *work, rt_tick_t ticks) -{ - rt_base_t level; - rt_err_t ret = RT_EOK; - - /* Work cannot be active in multiple queues */ - if (work->workqueue && work->workqueue != queue) - { - ret = -RT_EINVAL; - goto __exit; - } - - /* Cancel if work has been submitted */ - if (work->workqueue == queue) - { - ret = _workqueue_cancel_delayed_work(work); - if (ret < 0) - { - goto __exit; - } - } - - level = rt_hw_interrupt_disable(); - /* Attach workqueue so the timeout callback can submit it */ - work->workqueue = queue; - rt_hw_interrupt_enable(level); - - if (!ticks) - { - /* Submit work if no ticks is 0 */ - ret = _workqueue_submit_work(work->workqueue, work); - } - else - { - level = rt_hw_interrupt_disable(); - /* Add timeout */ - work->flags |= RT_WORK_STATE_SUBMITTING; - rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler, work, ticks, - RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER); - rt_hw_interrupt_enable(level); - rt_timer_start(&(work->timer)); - } - -__exit: - return ret; -} - -static void _delayed_work_timeout_handler(void *parameter) -{ - struct rt_work *delayed_work; - rt_base_t level; - - delayed_work = (struct rt_work *)parameter; - level = rt_hw_interrupt_disable(); - rt_timer_stop(&(delayed_work->timer)); - rt_timer_detach(&(delayed_work->timer)); - delayed_work->flags &= ~RT_WORK_STATE_SUBMITTING; - delayed_work->type &= ~RT_WORK_TYPE_DELAYED; - rt_hw_interrupt_enable(level); - _workqueue_submit_work(delayed_work->workqueue, delayed_work); } struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority) @@ -256,6 +217,7 @@ struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_siz { /* initialize work list */ rt_list_init(&(queue->work_list)); + rt_list_init(&(queue->delayed_list)); queue->work_current = RT_NULL; rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO); @@ -277,7 +239,9 @@ rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue) { RT_ASSERT(queue != RT_NULL); + rt_workqueue_cancel_all_work(queue); rt_thread_delete(queue->work_thread); + rt_sem_detach(&(queue->sem)); RT_KERNEL_FREE(queue); return RT_EOK; @@ -288,7 +252,7 @@ rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work) RT_ASSERT(queue != RT_NULL); RT_ASSERT(work != RT_NULL); - return _workqueue_submit_work(queue, work); + return _workqueue_submit_work(queue, work, 0); } rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t time) @@ -296,19 +260,7 @@ rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *wo RT_ASSERT(queue != RT_NULL); RT_ASSERT(work != RT_NULL); - if (time > 0) - { - work->type |= RT_WORK_TYPE_DELAYED; - } - - if (work->type & RT_WORK_TYPE_DELAYED) - { - return _workqueue_submit_delayed_work(queue, work, time); - } - else - { - return _workqueue_submit_work(queue, work); - } + return _workqueue_submit_work(queue, work, time); } rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work *work) @@ -318,51 +270,38 @@ rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work * RT_ASSERT(work != RT_NULL); level = rt_hw_interrupt_disable(); - if (queue->work_current == work) - { - rt_hw_interrupt_enable(level); - return -RT_EBUSY; - } - /* NOTE: the work MUST be initialized firstly */ rt_list_remove(&(work->list)); - - rt_list_insert_after(queue->work_list.prev, &(work->list)); - if (queue->work_current == RT_NULL) + rt_list_insert_after(&queue->work_list, &(work->list)); + /* whether the workqueue is doing work */ + if (queue->work_current == RT_NULL && + ((queue->work_thread->stat & RT_THREAD_STAT_MASK) == RT_THREAD_SUSPEND)) { - rt_hw_interrupt_enable(level); /* resume work thread */ rt_thread_resume(queue->work_thread); + rt_hw_interrupt_enable(level); rt_schedule(); } - else rt_hw_interrupt_enable(level); + else + { + rt_hw_interrupt_enable(level); + } return RT_EOK; } rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work) { - RT_ASSERT(queue != RT_NULL); RT_ASSERT(work != RT_NULL); - - if (work->type & RT_WORK_TYPE_DELAYED) - { - return _workqueue_cancel_delayed_work(work); - } - else - { - return _workqueue_cancel_work(queue, work); - } + RT_ASSERT(queue != RT_NULL); + return _workqueue_cancel_work(queue, work); } rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work) { - rt_base_t level; - RT_ASSERT(queue != RT_NULL); RT_ASSERT(work != RT_NULL); - level = rt_hw_interrupt_disable(); if (queue->work_current == work) /* it's current work in the queue */ { /* wait for work completion */ @@ -370,24 +309,30 @@ rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_wor } else { - rt_list_remove(&(work->list)); + _workqueue_cancel_work(queue, work); } - work->flags &= ~RT_WORK_STATE_PENDING; - rt_hw_interrupt_enable(level); return RT_EOK; } rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue) { - struct rt_list_node *node, *next; + struct rt_work *work; + RT_ASSERT(queue != RT_NULL); + /* cancel work */ rt_enter_critical(); - for (node = queue->work_list.next; node != &(queue->work_list); node = next) + while (rt_list_isempty(&queue->work_list) == RT_FALSE) { - next = node->next; - rt_list_remove(node); + work = rt_list_first_entry(&queue->work_list, struct rt_work, list); + _workqueue_cancel_work(queue, work); + } + /* cancel delay work */ + while (rt_list_isempty(&queue->delayed_list) == RT_FALSE) + { + work = rt_list_first_entry(&queue->delayed_list, struct rt_work, list); + _workqueue_cancel_work(queue, work); } rt_exit_critical(); @@ -416,14 +361,14 @@ rt_err_t rt_work_cancel(struct rt_work *work) int rt_work_sys_workqueue_init(void) { if (sys_workq != RT_NULL) - return 0; + return RT_EOK; sys_workq = rt_workqueue_create("sys_work", RT_SYSTEM_WORKQUEUE_STACKSIZE, RT_SYSTEM_WORKQUEUE_PRIORITY); + RT_ASSERT(sys_workq != RT_NULL); return RT_EOK; } - INIT_PREV_EXPORT(rt_work_sys_workqueue_init); #endif #endif