mq_receive/send: Return appropriate errnos and stop waiting if signal received.

git-svn-id: svn://svn.code.sf.net/p/nuttx/code/trunk@164 42af7a65-404d-4744-a932-0658087f49c3
This commit is contained in:
patacongo
2007-03-28 14:48:42 +00:00
parent c5971231da
commit 3e352ca10b
9 changed files with 527 additions and 260 deletions
+2
View File
@@ -91,6 +91,8 @@
0.2.3 2007-xx-xx Gregory Nutt <spudmonkey@racsa.co.cr> 0.2.3 2007-xx-xx Gregory Nutt <spudmonkey@racsa.co.cr>
* mq_receive and mq_send now return errno's appropriately
* mq_receive and mq_send are now correctly awakened by signals.
* Started m68322 * Started m68322
+46 -6
View File
@@ -37,13 +37,17 @@
* Included Files * Included Files
**************************************************************************/ **************************************************************************/
#include <nuttx/config.h>
#include <stdio.h> #include <stdio.h>
#include <unistd.h>
#include <string.h> #include <string.h>
#include <ctype.h> #include <ctype.h>
#include <fcntl.h> #include <fcntl.h>
#include <pthread.h> #include <pthread.h>
#include <mqueue.h> #include <mqueue.h>
#include <sched.h> #include <sched.h>
#include <errno.h>
#include "ostest.h" #include "ostest.h"
@@ -58,6 +62,13 @@
#define TEST_MSGLEN (strlen(TEST_MESSAGE)+1) #define TEST_MSGLEN (strlen(TEST_MESSAGE)+1)
#endif #endif
#define TEST_SEND_NMSGS (10)
#ifndef CONFIG_DISABLE_SIGNALS
# define TEST_RECEIVE_NMSGS (11)
#else
# define TEST_RECEIVE_NMSGS (10)
#endif
/************************************************************************** /**************************************************************************
* Private Types * Private Types
**************************************************************************/ **************************************************************************/
@@ -121,9 +132,9 @@ static void *sender_thread(void *arg)
memcpy(msg_buffer, TEST_MESSAGE, TEST_MSGLEN); memcpy(msg_buffer, TEST_MESSAGE, TEST_MSGLEN);
/* Perform the send 10 times */ /* Perform the send TEST_SEND_NMSGS times */
for (i = 0; i < 10; i++) for (i = 0; i < TEST_SEND_NMSGS; i++)
{ {
status = mq_send(mqfd, msg_buffer, TEST_MSGLEN, 42); status = mq_send(mqfd, msg_buffer, TEST_MSGLEN, 42);
if (status < 0) if (status < 0)
@@ -183,17 +194,28 @@ static void *receiver_thread(void *arg)
pthread_exit((pthread_addr_t)1); pthread_exit((pthread_addr_t)1);
} }
/* Perform the receive 10 times */ /* Perform the receive TEST_RECEIVE_NMSGS times */
for (i = 0; i < 10; i++) for (i = 0; i < TEST_RECEIVE_NMSGS; i++)
{ {
memset(msg_buffer, 0xaa, TEST_MSGLEN); memset(msg_buffer, 0xaa, TEST_MSGLEN);
nbytes = mq_receive(mqfd, msg_buffer, TEST_MSGLEN, 0); nbytes = mq_receive(mqfd, msg_buffer, TEST_MSGLEN, 0);
if (nbytes < 0) if (nbytes < 0)
{ {
printf("receiver_thread: ERROR mq_receive failure on msg %d\n", i); /* mq_receive failed. If the error is because of EINTR then
* it is not a failure.
*/
if (*get_errno_ptr() != EINTR)
{
printf("receiver_thread: ERROR mq_receive failure on msg %d, errno=%d\n", i, *get_errno_ptr());
nerrors++; nerrors++;
} }
else
{
printf("receiver_thread: mq_receive interrupted!\n", i);
}
}
else if (nbytes != TEST_MSGLEN) else if (nbytes != TEST_MSGLEN)
{ {
printf("receiver_thread: mq_receive return bad size %d on msg %d\n", nbytes, i); printf("receiver_thread: mq_receive return bad size %d on msg %d\n", nbytes, i);
@@ -336,8 +358,26 @@ void mqueue_test(void)
printf("mqueue_test: ERROR sender thread exited with %d errors\n", (int)result); printf("mqueue_test: ERROR sender thread exited with %d errors\n", (int)result);
} }
#ifndef CONFIG_DISABLE_SIGNALS
/* Wake up the receiver thread with a signal */
printf("mqueue_test: Killing receiver\n");
pthread_kill(receiver, 9);
#endif
/* Wait a bit to see if the thread exits on its own */
usleep(500*1000);
/* Then cancel the thread and see if it did */
printf("mqueue_test: Canceling receiver\n"); printf("mqueue_test: Canceling receiver\n");
pthread_cancel(receiver); status = pthread_cancel(receiver);
if (status == ESRCH)
{
printf("mqueue_test: receiver has already terminated\n");
}
pthread_join(receiver, &result); pthread_join(receiver, &result);
if (result != (void*)0) if (result != (void*)0)
{ {
+1 -1
View File
@@ -71,7 +71,7 @@ SIGNAL_SRCS = sig_initialize.c \
sig_cleanup.c sig_received.c sig_deliver.c sig_cleanup.c sig_received.c sig_deliver.c
MQUEUE_SRCS = mq_open.c mq_close.c mq_unlink.c mq_send.c mq_receive.c \ MQUEUE_SRCS = mq_open.c mq_close.c mq_unlink.c mq_send.c mq_receive.c \
mq_setattr.c mq_getattr.c mq_initialize.c mq_descreate.c \ mq_setattr.c mq_getattr.c mq_initialize.c mq_descreate.c \
mq_findnamed.c mq_msgfree.c mq_msgqfree.c mq_findnamed.c mq_msgfree.c mq_msgqfree.c mq_waitirq.c
ifneq ($(CONFIG_DISABLE_SIGNALS),y) ifneq ($(CONFIG_DISABLE_SIGNALS),y)
MQUEUE_SRCS += mq_notify.c MQUEUE_SRCS += mq_notify.c
endif endif
+5 -1
View File
@@ -181,7 +181,7 @@ extern "C" {
#define EXTERN extern #define EXTERN extern
#endif #endif
/* Functions defined in mq_initialized.c *******************/ /* Functions defined in mq_initialize.c ********************/
EXTERN void weak_function mq_initialize(void); EXTERN void weak_function mq_initialize(void);
EXTERN void mq_desblockalloc(void); EXTERN void mq_desblockalloc(void);
@@ -191,6 +191,10 @@ EXTERN FAR msgq_t *mq_findnamed(const char *mq_name);
EXTERN void mq_msgfree(FAR mqmsg_t *mqmsg); EXTERN void mq_msgfree(FAR mqmsg_t *mqmsg);
EXTERN void mq_msgqfree(FAR msgq_t *msgq); EXTERN void mq_msgqfree(FAR msgq_t *msgq);
/* mq_waitirq.c ********************************************/
EXTERN void mq_waitirq(FAR _TCB *wtcb);
#undef EXTERN #undef EXTERN
#ifdef __cplusplus #ifdef __cplusplus
} }
+48 -6
View File
@@ -43,6 +43,7 @@
#include <fcntl.h> /* O_NONBLOCK */ #include <fcntl.h> /* O_NONBLOCK */
#include <string.h> #include <string.h>
#include <assert.h> #include <assert.h>
#include <errno.h>
#include <mqueue.h> #include <mqueue.h>
#include <sched.h> #include <sched.h>
#include <debug.h> #include <debug.h>
@@ -106,8 +107,17 @@
* priority. * priority.
* *
* Return Value: * Return Value:
* Length of the selected message in bytes, otherwise -1 * One success, the length of the selected message in bytes.is
* (ERROR). * returned. On failure, -1 (ERROR) is returned and the errno
* is set appropriately:
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set
* for the message queue description referred to by 'mqdes'.
* EPERM Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
* EINVAL Invalid 'msg' or 'mqdes'
* *
* Assumptions: * Assumptions:
* *
@@ -125,12 +135,27 @@ int mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio)
/* Verify the input parameters */ /* Verify the input parameters */
sched_lock(); if (!msg || !mqdes)
if (msg && mqdes && (mqdes->oflags & O_RDOK) != 0 &&
msglen >= (size_t)mqdes->msgq->maxmsgsize)
{ {
*get_errno_ptr() = EINVAL;
return ERROR;
}
if ((mqdes->oflags & O_RDOK) == 0)
{
*get_errno_ptr() = EPERM;
return ERROR;
}
if (msglen < (size_t)mqdes->msgq->maxmsgsize)
{
*get_errno_ptr() = EMSGSIZE;
return ERROR;
}
/* Get a pointer to the message queue */ /* Get a pointer to the message queue */
sched_lock();
msgq = mqdes->msgq; msgq = mqdes->msgq;
/* Several operations must be performed below: We must determine if /* Several operations must be performed below: We must determine if
@@ -156,10 +181,28 @@ int mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio)
rtcb = (FAR _TCB*)g_readytorun.head; rtcb = (FAR _TCB*)g_readytorun.head;
rtcb->msgwaitq = msgq; rtcb->msgwaitq = msgq;
msgq->nwaitnotempty++; msgq->nwaitnotempty++;
*get_errno_ptr() = OK;
up_block_task(rtcb, TSTATE_WAIT_MQNOTEMPTY); up_block_task(rtcb, TSTATE_WAIT_MQNOTEMPTY);
/* When we resume at this point, either (1) the message queue
* is no longer empty, or (2) the wait has been interrupted by
* a signal. We can detect the latter case be examining the
* errno value (should be EINTR).
*/
if (*get_errno_ptr() != OK)
{
break;
}
} }
else else
{ {
/* The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by 'mqdes'.
*/
*get_errno_ptr() = EAGAIN;
break; break;
} }
} }
@@ -230,7 +273,6 @@ int mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio)
irqrestore(saved_state); irqrestore(saved_state);
} }
} }
}
sched_unlock(); sched_unlock();
return ret; return ret;
+55 -10
View File
@@ -43,6 +43,7 @@
#include <fcntl.h> #include <fcntl.h>
#include <mqueue.h> #include <mqueue.h>
#include <string.h> #include <string.h>
#include <errno.h>
#include <sched.h> #include <sched.h>
#include <debug.h> #include <debug.h>
#include <nuttx/arch.h> #include <nuttx/arch.h>
@@ -197,7 +198,18 @@ FAR mqmsg_t *mq_msgalloc(void)
* prio - The priority of the message * prio - The priority of the message
* *
* Return Value: * Return Value:
* None * On success, mq_send() returns0 (OK); on error, -1 (ERROR)
* is returned, with errno set to indicate the error:
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was
* set for the message queue description referred to
* by mqdes.
* EINVAL Either msg or mqdes is NULL or the value of prio
* is invalid.
* EPERM Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute
* of the message queue.
* EINTR The call was interrupted by a signal handler.
* *
* Assumptions/restrictions: * Assumptions/restrictions:
* *
@@ -216,13 +228,27 @@ int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio)
/* Verify the input parameters */ /* Verify the input parameters */
sched_lock(); if (!msg || !mqdes || prio < 0 || prio > MQ_PRIO_MAX)
if (msg && mqdes && (mqdes->oflags & O_WROK) != 0 &&
msglen > 0 && msglen <= (size_t)mqdes->msgq->maxmsgsize &&
prio >= 0 && prio <= MQ_PRIO_MAX)
{ {
*get_errno_ptr() = EINVAL;
return ERROR;
}
if ((mqdes->oflags & O_WROK) == 0)
{
*get_errno_ptr() = EPERM;
return ERROR;
}
if (msglen < 0 || msglen > (size_t)mqdes->msgq->maxmsgsize)
{
*get_errno_ptr() = EMSGSIZE;
return ERROR;
}
/* Get a pointer to the message queue */ /* Get a pointer to the message queue */
sched_lock();
msgq = mqdes->msgq; msgq = mqdes->msgq;
/* If we are sending a message from an interrupt handler, then /* If we are sending a message from an interrupt handler, then
@@ -251,6 +277,7 @@ int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio)
{ {
/* No... We will return an error to the caller. */ /* No... We will return an error to the caller. */
*get_errno_ptr() = EAGAIN;
curr = NULL; curr = NULL;
} }
@@ -260,6 +287,8 @@ int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio)
else else
{ {
boolean interrupted = FALSE;
/* Loop until there are fewer than max allowable messages in the /* Loop until there are fewer than max allowable messages in the
* receiving message queue * receiving message queue
*/ */
@@ -273,16 +302,33 @@ int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio)
rtcb = (FAR _TCB*)g_readytorun.head; rtcb = (FAR _TCB*)g_readytorun.head;
rtcb->msgwaitq = msgq; rtcb->msgwaitq = msgq;
(msgq->nwaitnotfull)++; (msgq->nwaitnotfull)++;
up_block_task(rtcb, TSTATE_WAIT_MQNOTFULL);
}
/* It should be okay to get add a message to the receiving *get_errno_ptr() = OK;
* message queue now. up_block_task(rtcb, TSTATE_WAIT_MQNOTFULL);
/* When we resume at this point, either (1) the message queue
* is no longer empty, or (2) the wait has been interrupted by
* a signal. We can detect the latter case be examining the
* errno value (should be EINTR).
*/ */
if (*get_errno_ptr() != OK)
{
interrupted = TRUE;
break;
}
}
/* If we were not interrupted, then it should be okay to add
* a message to the receiving message queue now.
*/
if (!interrupted)
{
curr = mq_msgalloc(); curr = mq_msgalloc();
} }
} }
}
/* We are not in an interrupt handler and the receiving message queue /* We are not in an interrupt handler and the receiving message queue
* is not full * is not full
@@ -402,7 +448,6 @@ int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio)
irqrestore(saved_state); irqrestore(saved_state);
ret = OK; ret = OK;
} }
}
sched_unlock(); sched_unlock();
return(ret); return(ret);
+125
View File
@@ -0,0 +1,125 @@
/************************************************************
* mq_waitirq.c
*
* Copyright (C) 2007 Gregory Nutt. All rights reserved.
* Author: Gregory Nutt <spudmonkey@racsa.co.cr>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name Gregory Nutt nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
************************************************************/
/************************************************************
* Included Files
************************************************************/
#include <sys/types.h>
#include <sched.h>
#include <errno.h>
#include <nuttx/arch.h>
#include "sem_internal.h"
/************************************************************
* Compilation Switches
************************************************************/
/************************************************************
* Definitions
************************************************************/
/************************************************************
* Private Type Declarations
************************************************************/
/************************************************************
* Global Variables
************************************************************/
/************************************************************
* Private Variables
************************************************************/
/************************************************************
* Private Functions
************************************************************/
/************************************************************
* Public Functions
************************************************************/
/************************************************************
* Function: sem_waitirq
*
* Description:
* This function is called when a signal is received by a
* task that is waiting on a message queue -- either for a
* queue to becoming not full (on mq_send) or not empty
* (on mq_receive).
*
* Parameters:
* wtcb - A pointer to the TCB of the task that is waiting
* on a message queue, but has received a signal instead.
*
* Return Value:
* None
*
* Assumptions:
*
************************************************************/
void mq_waitirq(FAR _TCB *wtcb)
{
irqstate_t saved_state;
/* Disable interrupts. This is necessary because an
* interrupt handler may attempt to send a message while we are
* doing this.
*/
saved_state = irqsave();
/* It is possible that an interrupt/context switch beat us to the
* punch and already changed the task's state.
*/
if (wtcb->task_state == TSTATE_WAIT_MQNOTEMPTY ||
wtcb->task_state == TSTATE_WAIT_MQNOTFULL)
{
/* Mark the errno value for the thread. */
wtcb->errno = EINTR;
/* Restart the the task. */
up_unblock_task(wtcb);
}
/* Interrupts may now be enabled. */
irqrestore(saved_state);
}
+3 -1
View File
@@ -156,11 +156,13 @@ int sem_wait(sem_t *sem)
/* Add the TCB to the prioritized semaphore wait queue */ /* Add the TCB to the prioritized semaphore wait queue */
*get_errno_ptr() = 0;
up_block_task(rtcb, TSTATE_WAIT_SEM); up_block_task(rtcb, TSTATE_WAIT_SEM);
/* When we resume at this point, either (1) the semaphore has been /* When we resume at this point, either (1) the semaphore has been
* assigned to this thread of execution, or (2) the semaphore wait * assigned to this thread of execution, or (2) the semaphore wait
* has been interrupted by a signal. * has been interrupted by a signal. We can detect the latter case
* be examining the errno value.
*/ */
if (*get_errno_ptr() != EINTR) if (*get_errno_ptr() != EINTR)
+8 -1
View File
@@ -47,6 +47,7 @@
#include "os_internal.h" #include "os_internal.h"
#include "sem_internal.h" #include "sem_internal.h"
#include "sig_internal.h" #include "sig_internal.h"
#include "mq_internal.h"
/************************************************************ /************************************************************
* Definitions * Definitions
@@ -380,7 +381,13 @@ int sig_received(FAR _TCB *stcb, siginfo_t *info)
* task must be unblocked when a signal is received. * task must be unblocked when a signal is received.
*/ */
/* NOT YET IMPLEMENTED. */ #ifndef CONFIG_DISABLE_MQUEUE
if (stcb->task_state == TSTATE_WAIT_MQNOTEMPTY ||
stcb->task_state == TSTATE_WAIT_MQNOTFULL)
{
mq_waitirq(stcb);
}
#endif
} }
return ret; return ret;