[PATCH 3/6] epoll: fix epoll's own poll

Stefan Bader stefan.bader at canonical.com
Thu Oct 27 13:08:36 UTC 2011


On 27.10.2011 14:56, Stefan Bader wrote:
> On 26.10.2011 17:26, Paolo Pisati wrote:
>> From: Davide Libenzi <davidel at xmailserver.org>
>>
>> Fix a bug inside the epoll's f_op->poll() code, that returns POLLIN even
>> though there are no actual ready monitored fds.  The bug shows up if you
>> add an epoll fd inside another fd container (poll, select, epoll).
>>
>> The problem is that callback-based wake ups used by epoll do not carry
>> (patches will follow, to fix this) any information about the events that
>> actually happened.  So the callback code, since it can't call the file*
>> ->poll() inside the callback, chains the file* into a ready-list.
>>
>> So, suppose you added an fd with EPOLLOUT only, and some data shows up on
>> the fd, the file* mapped by the fd will be added into the ready-list (via
>> wakeup callback).  During normal epoll_wait() use, this condition is
>> sorted out at the time we're actually able to call the file*'s
>> f_op->poll().
>>
>> Inside the old epoll's f_op->poll() though, only a quick check
>> !list_empty(ready-list) was performed, and this could have led to
>> reporting POLLIN even though no ready fds would show up at a following
>> epoll_wait().  In order to correctly report the ready status for an epoll
>> fd, the ready-list must be checked to see if any really available fd+event
>> would be ready in a following epoll_wait().
>>
>> This is an operation (calling f_op->poll() from inside f_op->poll()) that,
>> like wake ups, must be handled with care because epoll fds can be added to
>> other epoll fds.
>>
>> Test code:
>>
>> /*
>>  *  epoll_test by Davide Libenzi (Simple code to test epoll internals)
>>  *  Copyright (C) 2008  Davide Libenzi
>>  *
>>  *  This program is free software; you can redistribute it and/or modify
>>  *  it under the terms of the GNU General Public License as published by
>>  *  the Free Software Foundation; either version 2 of the License, or
>>  *  (at your option) any later version.
>>  *
>>  *  This program is distributed in the hope that it will be useful,
>>  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
>>  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
>>  *  GNU General Public License for more details.
>>  *
>>  *  You should have received a copy of the GNU General Public License
>>  *  along with this program; if not, write to the Free Software
>>  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
>>  *
>>  *  Davide Libenzi <davidel at xmailserver.org>
>>  *
>>  */
>>
>> struct epoll_test_cfg {
>> 	long size;
>> 	long flags;
>> };
>>
>> static int xepoll_create(int n) {
>> 	int epfd;
>>
>> 	if ((epfd = epoll_create(n)) == -1) {
>> 		perror("epoll_create");
>> 		exit(2);
>> 	}
>>
>> 	return epfd;
>> }
>>
>> static void xepoll_ctl(int epfd, int cmd, int fd, struct epoll_event *evt) {
>> 	if (epoll_ctl(epfd, cmd, fd, evt) < 0) {
>> 		perror("epoll_ctl");
>> 		exit(3);
>> 	}
>> }
>>
>> static void xpipe(int *fds) {
>> 	if (pipe(fds)) {
>> 		perror("pipe");
>> 		exit(4);
>> 	}
>> }
>>
>> static pid_t xfork(void) {
>> 	pid_t pid;
>>
>> 	if ((pid = fork()) == (pid_t) -1) {
>> 		perror("pipe");
>> 		exit(5);
>> 	}
>>
>> 	return pid;
>> }
>>
>> static int run_forked_proc(int (*proc)(void *), void *data) {
>> 	int status;
>> 	pid_t pid;
>>
>> 	if ((pid = xfork()) == 0)
>> 		exit((*proc)(data));
>> 	if (waitpid(pid, &status, 0) != pid) {
>> 		perror("waitpid");
>> 		return -1;
>> 	}
>>
>> 	return WIFEXITED(status) ? WEXITSTATUS(status): -2;
>> }
>>
>> static int check_events(int fd, int timeo) {
>> 	struct pollfd pfd;
>>
>> 	fprintf(stdout, "Checking events for fd %d\n", fd);
>> 	memset(&pfd, 0, sizeof(pfd));
>> 	pfd.fd = fd;
>> 	pfd.events = POLLIN | POLLOUT;
>> 	if (poll(&pfd, 1, timeo) < 0) {
>> 		perror("poll()");
>> 		return 0;
>> 	}
>> 	if (pfd.revents & POLLIN)
>> 		fprintf(stdout, "\tPOLLIN\n");
>> 	if (pfd.revents & POLLOUT)
>> 		fprintf(stdout, "\tPOLLOUT\n");
>> 	if (pfd.revents & POLLERR)
>> 		fprintf(stdout, "\tPOLLERR\n");
>> 	if (pfd.revents & POLLHUP)
>> 		fprintf(stdout, "\tPOLLHUP\n");
>> 	if (pfd.revents & POLLRDHUP)
>> 		fprintf(stdout, "\tPOLLRDHUP\n");
>>
>> 	return pfd.revents;
>> }
>>
>> static int epoll_test_tty(void *data) {
>> 	int epfd, ifd = fileno(stdin), res;
>> 	struct epoll_event evt;
>>
>> 	if (check_events(ifd, 0) != POLLOUT) {
>> 		fprintf(stderr, "Something is cooking on STDIN (%d)\n", ifd);
>> 		return 1;
>> 	}
>> 	epfd = xepoll_create(1);
>> 	fprintf(stdout, "Created epoll fd (%d)\n", epfd);
>> 	memset(&evt, 0, sizeof(evt));
>> 	evt.events = EPOLLIN;
>> 	xepoll_ctl(epfd, EPOLL_CTL_ADD, ifd, &evt);
>> 	if (check_events(epfd, 0) & POLLIN) {
>> 		res = epoll_wait(epfd, &evt, 1, 0);
>> 		if (res == 0) {
>> 			fprintf(stderr, "Epoll fd (%d) is ready when it shouldn't!\n",
>> 				epfd);
>> 			return 2;
>> 		}
>> 	}
>>
>> 	return 0;
>> }
>>
>> static int epoll_wakeup_chain(void *data) {
>> 	struct epoll_test_cfg *tcfg = data;
>> 	int i, res, epfd, bfd, nfd, pfds[2];
>> 	pid_t pid;
>> 	struct epoll_event evt;
>>
>> 	memset(&evt, 0, sizeof(evt));
>> 	evt.events = EPOLLIN;
>>
>> 	epfd = bfd = xepoll_create(1);
>>
>> 	for (i = 0; i < tcfg->size; i++) {
>> 		nfd = xepoll_create(1);
>> 		xepoll_ctl(bfd, EPOLL_CTL_ADD, nfd, &evt);
>> 		bfd = nfd;
>> 	}
>> 	xpipe(pfds);
>> 	if (tcfg->flags & EPOLL_TF_LOOP)
>> 	{
>> 		xepoll_ctl(bfd, EPOLL_CTL_ADD, epfd, &evt);
>> 		/*
>> 		 * If we're testing for loop, we want that the wakeup
>> 		 * triggered by the write to the pipe done in the child
>> 		 * process, triggers a fake event. So we add the pipe
>> 		 * read side with EPOLLOUT events. This will trigger
>> 		 * an addition to the ready-list, but no real events
>> 		 * will be there. Then the epoll kernel code will proceed
>> 		 * in calling f_op->poll() of the epfd, triggering the
>> 		 * loop we want to test.
>> 		 */
>> 		evt.events = EPOLLOUT;
>> 	}
>> 	xepoll_ctl(bfd, EPOLL_CTL_ADD, pfds[0], &evt);
>>
>> 	/*
>> 	 * The pipe write must come after the poll(2) call inside
>> 	 * check_events(). This tests the nested wakeup code in
>> 	 * fs/eventpoll.c:ep_poll_safewake()
>> 	 * By having the check_events() (hence poll(2)) happen first,
>> 	 * we have poll wait queue filled up, and the write(2) in the
>> 	 * child will trigger the wakeup chain.
>> 	 */
>> 	if ((pid = xfork()) == 0) {
>> 		sleep(1);
>> 		write(pfds[1], "w", 1);
>> 		exit(0);
>> 	}
>>
>> 	res = check_events(epfd, 2000) & POLLIN;
>>
>> 	if (waitpid(pid, NULL, 0) != pid) {
>> 		perror("waitpid");
>> 		return -1;
>> 	}
>>
>> 	return res;
>> }
>>
>> static int epoll_poll_chain(void *data) {
>> 	struct epoll_test_cfg *tcfg = data;
>> 	int i, res, epfd, bfd, nfd, pfds[2];
>> 	pid_t pid;
>> 	struct epoll_event evt;
>>
>> 	memset(&evt, 0, sizeof(evt));
>> 	evt.events = EPOLLIN;
>>
>> 	epfd = bfd = xepoll_create(1);
>>
>> 	for (i = 0; i < tcfg->size; i++) {
>> 		nfd = xepoll_create(1);
>> 		xepoll_ctl(bfd, EPOLL_CTL_ADD, nfd, &evt);
>> 		bfd = nfd;
>> 	}
>> 	xpipe(pfds);
>> 	if (tcfg->flags & EPOLL_TF_LOOP)
>> 	{
>> 		xepoll_ctl(bfd, EPOLL_CTL_ADD, epfd, &evt);
>> 		/*
>> 		 * If we're testing for loop, we want that the wakeup
>> 		 * triggered by the write to the pipe done in the child
>> 		 * process, triggers a fake event. So we add the pipe
>> 		 * read side with EPOLLOUT events. This will trigger
>> 		 * an addition to the ready-list, but no real events
>> 		 * will be there. Then the epoll kernel code will proceed
>> 		 * in calling f_op->poll() of the epfd, triggering the
>> 		 * loop we want to test.
>> 		 */
>> 		evt.events = EPOLLOUT;
>> 	}
>> 	xepoll_ctl(bfd, EPOLL_CTL_ADD, pfds[0], &evt);
>>
>> 	/*
>> 	 * The pipe write must come before the poll(2) call inside
>> 	 * check_events(). This tests the nested f_op->poll calls code in
>> 	 * fs/eventpoll.c:ep_eventpoll_poll()
>> 	 * By having the pipe write(2) happen first, we make the kernel
>> 	 * epoll code to load the ready lists, and the following poll(2)
>> 	 * done inside check_events() will test nested poll code in
>> 	 * ep_eventpoll_poll().
>> 	 */
>> 	if ((pid = xfork()) == 0) {
>> 		write(pfds[1], "w", 1);
>> 		exit(0);
>> 	}
>> 	sleep(1);
>> 	res = check_events(epfd, 1000) & POLLIN;
>>
>> 	if (waitpid(pid, NULL, 0) != pid) {
>> 		perror("waitpid");
>> 		return -1;
>> 	}
>>
>> 	return res;
>> }
>>
>> int main(int ac, char **av) {
>> 	int error;
>> 	struct epoll_test_cfg tcfg;
>>
>> 	fprintf(stdout, "\n********** Testing TTY events\n");
>> 	error = run_forked_proc(epoll_test_tty, NULL);
>> 	fprintf(stdout, error == 0 ?
>> 		"********** OK\n": "********** FAIL (%d)\n", error);
>>
>> 	tcfg.size = 3;
>> 	tcfg.flags = 0;
>> 	fprintf(stdout, "\n********** Testing short wakeup chain\n");
>> 	error = run_forked_proc(epoll_wakeup_chain, &tcfg);
>> 	fprintf(stdout, error == POLLIN ?
>> 		"********** OK\n": "********** FAIL (%d)\n", error);
>>
>> 	tcfg.size = EPOLL_MAX_CHAIN;
>> 	tcfg.flags = 0;
>> 	fprintf(stdout, "\n********** Testing long wakeup chain (HOLD ON)\n");
>> 	error = run_forked_proc(epoll_wakeup_chain, &tcfg);
>> 	fprintf(stdout, error == 0 ?
>> 		"********** OK\n": "********** FAIL (%d)\n", error);
>>
>> 	tcfg.size = 3;
>> 	tcfg.flags = 0;
>> 	fprintf(stdout, "\n********** Testing short poll chain\n");
>> 	error = run_forked_proc(epoll_poll_chain, &tcfg);
>> 	fprintf(stdout, error == POLLIN ?
>> 		"********** OK\n": "********** FAIL (%d)\n", error);
>>
>> 	tcfg.size = EPOLL_MAX_CHAIN;
>> 	tcfg.flags = 0;
>> 	fprintf(stdout, "\n********** Testing long poll chain (HOLD ON)\n");
>> 	error = run_forked_proc(epoll_poll_chain, &tcfg);
>> 	fprintf(stdout, error == 0 ?
>> 		"********** OK\n": "********** FAIL (%d)\n", error);
>>
>> 	tcfg.size = 3;
>> 	tcfg.flags = EPOLL_TF_LOOP;
>> 	fprintf(stdout, "\n********** Testing loopy wakeup chain (HOLD ON)\n");
>> 	error = run_forked_proc(epoll_wakeup_chain, &tcfg);
>> 	fprintf(stdout, error == 0 ?
>> 		"********** OK\n": "********** FAIL (%d)\n", error);
>>
>> 	tcfg.size = 3;
>> 	tcfg.flags = EPOLL_TF_LOOP;
>> 	fprintf(stdout, "\n********** Testing loopy poll chain (HOLD ON)\n");
>> 	error = run_forked_proc(epoll_poll_chain, &tcfg);
>> 	fprintf(stdout, error == 0 ?
>> 		"********** OK\n": "********** FAIL (%d)\n", error);
>>
>> 	return 0;
>> }
>>
>> Signed-off-by: Davide Libenzi <davidel at xmailserver.org>
>> Cc: Pavel Pisa <pisa at cmp.felk.cvut.cz>
>> Signed-off-by: Andrew Morton <akpm at linux-foundation.org>
>> Signed-off-by: Linus Torvalds <torvalds at linux-foundation.org>
>>
>> commit 5071f97ec6d74f006072de0ce89b67c8792fe5a1 upstream
>>
>> Signed-off-by: Paolo Pisati <paolo.pisati at canonical.com>
>> ---
>>  fs/eventpoll.c |  461 ++++++++++++++++++++++++++++++++++----------------------
>>  1 files changed, 279 insertions(+), 182 deletions(-)
>>
>> diff --git a/fs/eventpoll.c b/fs/eventpoll.c
>> index d86c0c9..fe950e6 100644
>> --- a/fs/eventpoll.c
>> +++ b/fs/eventpoll.c
>> @@ -1,6 +1,6 @@
>>  /*
>> - *  fs/eventpoll.c (Efficent event polling implementation)
>> - *  Copyright (C) 2001,...,2007	 Davide Libenzi
>> + *  fs/eventpoll.c (Efficient event retrieval implementation)
>> + *  Copyright (C) 2001,...,2009	 Davide Libenzi
>>   *
>>   *  This program is free software; you can redistribute it and/or modify
>>   *  it under the terms of the GNU General Public License as published by
>> @@ -92,8 +92,8 @@
>>  /* Epoll private bits inside the event mask */
>>  #define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET)
>>  
>> -/* Maximum number of poll wake up nests we are allowing */
>> -#define EP_MAX_POLLWAKE_NESTS 4
>> +/* Maximum number of nesting allowed inside epoll sets */
>> +#define EP_MAX_NESTS 4
>>  
>>  /* Maximum msec timeout value storeable in a long int */
>>  #define EP_MAX_MSTIMEO min(1000ULL * MAX_SCHEDULE_TIMEOUT / HZ, (LONG_MAX - 999ULL) / HZ)
>> @@ -108,24 +108,21 @@ struct epoll_filefd {
>>  };
>>  
>>  /*
>> - * Node that is linked into the "wake_task_list" member of the "struct poll_safewake".
>> - * It is used to keep track on all tasks that are currently inside the wake_up() code
>> - * to 1) short-circuit the one coming from the same task and same wait queue head
>> - * (loop) 2) allow a maximum number of epoll descriptors inclusion nesting
>> - * 3) let go the ones coming from other tasks.
>> + * Structure used to track possible nested calls, for too deep recursions
>> + * and loop cycles.
>>   */
>> -struct wake_task_node {
>> +struct nested_call_node {
>>  	struct list_head llink;
>>  	struct task_struct *task;
>> -	wait_queue_head_t *wq;
>> +	void *cookie;
>>  };
>>  
>>  /*
>> - * This is used to implement the safe poll wake up avoiding to reenter
>> - * the poll callback from inside wake_up().
>> + * This structure is used as collector for nested calls, to check for
>> + * maximum recursion dept and loop cycles.
>>   */
>> -struct poll_safewake {
>> -	struct list_head wake_task_list;
>> +struct nested_calls {
>> +	struct list_head tasks_call_list;
>>  	spinlock_t lock;
>>  };
>>  
>> @@ -226,13 +223,22 @@ struct ep_pqueue {
>>  	struct epitem *epi;
>>  };
>>  
>> +/* Used by the ep_send_events() function as callback private data */
>> +struct ep_send_events_data {
>> +	int maxevents;
>> +	struct epoll_event __user *events;
>> +};
>> +
>>  /*
>>   * This mutex is used to serialize ep_free() and eventpoll_release_file().
>>   */
>>  static struct mutex epmutex;
>>  
>> -/* Safe wake up implementation */
>> -static struct poll_safewake psw;
>> +/* Used for safe wake up implementation */
>> +static struct nested_calls poll_safewake_ncalls;
>> +
>> +/* Used to call file's f_op->poll() under the nested calls boundaries */
>> +static struct nested_calls poll_readywalk_ncalls;
>>  
>>  /* Slab cache used to allocate "struct epitem" */
>>  static struct kmem_cache *epi_cache __read_mostly;
>> @@ -301,64 +307,96 @@ static inline int ep_op_has_event(int op)
>>  }
>>  
>>  /* Initialize the poll safe wake up structure */
>> -static void ep_poll_safewake_init(struct poll_safewake *psw)
>> +static void ep_nested_calls_init(struct nested_calls *ncalls)
>>  {
>> -
>> -	INIT_LIST_HEAD(&psw->wake_task_list);
>> -	spin_lock_init(&psw->lock);
>> +	INIT_LIST_HEAD(&ncalls->tasks_call_list);
>> +	spin_lock_init(&ncalls->lock);
>>  }
>>  
>> -/*
>> - * Perform a safe wake up of the poll wait list. The problem is that
>> - * with the new callback'd wake up system, it is possible that the
>> - * poll callback is reentered from inside the call to wake_up() done
>> - * on the poll wait queue head. The rule is that we cannot reenter the
>> - * wake up code from the same task more than EP_MAX_POLLWAKE_NESTS times,
>> - * and we cannot reenter the same wait queue head at all. This will
>> - * enable to have a hierarchy of epoll file descriptor of no more than
>> - * EP_MAX_POLLWAKE_NESTS deep. We need the irq version of the spin lock
>> - * because this one gets called by the poll callback, that in turn is called
>> - * from inside a wake_up(), that might be called from irq context.
>> +/**
>> + * ep_call_nested - Perform a bound (possibly) nested call, by checking
>> + *                  that the recursion limit is not exceeded, and that
>> + *                  the same nested call (by the meaning of same cookie) is
>> + *                  no re-entered.
>> + *
>> + * @ncalls: Pointer to the nested_calls structure to be used for this call.
>> + * @max_nests: Maximum number of allowed nesting calls.
>> + * @nproc: Nested call core function pointer.
>> + * @priv: Opaque data to be passed to the @nproc callback.
>> + * @cookie: Cookie to be used to identify this nested call.
>> + *
>> + * Returns: Returns the code returned by the @nproc callback, or -1 if
>> + *          the maximum recursion limit has been exceeded.
>>   */
>> -static void ep_poll_safewake(struct poll_safewake *psw, wait_queue_head_t *wq)
>> +static int ep_call_nested(struct nested_calls *ncalls, int max_nests,
>> +			  int (*nproc)(void *, void *, int), void *priv,
>> +			  void *cookie)
>>  {
>> -	int wake_nests = 0;
>> +	int error, call_nests = 0;
>>  	unsigned long flags;
>>  	struct task_struct *this_task = current;
>> -	struct list_head *lsthead = &psw->wake_task_list;
>> -	struct wake_task_node *tncur;
>> -	struct wake_task_node tnode;
>> +	struct list_head *lsthead = &ncalls->tasks_call_list;
>> +	struct nested_call_node *tncur;
>> +	struct nested_call_node tnode;
>>  
>> -	spin_lock_irqsave(&psw->lock, flags);
>> +	spin_lock_irqsave(&ncalls->lock, flags);
>>  
>> -	/* Try to see if the current task is already inside this wakeup call */
>> +	/*
>> +	 * Try to see if the current task is already inside this wakeup call.
>> +	 * We use a list here, since the population inside this set is always
>> +	 * very much limited.
>> +	 */
>>  	list_for_each_entry(tncur, lsthead, llink) {
>> -
>> -		if (tncur->wq == wq ||
>> -		    (tncur->task == this_task && ++wake_nests > EP_MAX_POLLWAKE_NESTS)) {
>> +		if (tncur->task == this_task &&
>> +		    (tncur->cookie == cookie || ++call_nests > max_nests)) {
>>  			/*
>>  			 * Ops ... loop detected or maximum nest level reached.
>>  			 * We abort this wake by breaking the cycle itself.
>>  			 */
>> -			spin_unlock_irqrestore(&psw->lock, flags);
>> -			return;
>> +			spin_unlock_irqrestore(&ncalls->lock, flags);
>> +
>> +			return -1;
>>  		}
>>  	}
>>  
>> -	/* Add the current task to the list */
>> +	/* Add the current task and cookie to the list */
>>  	tnode.task = this_task;
>> -	tnode.wq = wq;
>> +	tnode.cookie = cookie;
>>  	list_add(&tnode.llink, lsthead);
>>  
>> -	spin_unlock_irqrestore(&psw->lock, flags);
>> +	spin_unlock_irqrestore(&ncalls->lock, flags);
>>  
>> -	/* Do really wake up now */
>> -	wake_up_nested(wq, 1 + wake_nests);
>> +	/* Call the nested function */
>> +	error = (*nproc)(priv, cookie, call_nests);
>>  
>>  	/* Remove the current task from the list */
>> -	spin_lock_irqsave(&psw->lock, flags);
>> +	spin_lock_irqsave(&ncalls->lock, flags);
>>  	list_del(&tnode.llink);
>> -	spin_unlock_irqrestore(&psw->lock, flags);
>> +	spin_unlock_irqrestore(&ncalls->lock, flags);
>> +
>> +	return error;
>> +}
>> +
>> +static int ep_poll_wakeup_proc(void *priv, void *cookie, int call_nests)
>> +{
>> +	wake_up_nested((wait_queue_head_t *) cookie, 1 + call_nests);
>> +	return 0;
>> +}
>> +
>> +/*
>> + * Perform a safe wake up of the poll wait list. The problem is that
>> + * with the new callback'd wake up system, it is possible that the
>> + * poll callback is reentered from inside the call to wake_up() done
>> + * on the poll wait queue head. The rule is that we cannot reenter the
>> + * wake up code from the same task more than EP_MAX_NESTS times,
>> + * and we cannot reenter the same wait queue head at all. This will
>> + * enable to have a hierarchy of epoll file descriptor of no more than
>> + * EP_MAX_NESTS deep.
>> + */
>> +static void ep_poll_safewake(wait_queue_head_t *wq)
>> +{
>> +	ep_call_nested(&poll_safewake_ncalls, EP_MAX_NESTS,
>> +		       ep_poll_wakeup_proc, NULL, wq);
>>  }
>>  
>>  /*
>> @@ -386,6 +424,104 @@ static void ep_unregister_pollwait(struct eventpoll *ep, struct epitem *epi)
>>  	}
>>  }
>>  
>> +/**
>> + * ep_scan_ready_list - Scans the ready list in a way that makes possible for
>> + *                      the scan code, to call f_op->poll(). Also allows for
>> + *                      O(NumReady) performance.
>> + *
>> + * @ep: Pointer to the epoll private data structure.
>> + * @sproc: Pointer to the scan callback.
>> + * @priv: Private opaque data passed to the @sproc callback.
>> + *
>> + * Returns: The same integer error code returned by the @sproc callback.
>> + */
>> +static int ep_scan_ready_list(struct eventpoll *ep,
>> +			      int (*sproc)(struct eventpoll *,
>> +					   struct list_head *, void *),
>> +			      void *priv)
>> +{
>> +	int error, pwake = 0;
>> +	unsigned long flags;
>> +	struct epitem *epi, *nepi;
>> +	struct list_head txlist;
>> +
>> +	INIT_LIST_HEAD(&txlist);
>> +
>> +	/*
>> +	 * We need to lock this because we could be hit by
>> +	 * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL).
>> +	 */
>> +	mutex_lock(&ep->mtx);
>> +
>> +	/*
>> +	 * Steal the ready list, and re-init the original one to the
>> +	 * empty list. Also, set ep->ovflist to NULL so that events
>> +	 * happening while looping w/out locks, are not lost. We cannot
>> +	 * have the poll callback to queue directly on ep->rdllist,
>> +	 * because we want the "sproc" callback to be able to do it
>> +	 * in a lockless way.
>> +	 */
>> +	spin_lock_irqsave(&ep->lock, flags);
>> +	list_splice(&ep->rdllist, &txlist);
>> +	INIT_LIST_HEAD(&ep->rdllist);
>> +	ep->ovflist = NULL;
>> +	spin_unlock_irqrestore(&ep->lock, flags);
>> +
>> +	/*
>> +	 * Now call the callback function.
>> +	 */
>> +	error = (*sproc)(ep, &txlist, priv);
>> +
>> +	spin_lock_irqsave(&ep->lock, flags);
>> +	/*
>> +	 * During the time we spent inside the "sproc" callback, some
>> +	 * other events might have been queued by the poll callback.
>> +	 * We re-insert them inside the main ready-list here.
>> +	 */
>> +	for (nepi = ep->ovflist; (epi = nepi) != NULL;
>> +	     nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
>> +		/*
>> +		 * We need to check if the item is already in the list.
>> +		 * During the "sproc" callback execution time, items are
>> +		 * queued into ->ovflist but the "txlist" might already
>> +		 * contain them, and the list_splice() below takes care of them.
>> +		 */
>> +		if (!ep_is_linked(&epi->rdllink))
>> +			list_add_tail(&epi->rdllink, &ep->rdllist);
>> +	}
>> +	/*
>> +	 * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
>> +	 * releasing the lock, events will be queued in the normal way inside
>> +	 * ep->rdllist.
>> +	 */
>> +	ep->ovflist = EP_UNACTIVE_PTR;
>> +
>> +	/*
>> +	 * Quickly re-inject items left on "txlist".
>> +	 */
>> +	list_splice(&txlist, &ep->rdllist);
>> +
>> +	if (!list_empty(&ep->rdllist)) {
>> +		/*
>> +		 * Wake up (if active) both the eventpoll wait list and the ->poll()
>> +		 * wait list (delayed after we release the lock).
>> +		 */
>> +		if (waitqueue_active(&ep->wq))
>> +			wake_up_locked(&ep->wq);
>> +		if (waitqueue_active(&ep->poll_wait))
>> +			pwake++;
>> +	}
>> +	spin_unlock_irqrestore(&ep->lock, flags);
>> +
>> +	mutex_unlock(&ep->mtx);
>> +
>> +	/* We have to call this outside the lock */
>> +	if (pwake)
>> +		ep_poll_safewake(&ep->poll_wait);
>> +
>> +	return error;
>> +}
>> +
>>  /*
>>   * Removes a "struct epitem" from the eventpoll RB tree and deallocates
>>   * all the associated resources. Must be called with "mtx" held.
>> @@ -435,7 +571,7 @@ static void ep_free(struct eventpoll *ep)
>>  
>>  	/* We need to release all tasks waiting for these file */
>>  	if (waitqueue_active(&ep->poll_wait))
>> -		ep_poll_safewake(&psw, &ep->poll_wait);
>> +		ep_poll_safewake(&ep->poll_wait);
>>  
>>  	/*
>>  	 * We need to lock this because we could be hit by
>> @@ -483,22 +619,49 @@ static int ep_eventpoll_release(struct inode *inode, struct file *file)
>>  	return 0;
>>  }
>>  
>> +static int ep_read_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
>> +{
>> +	struct epitem *epi, *tmp;
>> +
>> +	list_for_each_entry_safe(epi, tmp, head, rdllink) {
>> +		if (epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
>> +		    epi->event.events)
>> +			return POLLIN | POLLRDNORM;
>> +		else
>> +			/*
>> +			 * Item has been dropped into the ready list by the poll
>> +			 * callback, but it's not actually ready, as far as
>> +			 * caller requested events goes. We can remove it here.
>> +			 */
>> +			list_del_init(&epi->rdllink);
>> +	}
>> +
>> +	return 0;
>> +}
>> +
>> +static int ep_poll_readyevents_proc(void *priv, void *cookie, int call_nests)
>> +{
>> +	return ep_scan_ready_list(priv, ep_read_events_proc, NULL);
>> +}
>> +
>>  static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)
>>  {
>> -	unsigned int pollflags = 0;
>> -	unsigned long flags;
>> +	int pollflags;
>>  	struct eventpoll *ep = file->private_data;
>>  
>>  	/* Insert inside our poll wait queue */
>>  	poll_wait(file, &ep->poll_wait, wait);
>>  
>> -	/* Check our condition */
>> -	spin_lock_irqsave(&ep->lock, flags);
>> -	if (!list_empty(&ep->rdllist))
>> -		pollflags = POLLIN | POLLRDNORM;
>> -	spin_unlock_irqrestore(&ep->lock, flags);
>> +	/*
>> +	 * Proceed to find out if wanted events are really available inside
>> +	 * the ready list. This need to be done under ep_call_nested()
>> +	 * supervision, since the call to f_op->poll() done on listed files
>> +	 * could re-enter here.
>> +	 */
>> +	pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,
>> +				   ep_poll_readyevents_proc, ep, ep);
>>  
>> -	return pollflags;
>> +	return pollflags != -1 ? pollflags: 0;
>>  }
>>  
>>  /* File callbacks that implement the eventpoll file behaviour */
>> @@ -528,7 +691,7 @@ void eventpoll_release_file(struct file *file)
>>  	 * We don't want to get "file->f_ep_lock" because it is not
>>  	 * necessary. It is not necessary because we're in the "struct file"
>>  	 * cleanup path, and this means that noone is using this file anymore.
>> -	 * So, for example, epoll_ctl() cannot hit here sicne if we reach this
>> +	 * So, for example, epoll_ctl() cannot hit here since if we reach this
>>  	 * point, the file counter already went to zero and fget() would fail.
>>  	 * The only hit might come from ep_free() but by holding the mutex
>>  	 * will correctly serialize the operation. We do need to acquire
>> @@ -645,12 +808,9 @@ static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *k
>>  	}
>>  
>>  	/* If this file is already in the ready list we exit soon */
>> -	if (ep_is_linked(&epi->rdllink))
>> -		goto is_linked;
>> -
>> -	list_add_tail(&epi->rdllink, &ep->rdllist);
>> +	if (!ep_is_linked(&epi->rdllink))
>> +		list_add_tail(&epi->rdllink, &ep->rdllist);
>>  
>> -is_linked:
>>  	/*
>>  	 * Wake up ( if active ) both the eventpoll wait list and the ->poll()
>>  	 * wait list.
>> @@ -666,7 +826,7 @@ out_unlock:
>>  
>>  	/* We have to call this outside the lock */
>>  	if (pwake)
>> -		ep_poll_safewake(&psw, &ep->poll_wait);
>> +		ep_poll_safewake(&ep->poll_wait);
>>  
>>  	return 1;
>>  }
>> @@ -688,10 +848,9 @@ static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
>>  		add_wait_queue(whead, &pwq->wait);
>>  		list_add_tail(&pwq->llink, &epi->pwqlist);
>>  		epi->nwait++;
>> -	} else {
>> +	} else
>>  		/* We have to signal that an error occurred */
>>  		epi->nwait = -1;
>> -	}
>>  }
>>  
>>  static void ep_rbtree_insert(struct eventpoll *ep, struct epitem *epi)
>> @@ -789,7 +948,7 @@ static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
>>  
>>  	/* We have to call this outside the lock */
>>  	if (pwake)
>> -		ep_poll_safewake(&psw, &ep->poll_wait);
>> +		ep_poll_safewake(&ep->poll_wait);
>>  
>>  	DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_insert(%p, %p, %d)\n",
>>  		     current, ep, tfile, fd));
>> @@ -864,138 +1023,74 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_even
>>  
>>  	/* We have to call this outside the lock */
>>  	if (pwake)
>> -		ep_poll_safewake(&psw, &ep->poll_wait);
>> +		ep_poll_safewake(&ep->poll_wait);
>>  
>>  	return 0;
>>  }
>>  
>> -static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events,
>> -			  int maxevents)
>> +static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
>>  {
>> -	int eventcnt, error = -EFAULT, pwake = 0;
>> +	struct ep_send_events_data *esed = priv;
>> +	int eventcnt;
>>  	unsigned int revents;
>> -	unsigned long flags;
>> -	struct epitem *epi, *nepi;
>> -	struct list_head txlist;
>> -
>> -	INIT_LIST_HEAD(&txlist);
>> -
>> -	/*
>> -	 * We need to lock this because we could be hit by
>> -	 * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL).
>> -	 */
>> -	mutex_lock(&ep->mtx);
>> -
>> -	/*
>> -	 * Steal the ready list, and re-init the original one to the
>> -	 * empty list. Also, set ep->ovflist to NULL so that events
>> -	 * happening while looping w/out locks, are not lost. We cannot
>> -	 * have the poll callback to queue directly on ep->rdllist,
>> -	 * because we are doing it in the loop below, in a lockless way.
>> -	 */
>> -	spin_lock_irqsave(&ep->lock, flags);
>> -	list_splice(&ep->rdllist, &txlist);
>> -	INIT_LIST_HEAD(&ep->rdllist);
>> -	ep->ovflist = NULL;
>> -	spin_unlock_irqrestore(&ep->lock, flags);
>> +	struct epitem *epi;
>> +	struct epoll_event __user *uevent;
>>  
>>  	/*
>> -	 * We can loop without lock because this is a task private list.
>> -	 * We just splice'd out the ep->rdllist in ep_collect_ready_items().
>> -	 * Items cannot vanish during the loop because we are holding "mtx".
>> +	 * We can loop without lock because we are passed a task private list.
>> +	 * Items cannot vanish during the loop because ep_scan_ready_list() is
>> +	 * holding "mtx" during this call.
>>  	 */
>> -	for (eventcnt = 0; !list_empty(&txlist) && eventcnt < maxevents;) {
>> -		epi = list_first_entry(&txlist, struct epitem, rdllink);
>> +	for (eventcnt = 0, uevent = esed->events;
>> +	     !list_empty(head) && eventcnt < esed->maxevents;) {
>> +		epi = list_first_entry(head, struct epitem, rdllink);
>>  
>>  		list_del_init(&epi->rdllink);
>>  
>> -		/*
>> -		 * Get the ready file event set. We can safely use the file
>> -		 * because we are holding the "mtx" and this will guarantee
>> -		 * that both the file and the item will not vanish.
>> -		 */
>> -		revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
>> -		revents &= epi->event.events;
>> +		revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
>> +			epi->event.events;
>>  
>>  		/*
>> -		 * Is the event mask intersect the caller-requested one,
>> -		 * deliver the event to userspace. Again, we are holding
>> -		 * "mtx", so no operations coming from userspace can change
>> -		 * the item.
>> +		 * If the event mask intersect the caller-requested one,
>> +		 * deliver the event to userspace. Again, ep_scan_ready_list()
>> +		 * is holding "mtx", so no operations coming from userspace
>> +		 * can change the item.
>>  		 */
>>  		if (revents) {
>> -			if (__put_user(revents,
>> -				       &events[eventcnt].events) ||
>> -			    __put_user(epi->event.data,
>> -				       &events[eventcnt].data))
>> -				goto errxit;
>> +			if (__put_user(revents, &uevent->events) ||
>> +			    __put_user(epi->event.data, &uevent->data))
>> +				return eventcnt ? eventcnt: -EFAULT;
>> +			eventcnt++;
>> +			uevent++;
>>  			if (epi->event.events & EPOLLONESHOT)
>>  				epi->event.events &= EP_PRIVATE_BITS;
>> -			eventcnt++;
>> +			else if (!(epi->event.events & EPOLLET))
>> +				/*
>> +				 * If this file has been added with Level Trigger
>> +				 * mode, we need to insert back inside the ready
>> +				 * list, so that the next call to epoll_wait()
>> +				 * will check again the events availability.
>> +				 * At this point, noone can insert into ep->rdllist
>> +				 * besides us. The epoll_ctl() callers are locked
>> +				 * out by ep_scan_ready_list() holding "mtx" and
>> +				 * the poll callback will queue them in ep->ovflist.
>> +				 */
>> +				list_add_tail(&epi->rdllink, &ep->rdllist);
>>  		}
>> -		/*
>> -		 * At this point, noone can insert into ep->rdllist besides
>> -		 * us. The epoll_ctl() callers are locked out by us holding
>> -		 * "mtx" and the poll callback will queue them in ep->ovflist.
>> -		 */
>> -		if (!(epi->event.events & EPOLLET) &&
>> -		    (revents & epi->event.events))
>> -			list_add_tail(&epi->rdllink, &ep->rdllist);
>>  	}
>> -	error = 0;
>> -
>> -errxit:
>> -
>> -	spin_lock_irqsave(&ep->lock, flags);
>> -	/*
>> -	 * During the time we spent in the loop above, some other events
>> -	 * might have been queued by the poll callback. We re-insert them
>> -	 * inside the main ready-list here.
>> -	 */
>> -	for (nepi = ep->ovflist; (epi = nepi) != NULL;
>> -	     nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
>> -		/*
>> -		 * If the above loop quit with errors, the epoll item might still
>> -		 * be linked to "txlist", and the list_splice() done below will
>> -		 * take care of those cases.
>> -		 */
>> -		if (!ep_is_linked(&epi->rdllink))
>> -			list_add_tail(&epi->rdllink, &ep->rdllist);
>> -	}
>> -	/*
>> -	 * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
>> -	 * releasing the lock, events will be queued in the normal way inside
>> -	 * ep->rdllist.
>> -	 */
>> -	ep->ovflist = EP_UNACTIVE_PTR;
>>  
>> -	/*
>> -	 * In case of error in the event-send loop, or in case the number of
>> -	 * ready events exceeds the userspace limit, we need to splice the
>> -	 * "txlist" back inside ep->rdllist.
>> -	 */
>> -	list_splice(&txlist, &ep->rdllist);
>> -
>> -	if (!list_empty(&ep->rdllist)) {
>> -		/*
>> -		 * Wake up (if active) both the eventpoll wait list and the ->poll()
>> -		 * wait list (delayed after we release the lock).
>> -		 */
>> -		if (waitqueue_active(&ep->wq))
>> -			__wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE |
>> -					 TASK_INTERRUPTIBLE);
>> -		if (waitqueue_active(&ep->poll_wait))
>> -			pwake++;
>> -	}
>> -	spin_unlock_irqrestore(&ep->lock, flags);
>> +	return eventcnt;
>> +}
>>  
>> -	mutex_unlock(&ep->mtx);
>> +static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events,
>> +			  int maxevents)
>> +{
>> +	struct ep_send_events_data esed;
>>  
>> -	/* We have to call this outside the lock */
>> -	if (pwake)
>> -		ep_poll_safewake(&psw, &ep->poll_wait);
>> +	esed.maxevents = maxevents;
>> +	esed.events = events;
>>  
>> -	return eventcnt == 0 ? error: eventcnt;
>> +	return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
>>  }
>>  
>>  static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
>> @@ -1007,7 +1102,7 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
>>  	wait_queue_t wait;
>>  
>>  	/*
>> -	 * Calculate the timeout by checking for the "infinite" value ( -1 )
>> +	 * Calculate the timeout by checking for the "infinite" value (-1)
>>  	 * and the overflow condition. The passed timeout is in milliseconds,
>>  	 * that why (t * HZ) / 1000.
>>  	 */
>> @@ -1050,9 +1145,8 @@ retry:
>>  
>>  		set_current_state(TASK_RUNNING);
>>  	}
>> -
>>  	/* Is it worth to try to dig for events ? */
>> -	eavail = !list_empty(&ep->rdllist);
>> +	eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
>>  
>>  	spin_unlock_irqrestore(&ep->lock, flags);
>>  
>> @@ -1322,7 +1416,10 @@ static int __init eventpoll_init(void)
>>  	mutex_init(&epmutex);
>>  
>>  	/* Initialize the structure used to perform safe poll wait head wake ups */
>> -	ep_poll_safewake_init(&psw);
>> +	ep_nested_calls_init(&poll_safewake_ncalls);
>> +
>> +	/* Initialize the structure used to perform file's f_op->poll() calls */
>> +	ep_nested_calls_init(&poll_readywalk_ncalls);
>>  
>>  	/* Allocates slab cache used to allocate "struct epitem" items */
>>  	epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
> 
> Somehow I am not sure whether there isn't a hunk missing for epoll_create...
> Need to look a bit closer...
> 
@@ -1099,41 +1194,40 @@ retry:
  */
 SYSCALL_DEFINE1(epoll_create1, int, flags)
 {
-       int error, fd = -1;
-       struct eventpoll *ep;
+       int error;
+       struct eventpoll *ep = NULL;

        /* Check the EPOLL_* constant for consistency.  */
        BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

-       if (flags & ~EPOLL_CLOEXEC)
-               return -EINVAL;
-
        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d)\n",
                     current, flags));

+       error = -EINVAL;
+       if (flags & ~EPOLL_CLOEXEC)
+               goto error_return;
+
        /*
-        * Create the internal data structure ( "struct eventpoll" ).
+        * Create the internal data structure ("struct eventpoll").
         */
        error = ep_alloc(&ep);
-       if (error < 0) {
-               fd = error;
+       if (error < 0)
                goto error_return;
-       }

        /*
         * Creates all the items needed to setup an eventpoll file. That is,
         * a file structure and a free file descriptor.
         */
-       fd = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
-                             flags & O_CLOEXEC);
-       if (fd < 0)
+       error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
+                                flags & O_CLOEXEC);
+       if (error < 0)
                ep_free(ep);

 error_return:
        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d) = %d\n",
-                    current, flags, fd));
+                    current, flags, error));

-       return fd;
+       return error;
 }

 SYSCALL_DEFINE1(epoll_create, int, size)

The hunk above is in the upstream patch and, after giving it only a quick look,
it seems to be close enough to go into sys_epoll_create.
We did not backport the whole of the syscall rewrites into hardy as the affected
architectures were not the ones primarily supported by us. And of course that
change was a PITA to backport (into Dapper).




More information about the kernel-team mailing list