[PATCH 3/6] epoll: fix epoll's own poll
Stefan Bader
stefan.bader at canonical.com
Thu Oct 27 12:56:09 UTC 2011
On 26.10.2011 17:26, Paolo Pisati wrote:
> From: Davide Libenzi <davidel at xmailserver.org>
>
> Fix a bug inside epoll's f_op->poll() code, which returns POLLIN even
> though there are no actual ready monitored fds. The bug shows up if you
> add an epoll fd inside another fd container (poll, select, epoll).
>
> The problem is that the callback-based wake ups used by epoll do not carry
> (patches will follow, to fix this) any information about the events that
> actually happened. So the callback code, since it can't call the file*
> ->poll() inside the callback, chains the file* into a ready-list.
>
> So, suppose you added an fd with EPOLLOUT only, and some data shows up on
> the fd: the file* mapped by the fd will be added into the ready-list (via
> wakeup callback). During normal epoll_wait() use, this condition is
> sorted out at the time we're actually able to call the file*'s
> f_op->poll().
>
> Inside the old epoll's f_op->poll() though, only a quick check
> !list_empty(ready-list) was performed, and this could have led to
> reporting POLLIN even though no ready fds would show up at a following
> epoll_wait(). In order to correctly report the ready status for an epoll
> fd, the ready-list must be checked to see if any really available fd+event
> would be ready in a following epoll_wait().
>
> This operation (calling f_op->poll() from inside f_op->poll()) must, like
> the wake ups, be handled with care, because epoll fds can be added to
> other epoll fds.
>
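> In userspace terms, the failure mode can be compressed to a few lines (a
> minimal sketch, not part of the original test program; the full test code
> below exercises the same paths more thoroughly):
>
>	int epfd = epoll_create(1);
>	struct epoll_event evt = { .events = EPOLLOUT };
>	struct pollfd pfd = { .events = POLLIN };
>	int pfds[2];
>
>	pipe(pfds);
>	/* The read side of a pipe never becomes writable... */
>	epoll_ctl(epfd, EPOLL_CTL_ADD, pfds[0], &evt);
>	/* ...but the wakeup from this write still queues it on the ready-list. */
>	write(pfds[1], "w", 1);
>
>	pfd.fd = epfd;
>	/* Before this fix, poll() could report POLLIN here even though a
>	 * following epoll_wait(epfd, ...) would find nothing ready. */
>	poll(&pfd, 1, 0);
>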
> Test code:
>
> /*
> * epoll_test by Davide Libenzi (Simple code to test epoll internals)
> * Copyright (C) 2008 Davide Libenzi
> *
> * This program is free software; you can redistribute it and/or modify
> * it under the terms of the GNU General Public License as published by
> * the Free Software Foundation; either version 2 of the License, or
> * (at your option) any later version.
> *
> * This program is distributed in the hope that it will be useful,
> * but WITHOUT ANY WARRANTY; without even the implied warranty of
> * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> * GNU General Public License for more details.
> *
> * You should have received a copy of the GNU General Public License
> * along with this program; if not, write to the Free Software
> * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
> *
> * Davide Libenzi <davidel at xmailserver.org>
> *
> */
>
> #define _GNU_SOURCE	/* for POLLRDHUP */
> #include <sys/types.h>
> #include <unistd.h>
> #include <stdlib.h>
> #include <stdio.h>
> #include <string.h>
> #include <poll.h>
> #include <sys/epoll.h>
> #include <sys/wait.h>
>
> /*
>  * These constants are used below but were dropped from the quoted
>  * test code; the values match the upstream commit.
>  */
> #define EPOLL_MAX_CHAIN	100L
> #define EPOLL_TF_LOOP	(1 << 0)
>
> struct epoll_test_cfg {
> long size;
> long flags;
> };
>
> static int xepoll_create(int n) {
> int epfd;
>
> if ((epfd = epoll_create(n)) == -1) {
> perror("epoll_create");
> exit(2);
> }
>
> return epfd;
> }
>
> static void xepoll_ctl(int epfd, int cmd, int fd, struct epoll_event *evt) {
> if (epoll_ctl(epfd, cmd, fd, evt) < 0) {
> perror("epoll_ctl");
> exit(3);
> }
> }
>
> static void xpipe(int *fds) {
> if (pipe(fds)) {
> perror("pipe");
> exit(4);
> }
> }
>
> static pid_t xfork(void) {
> pid_t pid;
>
> if ((pid = fork()) == (pid_t) -1) {
> perror("pipe");
> exit(5);
> }
>
> return pid;
> }
>
> static int run_forked_proc(int (*proc)(void *), void *data) {
> int status;
> pid_t pid;
>
> if ((pid = xfork()) == 0)
> exit((*proc)(data));
> if (waitpid(pid, &status, 0) != pid) {
> perror("waitpid");
> return -1;
> }
>
> return WIFEXITED(status) ? WEXITSTATUS(status): -2;
> }
>
> static int check_events(int fd, int timeo) {
> struct pollfd pfd;
>
> fprintf(stdout, "Checking events for fd %d\n", fd);
> memset(&pfd, 0, sizeof(pfd));
> pfd.fd = fd;
> pfd.events = POLLIN | POLLOUT;
> if (poll(&pfd, 1, timeo) < 0) {
> perror("poll()");
> return 0;
> }
> if (pfd.revents & POLLIN)
> fprintf(stdout, "\tPOLLIN\n");
> if (pfd.revents & POLLOUT)
> fprintf(stdout, "\tPOLLOUT\n");
> if (pfd.revents & POLLERR)
> fprintf(stdout, "\tPOLLERR\n");
> if (pfd.revents & POLLHUP)
> fprintf(stdout, "\tPOLLHUP\n");
> if (pfd.revents & POLLRDHUP)
> fprintf(stdout, "\tPOLLRDHUP\n");
>
> return pfd.revents;
> }
>
> static int epoll_test_tty(void *data) {
> int epfd, ifd = fileno(stdin), res;
> struct epoll_event evt;
>
> if (check_events(ifd, 0) != POLLOUT) {
> fprintf(stderr, "Something is cooking on STDIN (%d)\n", ifd);
> return 1;
> }
> epfd = xepoll_create(1);
> fprintf(stdout, "Created epoll fd (%d)\n", epfd);
> memset(&evt, 0, sizeof(evt));
> evt.events = EPOLLIN;
> xepoll_ctl(epfd, EPOLL_CTL_ADD, ifd, &evt);
> if (check_events(epfd, 0) & POLLIN) {
> res = epoll_wait(epfd, &evt, 1, 0);
> if (res == 0) {
> fprintf(stderr, "Epoll fd (%d) is ready when it shouldn't!\n",
> epfd);
> return 2;
> }
> }
>
> return 0;
> }
>
> static int epoll_wakeup_chain(void *data) {
> struct epoll_test_cfg *tcfg = data;
> int i, res, epfd, bfd, nfd, pfds[2];
> pid_t pid;
> struct epoll_event evt;
>
> memset(&evt, 0, sizeof(evt));
> evt.events = EPOLLIN;
>
> epfd = bfd = xepoll_create(1);
>
> for (i = 0; i < tcfg->size; i++) {
> nfd = xepoll_create(1);
> xepoll_ctl(bfd, EPOLL_CTL_ADD, nfd, &evt);
> bfd = nfd;
> }
> xpipe(pfds);
> if (tcfg->flags & EPOLL_TF_LOOP)
> {
> xepoll_ctl(bfd, EPOLL_CTL_ADD, epfd, &evt);
> /*
> * If we're testing for a loop, we want the wakeup triggered
> * by the write to the pipe done in the child process to
> * trigger a fake event. So we add the pipe read side with
> * EPOLLOUT events. This will trigger an addition to the
> * ready-list, but no real events will be there. The epoll
> * kernel code will then proceed to call f_op->poll() of the
> * epfd, triggering the loop we want to test.
> */
> evt.events = EPOLLOUT;
> }
> xepoll_ctl(bfd, EPOLL_CTL_ADD, pfds[0], &evt);
>
> /*
> * The pipe write must come after the poll(2) call inside
> * check_events(). This tests the nested wakeup code in
> * fs/eventpoll.c:ep_poll_safewake().
> * By having the check_events() (hence poll(2)) happen first,
> * we have the poll wait queue filled up, and the write(2) in the
> * child will trigger the wakeup chain.
> */
> if ((pid = xfork()) == 0) {
> sleep(1);
> write(pfds[1], "w", 1);
> exit(0);
> }
>
> res = check_events(epfd, 2000) & POLLIN;
>
> if (waitpid(pid, NULL, 0) != pid) {
> perror("waitpid");
> return -1;
> }
>
> return res;
> }
>
> static int epoll_poll_chain(void *data) {
> struct epoll_test_cfg *tcfg = data;
> int i, res, epfd, bfd, nfd, pfds[2];
> pid_t pid;
> struct epoll_event evt;
>
> memset(&evt, 0, sizeof(evt));
> evt.events = EPOLLIN;
>
> epfd = bfd = xepoll_create(1);
>
> for (i = 0; i < tcfg->size; i++) {
> nfd = xepoll_create(1);
> xepoll_ctl(bfd, EPOLL_CTL_ADD, nfd, &evt);
> bfd = nfd;
> }
> xpipe(pfds);
> if (tcfg->flags & EPOLL_TF_LOOP)
> {
> xepoll_ctl(bfd, EPOLL_CTL_ADD, epfd, &evt);
> /*
> * If we're testing for a loop, we want the wakeup triggered
> * by the write to the pipe done in the child process to
> * trigger a fake event. So we add the pipe read side with
> * EPOLLOUT events. This will trigger an addition to the
> * ready-list, but no real events will be there. The epoll
> * kernel code will then proceed to call f_op->poll() of the
> * epfd, triggering the loop we want to test.
> */
> evt.events = EPOLLOUT;
> }
> xepoll_ctl(bfd, EPOLL_CTL_ADD, pfds[0], &evt);
>
> /*
> * The pipe write must come before the poll(2) call inside
> * check_events(). This tests the nested f_op->poll calls code in
> * fs/eventpoll.c:ep_eventpoll_poll().
> * By having the pipe write(2) happen first, we make the kernel
> * epoll code load the ready lists, and the following poll(2)
> * done inside check_events() will test nested poll code in
> * ep_eventpoll_poll().
> */
> if ((pid = xfork()) == 0) {
> write(pfds[1], "w", 1);
> exit(0);
> }
> sleep(1);
> res = check_events(epfd, 1000) & POLLIN;
>
> if (waitpid(pid, NULL, 0) != pid) {
> perror("waitpid");
> return -1;
> }
>
> return res;
> }
>
> int main(int ac, char **av) {
> int error;
> struct epoll_test_cfg tcfg;
>
> fprintf(stdout, "\n********** Testing TTY events\n");
> error = run_forked_proc(epoll_test_tty, NULL);
> fprintf(stdout, error == 0 ?
> "********** OK\n": "********** FAIL (%d)\n", error);
>
> tcfg.size = 3;
> tcfg.flags = 0;
> fprintf(stdout, "\n********** Testing short wakeup chain\n");
> error = run_forked_proc(epoll_wakeup_chain, &tcfg);
> fprintf(stdout, error == POLLIN ?
> "********** OK\n": "********** FAIL (%d)\n", error);
>
> tcfg.size = EPOLL_MAX_CHAIN;
> tcfg.flags = 0;
> fprintf(stdout, "\n********** Testing long wakeup chain (HOLD ON)\n");
> error = run_forked_proc(epoll_wakeup_chain, &tcfg);
> fprintf(stdout, error == 0 ?
> "********** OK\n": "********** FAIL (%d)\n", error);
>
> tcfg.size = 3;
> tcfg.flags = 0;
> fprintf(stdout, "\n********** Testing short poll chain\n");
> error = run_forked_proc(epoll_poll_chain, &tcfg);
> fprintf(stdout, error == POLLIN ?
> "********** OK\n": "********** FAIL (%d)\n", error);
>
> tcfg.size = EPOLL_MAX_CHAIN;
> tcfg.flags = 0;
> fprintf(stdout, "\n********** Testing long poll chain (HOLD ON)\n");
> error = run_forked_proc(epoll_poll_chain, &tcfg);
> fprintf(stdout, error == 0 ?
> "********** OK\n": "********** FAIL (%d)\n", error);
>
> tcfg.size = 3;
> tcfg.flags = EPOLL_TF_LOOP;
> fprintf(stdout, "\n********** Testing loopy wakeup chain (HOLD ON)\n");
> error = run_forked_proc(epoll_wakeup_chain, &tcfg);
> fprintf(stdout, error == 0 ?
> "********** OK\n": "********** FAIL (%d)\n", error);
>
> tcfg.size = 3;
> tcfg.flags = EPOLL_TF_LOOP;
> fprintf(stdout, "\n********** Testing loopy poll chain (HOLD ON)\n");
> error = run_forked_proc(epoll_poll_chain, &tcfg);
> fprintf(stdout, error == 0 ?
> "********** OK\n": "********** FAIL (%d)\n", error);
>
> return 0;
> }
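>
> A usage note (not part of the original commit message): with the includes
> restored above, the test builds as a standalone program, e.g.
> "gcc -o epoll_test epoll_test.c". Run it from a quiet terminal, since
> epoll_test_tty polls stdin; on a patched kernel every section should
> print "********** OK".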
>
> Signed-off-by: Davide Libenzi <davidel at xmailserver.org>
> Cc: Pavel Pisa <pisa at cmp.felk.cvut.cz>
> Signed-off-by: Andrew Morton <akpm at linux-foundation.org>
> Signed-off-by: Linus Torvalds <torvalds at linux-foundation.org>
>
> commit 5071f97ec6d74f006072de0ce89b67c8792fe5a1 upstream
>
> Signed-off-by: Paolo Pisati <paolo.pisati at canonical.com>
> ---
> fs/eventpoll.c | 461 ++++++++++++++++++++++++++++++++++----------------------
> 1 files changed, 279 insertions(+), 182 deletions(-)
>
> diff --git a/fs/eventpoll.c b/fs/eventpoll.c
> index d86c0c9..fe950e6 100644
> --- a/fs/eventpoll.c
> +++ b/fs/eventpoll.c
> @@ -1,6 +1,6 @@
> /*
> - * fs/eventpoll.c (Efficent event polling implementation)
> - * Copyright (C) 2001,...,2007 Davide Libenzi
> + * fs/eventpoll.c (Efficient event retrieval implementation)
> + * Copyright (C) 2001,...,2009 Davide Libenzi
> *
> * This program is free software; you can redistribute it and/or modify
> * it under the terms of the GNU General Public License as published by
> @@ -92,8 +92,8 @@
> /* Epoll private bits inside the event mask */
> #define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET)
>
> -/* Maximum number of poll wake up nests we are allowing */
> -#define EP_MAX_POLLWAKE_NESTS 4
> +/* Maximum number of nesting allowed inside epoll sets */
> +#define EP_MAX_NESTS 4
>
> /* Maximum msec timeout value storeable in a long int */
> #define EP_MAX_MSTIMEO min(1000ULL * MAX_SCHEDULE_TIMEOUT / HZ, (LONG_MAX - 999ULL) / HZ)
> @@ -108,24 +108,21 @@ struct epoll_filefd {
> };
>
> /*
> - * Node that is linked into the "wake_task_list" member of the "struct poll_safewake".
> - * It is used to keep track on all tasks that are currently inside the wake_up() code
> - * to 1) short-circuit the one coming from the same task and same wait queue head
> - * (loop) 2) allow a maximum number of epoll descriptors inclusion nesting
> - * 3) let go the ones coming from other tasks.
> + * Structure used to track possible nested calls, for too deep recursions
> + * and loop cycles.
> */
> -struct wake_task_node {
> +struct nested_call_node {
> struct list_head llink;
> struct task_struct *task;
> - wait_queue_head_t *wq;
> + void *cookie;
> };
>
> /*
> - * This is used to implement the safe poll wake up avoiding to reenter
> - * the poll callback from inside wake_up().
> + * This structure is used as collector for nested calls, to check for
> + * maximum recursion depth and loop cycles.
> */
> -struct poll_safewake {
> - struct list_head wake_task_list;
> +struct nested_calls {
> + struct list_head tasks_call_list;
> spinlock_t lock;
> };
>
> @@ -226,13 +223,22 @@ struct ep_pqueue {
> struct epitem *epi;
> };
>
> +/* Used by the ep_send_events() function as callback private data */
> +struct ep_send_events_data {
> + int maxevents;
> + struct epoll_event __user *events;
> +};
> +
> /*
> * This mutex is used to serialize ep_free() and eventpoll_release_file().
> */
> static struct mutex epmutex;
>
> -/* Safe wake up implementation */
> -static struct poll_safewake psw;
> +/* Used for safe wake up implementation */
> +static struct nested_calls poll_safewake_ncalls;
> +
> +/* Used to call file's f_op->poll() under the nested calls boundaries */
> +static struct nested_calls poll_readywalk_ncalls;
>
> /* Slab cache used to allocate "struct epitem" */
> static struct kmem_cache *epi_cache __read_mostly;
> @@ -301,64 +307,96 @@ static inline int ep_op_has_event(int op)
> }
>
> /* Initialize the poll safe wake up structure */
> -static void ep_poll_safewake_init(struct poll_safewake *psw)
> +static void ep_nested_calls_init(struct nested_calls *ncalls)
> {
> -
> - INIT_LIST_HEAD(&psw->wake_task_list);
> - spin_lock_init(&psw->lock);
> + INIT_LIST_HEAD(&ncalls->tasks_call_list);
> + spin_lock_init(&ncalls->lock);
> }
>
> -/*
> - * Perform a safe wake up of the poll wait list. The problem is that
> - * with the new callback'd wake up system, it is possible that the
> - * poll callback is reentered from inside the call to wake_up() done
> - * on the poll wait queue head. The rule is that we cannot reenter the
> - * wake up code from the same task more than EP_MAX_POLLWAKE_NESTS times,
> - * and we cannot reenter the same wait queue head at all. This will
> - * enable to have a hierarchy of epoll file descriptor of no more than
> - * EP_MAX_POLLWAKE_NESTS deep. We need the irq version of the spin lock
> - * because this one gets called by the poll callback, that in turn is called
> - * from inside a wake_up(), that might be called from irq context.
> +/**
> + * ep_call_nested - Perform a bound (possibly) nested call, by checking
> + * that the recursion limit is not exceeded, and that
> + * the same nested call (by the meaning of same cookie) is
> + * not re-entered.
> + *
> + * @ncalls: Pointer to the nested_calls structure to be used for this call.
> + * @max_nests: Maximum number of allowed nesting calls.
> + * @nproc: Nested call core function pointer.
> + * @priv: Opaque data to be passed to the @nproc callback.
> + * @cookie: Cookie to be used to identify this nested call.
> + *
> + * Returns: Returns the code returned by the @nproc callback, or -1 if
> + * the maximum recursion limit has been exceeded.
> */
> -static void ep_poll_safewake(struct poll_safewake *psw, wait_queue_head_t *wq)
> +static int ep_call_nested(struct nested_calls *ncalls, int max_nests,
> + int (*nproc)(void *, void *, int), void *priv,
> + void *cookie)
> {
> - int wake_nests = 0;
> + int error, call_nests = 0;
> unsigned long flags;
> struct task_struct *this_task = current;
> - struct list_head *lsthead = &psw->wake_task_list;
> - struct wake_task_node *tncur;
> - struct wake_task_node tnode;
> + struct list_head *lsthead = &ncalls->tasks_call_list;
> + struct nested_call_node *tncur;
> + struct nested_call_node tnode;
>
> - spin_lock_irqsave(&psw->lock, flags);
> + spin_lock_irqsave(&ncalls->lock, flags);
>
> - /* Try to see if the current task is already inside this wakeup call */
> + /*
> + * Try to see if the current task is already inside this wakeup call.
> + * We use a list here, since the population inside this set is always
> + * very much limited.
> + */
> list_for_each_entry(tncur, lsthead, llink) {
> -
> - if (tncur->wq == wq ||
> - (tncur->task == this_task && ++wake_nests > EP_MAX_POLLWAKE_NESTS)) {
> + if (tncur->task == this_task &&
> + (tncur->cookie == cookie || ++call_nests > max_nests)) {
> /*
> * Ops ... loop detected or maximum nest level reached.
> * We abort this wake by breaking the cycle itself.
> */
> - spin_unlock_irqrestore(&psw->lock, flags);
> - return;
> + spin_unlock_irqrestore(&ncalls->lock, flags);
> +
> + return -1;
> }
> }
>
> - /* Add the current task to the list */
> + /* Add the current task and cookie to the list */
> tnode.task = this_task;
> - tnode.wq = wq;
> + tnode.cookie = cookie;
> list_add(&tnode.llink, lsthead);
>
> - spin_unlock_irqrestore(&psw->lock, flags);
> + spin_unlock_irqrestore(&ncalls->lock, flags);
>
> - /* Do really wake up now */
> - wake_up_nested(wq, 1 + wake_nests);
> + /* Call the nested function */
> + error = (*nproc)(priv, cookie, call_nests);
>
> /* Remove the current task from the list */
> - spin_lock_irqsave(&psw->lock, flags);
> + spin_lock_irqsave(&ncalls->lock, flags);
> list_del(&tnode.llink);
> - spin_unlock_irqrestore(&psw->lock, flags);
> + spin_unlock_irqrestore(&ncalls->lock, flags);
> +
> + return error;
> +}
> +
> +static int ep_poll_wakeup_proc(void *priv, void *cookie, int call_nests)
> +{
> + wake_up_nested((wait_queue_head_t *) cookie, 1 + call_nests);
> + return 0;
> +}
> +
> +/*
> + * Perform a safe wake up of the poll wait list. The problem is that
> + * with the new callback'd wake up system, it is possible that the
> + * poll callback is reentered from inside the call to wake_up() done
> + * on the poll wait queue head. The rule is that we cannot reenter the
> + * wake up code from the same task more than EP_MAX_NESTS times,
> + * and we cannot reenter the same wait queue head at all. This will
> + * enable to have a hierarchy of epoll file descriptor of no more than
> + * EP_MAX_NESTS deep.
> + */
> +static void ep_poll_safewake(wait_queue_head_t *wq)
> +{
> + ep_call_nested(&poll_safewake_ncalls, EP_MAX_NESTS,
> + ep_poll_wakeup_proc, NULL, wq);
> }
>
> /*
> @@ -386,6 +424,104 @@ static void ep_unregister_pollwait(struct eventpoll *ep, struct epitem *epi)
> }
> }
>
> +/**
> + * ep_scan_ready_list - Scans the ready list in a way that makes possible for
> + * the scan code, to call f_op->poll(). Also allows for
> + * O(NumReady) performance.
> + *
> + * @ep: Pointer to the epoll private data structure.
> + * @sproc: Pointer to the scan callback.
> + * @priv: Private opaque data passed to the @sproc callback.
> + *
> + * Returns: The same integer error code returned by the @sproc callback.
> + */
> +static int ep_scan_ready_list(struct eventpoll *ep,
> + int (*sproc)(struct eventpoll *,
> + struct list_head *, void *),
> + void *priv)
> +{
> + int error, pwake = 0;
> + unsigned long flags;
> + struct epitem *epi, *nepi;
> + struct list_head txlist;
> +
> + INIT_LIST_HEAD(&txlist);
> +
> + /*
> + * We need to lock this because we could be hit by
> + * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL).
> + */
> + mutex_lock(&ep->mtx);
> +
> + /*
> + * Steal the ready list, and re-init the original one to the
> + * empty list. Also, set ep->ovflist to NULL so that events
> + * happening while looping w/out locks, are not lost. We cannot
> + * have the poll callback to queue directly on ep->rdllist,
> + * because we want the "sproc" callback to be able to do it
> + * in a lockless way.
> + */
> + spin_lock_irqsave(&ep->lock, flags);
> + list_splice(&ep->rdllist, &txlist);
> + INIT_LIST_HEAD(&ep->rdllist);
> + ep->ovflist = NULL;
> + spin_unlock_irqrestore(&ep->lock, flags);
> +
> + /*
> + * Now call the callback function.
> + */
> + error = (*sproc)(ep, &txlist, priv);
> +
> + spin_lock_irqsave(&ep->lock, flags);
> + /*
> + * During the time we spent inside the "sproc" callback, some
> + * other events might have been queued by the poll callback.
> + * We re-insert them inside the main ready-list here.
> + */
> + for (nepi = ep->ovflist; (epi = nepi) != NULL;
> + nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
> + /*
> + * We need to check if the item is already in the list.
> + * During the "sproc" callback execution time, items are
> + * queued into ->ovflist but the "txlist" might already
> + * contain them, and the list_splice() below takes care of them.
> + */
> + if (!ep_is_linked(&epi->rdllink))
> + list_add_tail(&epi->rdllink, &ep->rdllist);
> + }
> + /*
> + * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
> + * releasing the lock, events will be queued in the normal way inside
> + * ep->rdllist.
> + */
> + ep->ovflist = EP_UNACTIVE_PTR;
> +
> + /*
> + * Quickly re-inject items left on "txlist".
> + */
> + list_splice(&txlist, &ep->rdllist);
> +
> + if (!list_empty(&ep->rdllist)) {
> + /*
> + * Wake up (if active) both the eventpoll wait list and the ->poll()
> + * wait list (delayed after we release the lock).
> + */
> + if (waitqueue_active(&ep->wq))
> + wake_up_locked(&ep->wq);
> + if (waitqueue_active(&ep->poll_wait))
> + pwake++;
> + }
> + spin_unlock_irqrestore(&ep->lock, flags);
> +
> + mutex_unlock(&ep->mtx);
> +
> + /* We have to call this outside the lock */
> + if (pwake)
> + ep_poll_safewake(&ep->poll_wait);
> +
> + return error;
> +}
> +
> /*
> * Removes a "struct epitem" from the eventpoll RB tree and deallocates
> * all the associated resources. Must be called with "mtx" held.
> @@ -435,7 +571,7 @@ static void ep_free(struct eventpoll *ep)
>
> /* We need to release all tasks waiting for these file */
> if (waitqueue_active(&ep->poll_wait))
> - ep_poll_safewake(&psw, &ep->poll_wait);
> + ep_poll_safewake(&ep->poll_wait);
>
> /*
> * We need to lock this because we could be hit by
> @@ -483,22 +619,49 @@ static int ep_eventpoll_release(struct inode *inode, struct file *file)
> return 0;
> }
>
> +static int ep_read_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
> +{
> + struct epitem *epi, *tmp;
> +
> + list_for_each_entry_safe(epi, tmp, head, rdllink) {
> + if (epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
> + epi->event.events)
> + return POLLIN | POLLRDNORM;
> + else
> + /*
> + * Item has been dropped into the ready list by the poll
> + * callback, but it's not actually ready, as far as
> + * caller requested events goes. We can remove it here.
> + */
> + list_del_init(&epi->rdllink);
> + }
> +
> + return 0;
> +}
> +
> +static int ep_poll_readyevents_proc(void *priv, void *cookie, int call_nests)
> +{
> + return ep_scan_ready_list(priv, ep_read_events_proc, NULL);
> +}
> +
> static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)
> {
> - unsigned int pollflags = 0;
> - unsigned long flags;
> + int pollflags;
> struct eventpoll *ep = file->private_data;
>
> /* Insert inside our poll wait queue */
> poll_wait(file, &ep->poll_wait, wait);
>
> - /* Check our condition */
> - spin_lock_irqsave(&ep->lock, flags);
> - if (!list_empty(&ep->rdllist))
> - pollflags = POLLIN | POLLRDNORM;
> - spin_unlock_irqrestore(&ep->lock, flags);
> + /*
> + * Proceed to find out if wanted events are really available inside
> + * the ready list. This needs to be done under ep_call_nested()
> + * supervision, since the call to f_op->poll() done on listed files
> + * could re-enter here.
> + */
> + pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,
> + ep_poll_readyevents_proc, ep, ep);
>
> - return pollflags;
> + return pollflags != -1 ? pollflags: 0;
> }
>
> /* File callbacks that implement the eventpoll file behaviour */
> @@ -528,7 +691,7 @@ void eventpoll_release_file(struct file *file)
> * We don't want to get "file->f_ep_lock" because it is not
> * necessary. It is not necessary because we're in the "struct file"
> * cleanup path, and this means that noone is using this file anymore.
> - * So, for example, epoll_ctl() cannot hit here sicne if we reach this
> + * So, for example, epoll_ctl() cannot hit here since if we reach this
> * point, the file counter already went to zero and fget() would fail.
> * The only hit might come from ep_free() but by holding the mutex
> * will correctly serialize the operation. We do need to acquire
> @@ -645,12 +808,9 @@ static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *k
> }
>
> /* If this file is already in the ready list we exit soon */
> - if (ep_is_linked(&epi->rdllink))
> - goto is_linked;
> -
> - list_add_tail(&epi->rdllink, &ep->rdllist);
> + if (!ep_is_linked(&epi->rdllink))
> + list_add_tail(&epi->rdllink, &ep->rdllist);
>
> -is_linked:
> /*
> * Wake up ( if active ) both the eventpoll wait list and the ->poll()
> * wait list.
> @@ -666,7 +826,7 @@ out_unlock:
>
> /* We have to call this outside the lock */
> if (pwake)
> - ep_poll_safewake(&psw, &ep->poll_wait);
> + ep_poll_safewake(&ep->poll_wait);
>
> return 1;
> }
> @@ -688,10 +848,9 @@ static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
> add_wait_queue(whead, &pwq->wait);
> list_add_tail(&pwq->llink, &epi->pwqlist);
> epi->nwait++;
> - } else {
> + } else
> /* We have to signal that an error occurred */
> epi->nwait = -1;
> - }
> }
>
> static void ep_rbtree_insert(struct eventpoll *ep, struct epitem *epi)
> @@ -789,7 +948,7 @@ static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
>
> /* We have to call this outside the lock */
> if (pwake)
> - ep_poll_safewake(&psw, &ep->poll_wait);
> + ep_poll_safewake(&ep->poll_wait);
>
> DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_insert(%p, %p, %d)\n",
> current, ep, tfile, fd));
> @@ -864,138 +1023,74 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_even
>
> /* We have to call this outside the lock */
> if (pwake)
> - ep_poll_safewake(&psw, &ep->poll_wait);
> + ep_poll_safewake(&ep->poll_wait);
>
> return 0;
> }
>
> -static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events,
> - int maxevents)
> +static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
> {
> - int eventcnt, error = -EFAULT, pwake = 0;
> + struct ep_send_events_data *esed = priv;
> + int eventcnt;
> unsigned int revents;
> - unsigned long flags;
> - struct epitem *epi, *nepi;
> - struct list_head txlist;
> -
> - INIT_LIST_HEAD(&txlist);
> -
> - /*
> - * We need to lock this because we could be hit by
> - * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL).
> - */
> - mutex_lock(&ep->mtx);
> -
> - /*
> - * Steal the ready list, and re-init the original one to the
> - * empty list. Also, set ep->ovflist to NULL so that events
> - * happening while looping w/out locks, are not lost. We cannot
> - * have the poll callback to queue directly on ep->rdllist,
> - * because we are doing it in the loop below, in a lockless way.
> - */
> - spin_lock_irqsave(&ep->lock, flags);
> - list_splice(&ep->rdllist, &txlist);
> - INIT_LIST_HEAD(&ep->rdllist);
> - ep->ovflist = NULL;
> - spin_unlock_irqrestore(&ep->lock, flags);
> + struct epitem *epi;
> + struct epoll_event __user *uevent;
>
> /*
> - * We can loop without lock because this is a task private list.
> - * We just splice'd out the ep->rdllist in ep_collect_ready_items().
> - * Items cannot vanish during the loop because we are holding "mtx".
> + * We can loop without lock because we are passed a task private list.
> + * Items cannot vanish during the loop because ep_scan_ready_list() is
> + * holding "mtx" during this call.
> */
> - for (eventcnt = 0; !list_empty(&txlist) && eventcnt < maxevents;) {
> - epi = list_first_entry(&txlist, struct epitem, rdllink);
> + for (eventcnt = 0, uevent = esed->events;
> + !list_empty(head) && eventcnt < esed->maxevents;) {
> + epi = list_first_entry(head, struct epitem, rdllink);
>
> list_del_init(&epi->rdllink);
>
> - /*
> - * Get the ready file event set. We can safely use the file
> - * because we are holding the "mtx" and this will guarantee
> - * that both the file and the item will not vanish.
> - */
> - revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
> - revents &= epi->event.events;
> + revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
> + epi->event.events;
>
> /*
> - * Is the event mask intersect the caller-requested one,
> - * deliver the event to userspace. Again, we are holding
> - * "mtx", so no operations coming from userspace can change
> - * the item.
> + * If the event mask intersect the caller-requested one,
> + * deliver the event to userspace. Again, ep_scan_ready_list()
> + * is holding "mtx", so no operations coming from userspace
> + * can change the item.
> */
> if (revents) {
> - if (__put_user(revents,
> - &events[eventcnt].events) ||
> - __put_user(epi->event.data,
> - &events[eventcnt].data))
> - goto errxit;
> + if (__put_user(revents, &uevent->events) ||
> + __put_user(epi->event.data, &uevent->data))
> + return eventcnt ? eventcnt: -EFAULT;
> + eventcnt++;
> + uevent++;
> if (epi->event.events & EPOLLONESHOT)
> epi->event.events &= EP_PRIVATE_BITS;
> - eventcnt++;
> + else if (!(epi->event.events & EPOLLET))
> + /*
> + * If this file has been added with Level Trigger
> + * mode, we need to insert back inside the ready
> + * list, so that the next call to epoll_wait()
> + * will check again the events availability.
> + * At this point, noone can insert into ep->rdllist
> + * besides us. The epoll_ctl() callers are locked
> + * out by ep_scan_ready_list() holding "mtx" and
> + * the poll callback will queue them in ep->ovflist.
> + */
> + list_add_tail(&epi->rdllink, &ep->rdllist);
> }
> - /*
> - * At this point, noone can insert into ep->rdllist besides
> - * us. The epoll_ctl() callers are locked out by us holding
> - * "mtx" and the poll callback will queue them in ep->ovflist.
> - */
> - if (!(epi->event.events & EPOLLET) &&
> - (revents & epi->event.events))
> - list_add_tail(&epi->rdllink, &ep->rdllist);
> }
> - error = 0;
> -
> -errxit:
> -
> - spin_lock_irqsave(&ep->lock, flags);
> - /*
> - * During the time we spent in the loop above, some other events
> - * might have been queued by the poll callback. We re-insert them
> - * inside the main ready-list here.
> - */
> - for (nepi = ep->ovflist; (epi = nepi) != NULL;
> - nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
> - /*
> - * If the above loop quit with errors, the epoll item might still
> - * be linked to "txlist", and the list_splice() done below will
> - * take care of those cases.
> - */
> - if (!ep_is_linked(&epi->rdllink))
> - list_add_tail(&epi->rdllink, &ep->rdllist);
> - }
> - /*
> - * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
> - * releasing the lock, events will be queued in the normal way inside
> - * ep->rdllist.
> - */
> - ep->ovflist = EP_UNACTIVE_PTR;
>
> - /*
> - * In case of error in the event-send loop, or in case the number of
> - * ready events exceeds the userspace limit, we need to splice the
> - * "txlist" back inside ep->rdllist.
> - */
> - list_splice(&txlist, &ep->rdllist);
> -
> - if (!list_empty(&ep->rdllist)) {
> - /*
> - * Wake up (if active) both the eventpoll wait list and the ->poll()
> - * wait list (delayed after we release the lock).
> - */
> - if (waitqueue_active(&ep->wq))
> - __wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE |
> - TASK_INTERRUPTIBLE);
> - if (waitqueue_active(&ep->poll_wait))
> - pwake++;
> - }
> - spin_unlock_irqrestore(&ep->lock, flags);
> + return eventcnt;
> +}
>
> - mutex_unlock(&ep->mtx);
> +static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events,
> + int maxevents)
> +{
> + struct ep_send_events_data esed;
>
> - /* We have to call this outside the lock */
> - if (pwake)
> - ep_poll_safewake(&psw, &ep->poll_wait);
> + esed.maxevents = maxevents;
> + esed.events = events;
>
> - return eventcnt == 0 ? error: eventcnt;
> + return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
> }
>
> static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
> @@ -1007,7 +1102,7 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
> wait_queue_t wait;
>
> /*
> - * Calculate the timeout by checking for the "infinite" value ( -1 )
> + * Calculate the timeout by checking for the "infinite" value (-1)
> * and the overflow condition. The passed timeout is in milliseconds,
> * that why (t * HZ) / 1000.
> */
> @@ -1050,9 +1145,8 @@ retry:
>
> set_current_state(TASK_RUNNING);
> }
> -
> /* Is it worth to try to dig for events ? */
> - eavail = !list_empty(&ep->rdllist);
> + eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
>
> spin_unlock_irqrestore(&ep->lock, flags);
>
> @@ -1322,7 +1416,10 @@ static int __init eventpoll_init(void)
> mutex_init(&epmutex);
>
> /* Initialize the structure used to perform safe poll wait head wake ups */
> - ep_poll_safewake_init(&psw);
> + ep_nested_calls_init(&poll_safewake_ncalls);
> +
> + /* Initialize the structure used to perform file's f_op->poll() calls */
> + ep_nested_calls_init(&poll_readywalk_ncalls);
>
> /* Allocates slab cache used to allocate "struct epitem" items */
> epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
Somehow I am not sure whether there isn't a hunk missing for epoll_create...
Need to look a bit closer...
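
For reference while reviewing: the loop/depth guard the patch introduces with
ep_call_nested() can be sketched in plain C roughly like below. This is a
simplified, single-task illustration (call_nested/MAX_NESTS are stand-ins for
ep_call_nested/EP_MAX_NESTS); the real code additionally matches entries on
the current task and takes ncalls->lock with irqs disabled, since the wakeup
callback can run from irq context.

#define MAX_NESTS 4

struct nested_call_node {
	struct nested_call_node *next;
	void *cookie;			/* identifies the object being entered */
};

static struct nested_call_node *call_list;

/* Returns nproc's result, or -1 on a loop or too-deep recursion. */
static int call_nested(int (*nproc)(void *, void *, int), void *priv,
		       void *cookie)
{
	int error, nests = 0;
	struct nested_call_node tnode, *cur;

	for (cur = call_list; cur; cur = cur->next)
		if (cur->cookie == cookie || ++nests > MAX_NESTS)
			return -1;	/* same cookie => cycle, otherwise too deep */

	tnode.cookie = cookie;		/* push this frame for the call's duration */
	tnode.next = call_list;
	call_list = &tnode;

	error = nproc(priv, cookie, nests);

	call_list = tnode.next;		/* pop on the way out */
	return error;
}

This is also why ep_eventpoll_poll() returns 0 instead of recursing forever
when epoll fds form a cycle: by the time the nested f_op->poll() comes back
around, the epfd cookie is already on the list and ep_call_nested() bails
out with -1.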