diff --git a/drivers/pipes/pipe.c b/drivers/pipes/pipe.c index 8f2c4e8ea25..61918b8f5e9 100644 --- a/drivers/pipes/pipe.c +++ b/drivers/pipes/pipe.c @@ -285,7 +285,6 @@ int pipe2(int fd[2], int flags) { char devname[32]; int ret; - bool blocking; /* Register a new pipe device */ @@ -296,10 +295,6 @@ int pipe2(int fd[2], int flags) return ERROR; } - /* Check for the O_NONBLOCK bit on flags */ - - blocking = (flags & O_NONBLOCK) == 0; - /* Get a write file descriptor setting O_NONBLOCK temporarily */ fd[1] = open(devname, O_WRONLY | O_NONBLOCK | flags); @@ -310,7 +305,7 @@ int pipe2(int fd[2], int flags) /* Clear O_NONBLOCK if it was set previously */ - if (blocking) + if ((flags & O_NONBLOCK) == 0) { ret = fcntl(fd[1], F_SETFL, flags & (~O_NONBLOCK)); if (ret < 0) diff --git a/drivers/pipes/pipe_common.c b/drivers/pipes/pipe_common.c index f5df0007230..daf3eb3e7af 100644 --- a/drivers/pipes/pipe_common.c +++ b/drivers/pipes/pipe_common.c @@ -65,18 +65,16 @@ ****************************************************************************/ /**************************************************************************** - * Name: pipecommon_bufferused + * Name: pipecommon_wakeup ****************************************************************************/ -static pipe_ndx_t pipecommon_bufferused(FAR struct pipe_dev_s *dev) +static void pipecommon_wakeup(FAR sem_t *sem) { - if (dev->d_wrndx >= dev->d_rdndx) + int sval; + + while (nxsem_get_value(sem, &sval) == OK && sval <= 0) { - return dev->d_wrndx - dev->d_rdndx; - } - else - { - return dev->d_bufsize + dev->d_wrndx - dev->d_rdndx; + nxsem_post(sem); } } @@ -105,7 +103,7 @@ FAR struct pipe_dev_s *pipecommon_allocdev(size_t bufsize) nxmutex_init(&dev->d_bflock); nxsem_init(&dev->d_rdsem, 0, 0); nxsem_init(&dev->d_wrsem, 0, 0); - dev->d_bufsize = bufsize + 1; /* +1 to compensate the full indicator */ + dev->d_bufsize = bufsize; } return dev; @@ -131,7 +129,6 @@ int pipecommon_open(FAR struct file *filep) { FAR struct inode *inode = filep->f_inode; FAR struct pipe_dev_s *dev = inode->i_private; - int sval; int ret; DEBUGASSERT(dev != NULL); @@ -153,13 +150,13 @@ int pipecommon_open(FAR struct file *filep) * is first opened. */ - if (inode->i_crefs == 1 && dev->d_buffer == NULL) + if (inode->i_crefs == 1 && !circbuf_is_init(&dev->d_buffer)) { - dev->d_buffer = (FAR uint8_t *)kmm_malloc(dev->d_bufsize); - if (!dev->d_buffer) + ret = circbuf_init(&dev->d_buffer, NULL, dev->d_bufsize); + if (ret < 0) { nxmutex_unlock(&dev->d_bflock); - return -ENOMEM; + return ret; } } @@ -178,17 +175,14 @@ int pipecommon_open(FAR struct file *filep) if (dev->d_nwriters == 1) { - while (nxsem_get_value(&dev->d_rdsem, &sval) == 0 && sval <= 0) - { - nxsem_post(&dev->d_rdsem); - } + pipecommon_wakeup(&dev->d_rdsem); } } while ((filep->f_oflags & O_NONBLOCK) == 0 && /* Blocking */ (filep->f_oflags & O_RDWR) == O_WRONLY && /* Write-only */ dev->d_nreaders < 1 && /* No readers on the pipe */ - dev->d_wrndx == dev->d_rdndx) /* Buffer is empty */ + circbuf_is_empty(&dev->d_buffer)) /* Buffer is empty */ { /* If opened for write-only, then wait for at least one reader * on the pipe. @@ -245,17 +239,14 @@ int pipecommon_open(FAR struct file *filep) if (dev->d_nreaders == 1) { - while (nxsem_get_value(&dev->d_wrsem, &sval) == 0 && sval <= 0) - { - nxsem_post(&dev->d_wrsem); - } + pipecommon_wakeup(&dev->d_wrsem); } } while ((filep->f_oflags & O_NONBLOCK) == 0 && /* Blocking */ (filep->f_oflags & O_RDWR) == O_RDONLY && /* Read-only */ dev->d_nwriters < 1 && /* No writers on the pipe */ - dev->d_wrndx == dev->d_rdndx) /* Buffer is empty */ + circbuf_is_empty(&dev->d_buffer)) /* Buffer is empty */ { /* If opened for read-only, then wait for either at least one writer * on the pipe. @@ -311,7 +302,6 @@ int pipecommon_close(FAR struct file *filep) { FAR struct inode *inode = filep->f_inode; FAR struct pipe_dev_s *dev = inode->i_private; - int sval; int ret; DEBUGASSERT(dev && filep->f_inode->i_crefs > 0); @@ -353,10 +343,7 @@ int pipecommon_close(FAR struct file *filep) poll_notify(dev->d_fds, CONFIG_DEV_PIPE_NPOLLWAITERS, POLLHUP); - while (nxsem_get_value(&dev->d_rdsem, &sval) == 0 && sval <= 0) - { - nxsem_post(&dev->d_rdsem); - } + pipecommon_wakeup(&dev->d_rdsem); } } @@ -374,11 +361,7 @@ int pipecommon_close(FAR struct file *filep) poll_notify(dev->d_fds, CONFIG_DEV_PIPE_NPOLLWAITERS, POLLERR); - while (nxsem_get_value(&dev->d_wrsem, &sval) == 0 && - sval <= 0) - { - nxsem_post(&dev->d_wrsem); - } + pipecommon_wakeup(&dev->d_wrsem); } } } @@ -390,17 +373,15 @@ int pipecommon_close(FAR struct file *filep) * obtained when the pipe is re-opened. */ - else if (PIPE_IS_POLICY_0(dev->d_flags) || dev->d_wrndx == dev->d_rdndx) + else if (PIPE_IS_POLICY_0(dev->d_flags) || + circbuf_is_empty(&dev->d_buffer)) { /* Policy 0 or the buffer is empty ... deallocate the buffer now. */ - kmm_free(dev->d_buffer); - dev->d_buffer = NULL; + circbuf_uninit(&dev->d_buffer); /* And reset all counts and indices */ - dev->d_wrndx = 0; - dev->d_rdndx = 0; dev->d_nwriters = 0; dev->d_nreaders = 0; @@ -429,11 +410,7 @@ ssize_t pipecommon_read(FAR struct file *filep, FAR char *buffer, size_t len) { FAR struct inode *inode = filep->f_inode; FAR struct pipe_dev_s *dev = inode->i_private; -#ifdef CONFIG_DEV_PIPEDUMP - FAR uint8_t *start = (FAR uint8_t *)buffer; -#endif ssize_t nread = 0; - int sval; int ret; DEBUGASSERT(dev); @@ -457,7 +434,7 @@ ssize_t pipecommon_read(FAR struct file *filep, FAR char *buffer, size_t len) /* If the pipe is empty, then wait for something to be written to it */ - while (dev->d_wrndx == dev->d_rdndx) + while (circbuf_is_empty(&dev->d_buffer)) { /* If there are no writers on the pipe, then return end of file */ @@ -494,23 +471,13 @@ ssize_t pipecommon_read(FAR struct file *filep, FAR char *buffer, size_t len) * byte). */ - nread = 0; - while ((size_t)nread < len && dev->d_wrndx != dev->d_rdndx) - { - *buffer++ = dev->d_buffer[dev->d_rdndx]; - if (++dev->d_rdndx >= dev->d_bufsize) - { - dev->d_rdndx = 0; - } - - nread++; - } + nread = circbuf_read(&dev->d_buffer, buffer, len); /* Notify all poll/select waiters that they can write to the * FIFO when buffer can accept more than d_polloutthrd bytes. */ - if (pipecommon_bufferused(dev) < (dev->d_bufsize - 1 - dev->d_polloutthrd)) + if (circbuf_used(&dev->d_buffer) <= (dev->d_bufsize - dev->d_polloutthrd)) { poll_notify(dev->d_fds, CONFIG_DEV_PIPE_NPOLLWAITERS, POLLOUT); } @@ -519,13 +486,10 @@ ssize_t pipecommon_read(FAR struct file *filep, FAR char *buffer, size_t len) * buffer. */ - while (nxsem_get_value(&dev->d_wrsem, &sval) == 0 && sval <= 0) - { - nxsem_post(&dev->d_wrsem); - } + pipecommon_wakeup(&dev->d_wrsem); nxmutex_unlock(&dev->d_bflock); - pipe_dumpbuffer("From PIPE:", start, nread); + pipe_dumpbuffer("From PIPE:", buffer, nread); return nread; } @@ -540,8 +504,6 @@ ssize_t pipecommon_write(FAR struct file *filep, FAR const char *buffer, FAR struct pipe_dev_s *dev = inode->i_private; ssize_t nwritten = 0; ssize_t last; - int nxtwrndx; - int sval; int ret; DEBUGASSERT(dev); @@ -598,33 +560,22 @@ ssize_t pipecommon_write(FAR struct file *filep, FAR const char *buffer, return nwritten == 0 ? -EPIPE : nwritten; } - /* Calculate the write index AFTER the next byte is written */ - - nxtwrndx = dev->d_wrndx + 1; - if (nxtwrndx >= dev->d_bufsize) - { - nxtwrndx = 0; - } - /* Would the next write overflow the circular buffer? */ - if (nxtwrndx != dev->d_rdndx) + if (!circbuf_is_full(&dev->d_buffer)) { - /* No... copy the byte */ + /* Loop until all of the bytes have been written */ - dev->d_buffer[dev->d_wrndx] = *buffer++; - dev->d_wrndx = nxtwrndx; + nwritten += circbuf_write(&dev->d_buffer, + buffer + nwritten, len - nwritten); - /* Is the write complete? */ - - nwritten++; - if ((size_t)nwritten >= len) + if ((size_t)nwritten == len) { /* Notify all poll/select waiters that they can read from the * FIFO when buffer used exceeds poll threshold. */ - if (pipecommon_bufferused(dev) > dev->d_pollinthrd) + if (circbuf_used(&dev->d_buffer) > dev->d_pollinthrd) { poll_notify(dev->d_fds, CONFIG_DEV_PIPE_NPOLLWAITERS, POLLIN); @@ -634,10 +585,7 @@ ssize_t pipecommon_write(FAR struct file *filep, FAR const char *buffer, * available. */ - while (nxsem_get_value(&dev->d_rdsem, &sval) == 0 && sval <= 0) - { - nxsem_post(&dev->d_rdsem); - } + pipecommon_wakeup(&dev->d_rdsem); /* Return the number of bytes written */ @@ -663,10 +611,7 @@ ssize_t pipecommon_write(FAR struct file *filep, FAR const char *buffer, * available. */ - while (nxsem_get_value(&dev->d_rdsem, &sval) == 0 && sval <= 0) - { - nxsem_post(&dev->d_rdsem); - } + pipecommon_wakeup(&dev->d_rdsem); } last = nwritten; @@ -759,7 +704,7 @@ int pipecommon_poll(FAR struct file *filep, FAR struct pollfd *fds, * First, determine how many bytes are in the buffer */ - nbytes = pipecommon_bufferused(dev); + nbytes = circbuf_used(&dev->d_buffer); /* Notify the POLLOUT event if the pipe buffer can accept * more than d_polloutthrd bytes, but only if @@ -768,7 +713,7 @@ int pipecommon_poll(FAR struct file *filep, FAR struct pollfd *fds, eventset = 0; if ((filep->f_oflags & O_WROK) && - nbytes < (dev->d_bufsize - 1 - dev->d_polloutthrd)) + nbytes <= (dev->d_bufsize - dev->d_polloutthrd)) { eventset |= POLLOUT; } @@ -896,26 +841,7 @@ int pipecommon_ioctl(FAR struct file *filep, int cmd, unsigned long arg) case FIONWRITE: /* Number of bytes waiting in send queue */ case FIONREAD: /* Number of bytes available for reading */ { - int count; - - /* Determine the number of bytes written to the buffer. This is, - * of course, also the number of bytes that may be read from the - * buffer. - * - * d_rdndx - index to remove next byte from the buffer - * d_wrndx - Index to next location to add a byte to the buffer. - */ - - if (dev->d_wrndx < dev->d_rdndx) - { - count = (dev->d_bufsize - dev->d_rdndx) + dev->d_wrndx; - } - else - { - count = dev->d_wrndx - dev->d_rdndx; - } - - *(FAR int *)((uintptr_t)arg) = count; + *(FAR int *)((uintptr_t)arg) = circbuf_used(&dev->d_buffer); ret = 0; } break; @@ -924,24 +850,7 @@ int pipecommon_ioctl(FAR struct file *filep, int cmd, unsigned long arg) case FIONSPACE: { - int count; - - /* Determine the number of bytes free in the buffer. - * - * d_rdndx - index to remove next byte from the buffer - * d_wrndx - Index to next location to add a byte to the buffer. - */ - - if (dev->d_wrndx < dev->d_rdndx) - { - count = (dev->d_rdndx - dev->d_wrndx) - 1; - } - else - { - count = ((dev->d_bufsize - dev->d_wrndx) + dev->d_rdndx) - 1; - } - - *(FAR int *)((uintptr_t)arg) = count; + *(FAR int *)((uintptr_t)arg) = circbuf_space(&dev->d_buffer); ret = 0; } break; @@ -981,10 +890,7 @@ int pipecommon_unlink(FAR struct inode *inode) { /* No.. free the buffer (if there is one) */ - if (dev->d_buffer) - { - kmm_free(dev->d_buffer); - } + circbuf_uninit(&dev->d_buffer); /* And free the device structure. */ diff --git a/drivers/pipes/pipe_common.h b/drivers/pipes/pipe_common.h index 1e587a2c960..72f1af89f5c 100644 --- a/drivers/pipes/pipe_common.h +++ b/drivers/pipes/pipe_common.h @@ -27,6 +27,7 @@ #include #include +#include #include #include @@ -114,20 +115,18 @@ typedef uint8_t pipe_ndx_t; /* 8-bit index */ struct pipe_dev_s { - mutex_t d_bflock; /* Used to serialize access to d_buffer and indices */ - sem_t d_rdsem; /* Empty buffer - Reader waits for data write AND - * block O_RDONLY open until there is at least one writer */ - sem_t d_wrsem; /* Full buffer - Writer waits for data read AND - * block O_WRONLY open until there is at least one reader */ - pipe_ndx_t d_wrndx; /* Index in d_buffer to save next byte written */ - pipe_ndx_t d_rdndx; /* Index in d_buffer to return the next byte read */ - pipe_ndx_t d_bufsize; /* allocated size of d_buffer in bytes */ - pipe_ndx_t d_pollinthrd; /* Buffer threshold for POLLIN to occur */ - pipe_ndx_t d_polloutthrd; /* Buffer threshold for POLLOUT to occur */ - uint8_t d_nwriters; /* Number of reference counts for write access */ - uint8_t d_nreaders; /* Number of reference counts for read access */ - uint8_t d_flags; /* See PIPE_FLAG_* definitions */ - uint8_t *d_buffer; /* Buffer allocated when device opened */ + mutex_t d_bflock; /* Used to serialize access to d_buffer and indices */ + sem_t d_rdsem; /* Empty buffer - Reader waits for data write AND + * block O_RDONLY open until there is at least one writer */ + sem_t d_wrsem; /* Full buffer - Writer waits for data read AND + * block O_WRONLY open until there is at least one reader */ + pipe_ndx_t d_bufsize; /* allocated size of d_buffer in bytes */ + pipe_ndx_t d_pollinthrd; /* Buffer threshold for POLLIN to occur */ + pipe_ndx_t d_polloutthrd; /* Buffer threshold for POLLOUT to occur */ + uint8_t d_nwriters; /* Number of reference counts for write access */ + uint8_t d_nreaders; /* Number of reference counts for read access */ + uint8_t d_flags; /* See PIPE_FLAG_* definitions */ + struct circbuf_s d_buffer; /* Buffer allocated when device opened */ /* The following is a list if poll structures of threads waiting for * driver events. The 'struct pollfd' reference for each open is also