// SPDX-License-Identifier: GPL-2.0-only
/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>
#include <linux/freezer.h>
#include <linux/sched/mm.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>

#include "sunrpc.h"

#define CREATE_TRACE_POINTS
#include <trace/events/sunrpc.h>

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache	*rpc_task_slabp __read_mostly;
static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
static mempool_t	*rpc_task_mempool __read_mostly;
static mempool_t	*rpc_buffer_mempool __read_mostly;

static void			rpc_async_schedule(struct work_struct *);
static void			rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(struct work_struct *);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static struct rpc_wait_queue delay_queue;

/*
 * rpciod-related stuff
 */
struct workqueue_struct *rpciod_workqueue __read_mostly;
struct workqueue_struct *xprtiod_workqueue __read_mostly;
EXPORT_SYMBOL_GPL(xprtiod_workqueue);

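/*
 * Pick a GFP mask for allocations made on behalf of an RPC task.
 * Workqueue threads (rpciod/xprtiod) must not block indefinitely in the
 * allocator, so they use GFP_KERNEL with __GFP_NORETRY | __GFP_NOWARN.
 */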
gfp_t rpc_task_gfp_mask(void)
{
	if (current->flags & PF_WQ_WORKER)
		return GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
	return GFP_KERNEL;
}
EXPORT_SYMBOL_GPL(rpc_task_gfp_mask);

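/*
 * Record an RPC-level error in tk_rpc_status, but only if no error has
 * been recorded yet. Returns true if this caller set the status.
 */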
bool rpc_task_set_rpc_status(struct rpc_task *task, int rpc_status)
{
	if (cmpxchg(&task->tk_rpc_status, 0, rpc_status) == 0)
		return true;
	return false;
}

unsigned long
rpc_task_timeout(const struct rpc_task *task)
{
	unsigned long timeout = READ_ONCE(task->tk_timeout);

	if (timeout != 0) {
		unsigned long now = jiffies;
		if (time_before(now, timeout))
			return timeout - now;
	}
	return 0;
}
EXPORT_SYMBOL_GPL(rpc_task_timeout);

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock held in order to avoid races with __rpc_queue_timer_fn().
 */
static void
__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (list_empty(&task->u.tk_wait.timer_list))
		return;
	task->tk_timeout = 0;
	list_del(&task->u.tk_wait.timer_list);
	if (list_empty(&queue->timer_list.list))
		cancel_delayed_work(&queue->timer_list.dwork);
}

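/*
 * (Re)arm the queue's delayed work so that it fires at @expires (in
 * jiffies). An expiry time in the past schedules the work immediately.
 */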
static void
rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
{
	unsigned long now = jiffies;
	queue->timer_list.expires = expires;
	if (time_before_eq(expires, now))
		expires = 0;
	else
		expires -= now;
	mod_delayed_work(rpciod_workqueue, &queue->timer_list.dwork, expires);
}

/*
 * Set up a timer for the current task.
 */
static void
__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task,
		unsigned long timeout)
{
	task->tk_timeout = timeout;
	if (list_empty(&queue->timer_list.list) || time_before(timeout, queue->timer_list.expires))
		rpc_set_queue_timer(queue, timeout);
	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
}

static void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	if (queue->priority != priority) {
		queue->priority = priority;
		queue->nr = 1U << priority;
	}
}

static void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
}

/*
 * Add a request to a queue list
 */
static void
__rpc_list_enqueue_task(struct list_head *q, struct rpc_task *task)
{
	struct rpc_task *t;

	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_owner == task->tk_owner) {
			list_add_tail(&task->u.tk_wait.links,
					&t->u.tk_wait.links);
			/* Cache the queue head in task->u.tk_wait.list */
			task->u.tk_wait.list.next = q;
			task->u.tk_wait.list.prev = NULL;
			return;
		}
	}
	INIT_LIST_HEAD(&task->u.tk_wait.links);
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Remove request from a queue list
 */
static void
__rpc_list_dequeue_task(struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

	if (task->u.tk_wait.list.prev == NULL) {
		list_del(&task->u.tk_wait.links);
		return;
	}
	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_first_entry(&task->u.tk_wait.links,
				struct rpc_task,
				u.tk_wait.links);
		/* Assume __rpc_list_enqueue_task() cached the queue head */
		q = t->u.tk_wait.list.next;
		list_add_tail(&t->u.tk_wait.list, q);
		list_del(&task->u.tk_wait.links);
	}
	list_del(&task->u.tk_wait.list);
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	if (unlikely(queue_priority > queue->maxpriority))
		queue_priority = queue->maxpriority;
	__rpc_list_enqueue_task(&queue->tasks[queue_priority], task);
}

/*
 * Add new request to wait queue.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	INIT_LIST_HEAD(&task->u.tk_wait.timer_list);
	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task, queue_priority);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->tk_waitqueue = queue;
	queue->qlen++;
	/* barrier matches the read in rpc_wake_up_task_queue_locked() */
	smp_wmb();
	rpc_set_queued(task);
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	__rpc_list_dequeue_task(task);
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	__rpc_disable_timer(queue, task);
	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	else
		list_del(&task->u.tk_wait.list);
	queue->qlen--;
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = nr_queues - 1;
	rpc_reset_waitqueue_priority(queue);
	queue->qlen = 0;
	queue->timer_list.expires = 0;
	INIT_DELAYED_WORK(&queue->timer_list.dwork, __rpc_queue_timer_fn);
	INIT_LIST_HEAD(&queue->timer_list.list);
	rpc_assign_waitqueue_name(queue, qname);
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
}
EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 1);
}
EXPORT_SYMBOL_GPL(rpc_init_wait_queue);

void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
{
	cancel_delayed_work_sync(&queue->timer_list.dwork);
}
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);

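/*
 * Action function for out_of_line_wait_on_bit(): sleep until woken, then
 * return -ERESTARTSYS if a signal is pending for the given task state.
 */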
static int rpc_wait_bit_killable(struct wait_bit_key *key, int mode)
{
	schedule();
	if (signal_pending_state(mode, current))
		return -ERESTARTSYS;
	return 0;
}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) || IS_ENABLED(CONFIG_TRACEPOINTS)
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;

	/* Might be a task carrying a reverse-direction operation */
	if (!clnt) {
		static atomic_t rpc_pid;

		task->tk_pid = atomic_inc_return(&rpc_pid);
		return;
	}

	task->tk_pid = atomic_inc_return(&clnt->cl_pid);
}
#else
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
}
#endif

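/* Assign a debug id and mark the task as active before its first run */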
static void rpc_set_active(struct rpc_task *task)
{
	rpc_task_set_debuginfo(task);
	set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	trace_rpc_task_begin(task, NULL);
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 * and then waking up all tasks that were sleeping.
 */
static int rpc_complete_task(struct rpc_task *task)
{
	void *m = &task->tk_runstate;
	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
	unsigned long flags;
	int ret;

	trace_rpc_task_complete(task, NULL);

	spin_lock_irqsave(&wq->lock, flags);
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	ret = atomic_dec_and_test(&task->tk_count);
	if (waitqueue_active(wq))
		__wake_up_locked_key(wq, TASK_NORMAL, &k);
	spin_unlock_irqrestore(&wq->lock, flags);
	return ret;
}

/*
 * Allow callers to wait for completion of an RPC call
 *
 * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
 * to enforce taking of the wq->lock and hence avoid races with
 * rpc_complete_task().
 */
int rpc_wait_for_completion_task(struct rpc_task *task)
{
	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			rpc_wait_bit_killable, TASK_KILLABLE|TASK_FREEZABLE_UNSAFE);
}
EXPORT_SYMBOL_GPL(rpc_wait_for_completion_task);

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, and is being made runnable after sitting on an
 * rpc_wait_queue, this must be called with the queue spinlock held to protect
 * the wait queue operation.
 * Note the ordering of rpc_test_and_set_running() and rpc_clear_queued(),
 * which is needed to ensure that __rpc_execute() doesn't loop (due to the
 * lockless RPC_IS_QUEUED() test) before we've had a chance to test
 * the RPC_TASK_RUNNING flag.
 */
static void rpc_make_runnable(struct workqueue_struct *wq,
		struct rpc_task *task)
{
	bool need_wakeup = !rpc_test_and_set_running(task);

	rpc_clear_queued(task);
	if (!need_wakeup)
		return;
	if (RPC_IS_ASYNC(task)) {
		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		queue_work(wq, &task->u.tk_work);
	} else {
		smp_mb__after_atomic();
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
	}
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_do_sleep_on_priority(struct rpc_wait_queue *q,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	trace_rpc_task_sleep(task, q);

	__rpc_add_wait_queue(q, task, queue_priority);
}

static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	if (WARN_ON_ONCE(RPC_IS_QUEUED(task)))
		return;
	__rpc_do_sleep_on_priority(q, task, queue_priority);
}

static void __rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q,
		struct rpc_task *task, unsigned long timeout,
		unsigned char queue_priority)
{
	if (WARN_ON_ONCE(RPC_IS_QUEUED(task)))
		return;
	if (time_is_after_jiffies(timeout)) {
		__rpc_do_sleep_on_priority(q, task, queue_priority);
		__rpc_add_timer(q, task, timeout);
	} else
		task->tk_status = -ETIMEDOUT;
}

static void rpc_set_tk_callback(struct rpc_task *task, rpc_action action)
{
	if (action && !WARN_ON_ONCE(task->tk_callback != NULL))
		task->tk_callback = action;
}

static bool rpc_sleep_check_activated(struct rpc_task *task)
{
	/* We shouldn't ever put an inactive task to sleep */
	if (WARN_ON_ONCE(!RPC_IS_ACTIVATED(task))) {
		task->tk_status = -EIO;
		rpc_put_task_async(task);
		return false;
	}
	return true;
}

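/**
 * rpc_sleep_on_timeout - sleep on a wait queue until woken or timed out
 * @q: wait queue to sleep on
 * @task: task to put to sleep
 * @action: optional callback to run when the task is woken up
 * @timeout: absolute expiry time, in jiffies
 */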
void rpc_sleep_on_timeout(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action, unsigned long timeout)
{
	if (!rpc_sleep_check_activated(task))
		return;

	rpc_set_tk_callback(task, action);

	/*
	 * Protect the queue operations.
	 */
	spin_lock(&q->lock);
	__rpc_sleep_on_priority_timeout(q, task, timeout, task->tk_priority);
	spin_unlock(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on_timeout);

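/**
 * rpc_sleep_on - sleep on a wait queue until explicitly woken up
 * @q: wait queue to sleep on
 * @task: task to put to sleep
 * @action: optional callback to run when the task is woken up
 */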
void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action)
{
	if (!rpc_sleep_check_activated(task))
		return;

	rpc_set_tk_callback(task, action);

	WARN_ON_ONCE(task->tk_timeout != 0);
	/*
	 * Protect the queue operations.
	 */
	spin_lock(&q->lock);
	__rpc_sleep_on_priority(q, task, task->tk_priority);
	spin_unlock(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on);

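/**
 * rpc_sleep_on_priority_timeout - sleep on a wait queue at a given priority
 * @q: wait queue to sleep on
 * @task: task to put to sleep
 * @timeout: absolute expiry time, in jiffies
 * @priority: queue priority (RPC_PRIORITY_*)
 */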
void rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q,
		struct rpc_task *task, unsigned long timeout, int priority)
{
	if (!rpc_sleep_check_activated(task))
		return;

	priority -= RPC_PRIORITY_LOW;
	/*
	 * Protect the queue operations.
	 */
	spin_lock(&q->lock);
	__rpc_sleep_on_priority_timeout(q, task, timeout, priority);
	spin_unlock(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority_timeout);

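/**
 * rpc_sleep_on_priority - sleep on a wait queue at a given priority
 * @q: wait queue to sleep on
 * @task: task to put to sleep
 * @priority: queue priority (RPC_PRIORITY_*)
 */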
void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
		int priority)
{
	if (!rpc_sleep_check_activated(task))
		return;

	WARN_ON_ONCE(task->tk_timeout != 0);
	priority -= RPC_PRIORITY_LOW;
	/*
	 * Protect the queue operations.
	 */
	spin_lock(&q->lock);
	__rpc_sleep_on_priority(q, task, priority);
	spin_unlock(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);

/**
 * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
 * @wq: workqueue on which to run task
 * @queue: wait queue
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue,
		struct rpc_task *task)
{
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	trace_rpc_task_wakeup(task, queue);

	__rpc_remove_wait_queue(queue, task);

	rpc_make_runnable(wq, task);
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static struct rpc_task *
rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue, struct rpc_task *task,
		bool (*action)(struct rpc_task *, void *), void *data)
{
	if (RPC_IS_QUEUED(task)) {
		smp_rmb();
		if (task->tk_waitqueue == queue) {
			if (action == NULL || action(task, data)) {
				__rpc_do_wake_up_task_on_wq(wq, queue, task);
				return task;
			}
		}
	}
	return NULL;
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue,
					  struct rpc_task *task)
{
	rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
						   task, NULL, NULL);
}

/*
 * Wake up a task on a specific queue
 */
void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!RPC_IS_QUEUED(task))
		return;
	spin_lock(&queue->lock);
	rpc_wake_up_task_queue_locked(queue, task);
	spin_unlock(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);

static bool rpc_task_action_set_status(struct rpc_task *task, void *status)
{
	task->tk_status = *(int *)status;
	return true;
}

static void
rpc_wake_up_task_queue_set_status_locked(struct rpc_wait_queue *queue,
		struct rpc_task *task, int status)
{
	rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
			task, rpc_task_action_set_status, &status);
}

/**
 * rpc_wake_up_queued_task_set_status - wake up a task and set task->tk_status
 * @queue: pointer to rpc_wait_queue
 * @task: pointer to rpc_task
 * @status: integer error value
 *
 * If @task is queued on @queue, then it is woken up, and @task->tk_status is
 * set to the value of @status.
 */
void
rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue,
		struct rpc_task *task, int status)
{
	if (!RPC_IS_QUEUED(task))
		return;
	spin_lock(&queue->lock);
	rpc_wake_up_task_queue_set_status_locked(queue, task, status);
	spin_unlock(&queue->lock);
}

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service the privileged queue.
	 */
	q = &queue->tasks[RPC_NR_PRIORITY - 1];
	if (queue->maxpriority > RPC_PRIORITY_PRIVILEGED && !list_empty(q)) {
		task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
		goto out;
	}

	/*
	 * Service a batch of tasks from a single owner.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q) && queue->nr) {
		queue->nr--;
		task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
		goto out;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
out:
	return task;
}

static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
{
	if (RPC_IS_PRIORITY(queue))
		return __rpc_find_next_queued_priority(queue);
	if (!list_empty(&queue->tasks[0]))
		return list_first_entry(&queue->tasks[0], struct rpc_task, u.tk_wait.list);
	return NULL;
}

/*
 * Wake up the first task on the wait queue.
 */
struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue,
		bool (*func)(struct rpc_task *, void *), void *data)
{
	struct rpc_task	*task = NULL;

	spin_lock(&queue->lock);
	task = __rpc_find_next_queued(queue);
	if (task != NULL)
		task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue,
				task, func, data);
	spin_unlock(&queue->lock);

	return task;
}

/*
 * Wake up the first task on the wait queue.
 */
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
		bool (*func)(struct rpc_task *, void *), void *data)
{
	return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_first);

static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
{
	return true;
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	return rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_next);

/**
 * rpc_wake_up_locked - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 */
static void rpc_wake_up_locked(struct rpc_wait_queue *queue)
{
	struct rpc_task *task;

	for (;;) {
		task = __rpc_find_next_queued(queue);
		if (task == NULL)
			break;
		rpc_wake_up_task_queue_locked(queue, task);
	}
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	spin_lock(&queue->lock);
	rpc_wake_up_locked(queue);
	spin_unlock(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up);

/**
 * rpc_wake_up_status_locked - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 */
static void rpc_wake_up_status_locked(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *task;

	for (;;) {
		task = __rpc_find_next_queued(queue);
		if (task == NULL)
			break;
		rpc_wake_up_task_queue_set_status_locked(queue, task, status);
	}
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	spin_lock(&queue->lock);
	rpc_wake_up_status_locked(queue, status);
	spin_unlock(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_status);

static void __rpc_queue_timer_fn(struct work_struct *work)
{
	struct rpc_wait_queue *queue = container_of(work,
			struct rpc_wait_queue,
			timer_list.dwork.work);
	struct rpc_task *task, *n;
	unsigned long expires, now, timeo;

	spin_lock(&queue->lock);
	expires = now = jiffies;
	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
		timeo = task->tk_timeout;
		if (time_after_eq(now, timeo)) {
			trace_rpc_task_timeout(task, task->tk_action);
			task->tk_status = -ETIMEDOUT;
			rpc_wake_up_task_queue_locked(queue, task);
			continue;
		}
		if (expires == now || time_after(expires, timeo))
			expires = timeo;
	}
	if (!list_empty(&queue->timer_list.list))
		rpc_set_queue_timer(queue, expires);
	spin_unlock(&queue->lock);
}

static void __rpc_atrun(struct rpc_task *task)
{
	if (task->tk_status == -ETIMEDOUT)
		task->tk_status = 0;
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
	rpc_sleep_on_timeout(&delay_queue, task, __rpc_atrun, jiffies + delay);
}
EXPORT_SYMBOL_GPL(rpc_delay);

/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
void rpc_prepare_task(struct rpc_task *task)
{
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
}

static void
rpc_init_task_statistics(struct rpc_task *task)
{
	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;

	/* starting timestamp */
	task->tk_start = ktime_get();
}

static void
rpc_reset_task_statistics(struct rpc_task *task)
{
	task->tk_timeouts = 0;
	task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_SENT);
	rpc_init_task_statistics(task);
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
	trace_rpc_task_end(task, task->tk_action);
	task->tk_action = NULL;
	if (task->tk_ops->rpc_count_stats)
		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
	else if (task->tk_client)
		rpc_count_iostats(task, task->tk_client->cl_metrics);
	if (task->tk_ops->rpc_call_done != NULL) {
		trace_rpc_task_call_done(task, task->tk_ops->rpc_call_done);
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
		if (task->tk_action != NULL) {
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
			rpc_reset_task_statistics(task);
		}
	}
}

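/*
 * Signal a task: record -ERESTARTSYS in tk_rpc_status, set
 * RPC_TASK_SIGNALLED, and wake the task if it is currently queued.
 */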
void rpc_signal_task(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;

	if (!RPC_IS_ACTIVATED(task))
		return;

	if (!rpc_task_set_rpc_status(task, -ERESTARTSYS))
		return;
	trace_rpc_task_signalled(task, task->tk_action);
	set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
	smp_mb__after_atomic();
	queue = READ_ONCE(task->tk_waitqueue);
	if (queue)
		rpc_wake_up_queued_task(queue, task);
}

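/*
 * Ask a task to exit with @error, unless an RPC error has already been
 * recorded, and wake it if it is currently queued.
 */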
void rpc_task_try_cancel(struct rpc_task *task, int error)
{
	struct rpc_wait_queue *queue;

	if (!rpc_task_set_rpc_status(task, error))
		return;
	queue = READ_ONCE(task->tk_waitqueue);
	if (queue)
		rpc_wake_up_queued_task(queue, task);
}

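/*
 * Force a task to exit with the given status the next time it runs.
 */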
void rpc_exit(struct rpc_task *task, int status)
{
	task->tk_status = status;
	task->tk_action = rpc_exit_task;
	rpc_wake_up_queued_task(task->tk_waitqueue, task);
}
EXPORT_SYMBOL_GPL(rpc_exit);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
	if (ops->rpc_release != NULL)
		ops->rpc_release(calldata);
}

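/*
 * A task that holds the transport send lock on a transport used for
 * swap must be able to allocate from memory reserves (PF_MEMALLOC).
 */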
static bool xprt_needs_memalloc(struct rpc_xprt *xprt, struct rpc_task *tk)
{
	if (!xprt)
		return false;
	if (!atomic_read(&xprt->swapper))
		return false;
	return test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == tk;
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static void __rpc_execute(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	int task_is_async = RPC_IS_ASYNC(task);
	int status = 0;
	unsigned long pflags = current->flags;

	WARN_ON_ONCE(RPC_IS_QUEUED(task));
	if (RPC_IS_QUEUED(task))
		return;

	for (;;) {
		void (*do_action)(struct rpc_task *);

		/*
		 * Perform the next FSM step or a pending callback.
		 *
		 * tk_action may be NULL if the task has been killed.
		 */
		do_action = task->tk_action;
		/* Tasks with an RPC error status should exit */
		if (do_action && do_action != rpc_exit_task &&
		    (status = READ_ONCE(task->tk_rpc_status)) != 0) {
			task->tk_status = status;
			do_action = rpc_exit_task;
		}
		/* Callbacks override all actions */
		if (task->tk_callback) {
			do_action = task->tk_callback;
			task->tk_callback = NULL;
		}
		if (!do_action)
			break;
		if (RPC_IS_SWAPPER(task) ||
		    xprt_needs_memalloc(task->tk_xprt, task))
			current->flags |= PF_MEMALLOC;

		trace_rpc_task_run_action(task, do_action);
		do_action(task);

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task)) {
			cond_resched();
			continue;
		}

		/*
		 * The queue->lock protects against races with
		 * rpc_make_runnable().
		 *
		 * Note that once we clear RPC_TASK_RUNNING on an asynchronous
		 * rpc_task, rpc_make_runnable() can assign it to a
		 * different workqueue. We therefore cannot assume that the
		 * rpc_task pointer may still be dereferenced.
		 */
		queue = task->tk_waitqueue;
		spin_lock(&queue->lock);
		if (!RPC_IS_QUEUED(task)) {
			spin_unlock(&queue->lock);
			continue;
		}
		/* Wake up any task that has an exit status */
		if (READ_ONCE(task->tk_rpc_status) != 0) {
			rpc_wake_up_task_queue_locked(queue, task);
			spin_unlock(&queue->lock);
			continue;
		}
		rpc_clear_running(task);
		spin_unlock(&queue->lock);
		if (task_is_async)
			goto out;

		/* sync task: sleep here */
		trace_rpc_task_sync_sleep(task, task->tk_action);
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_killable,
				TASK_KILLABLE|TASK_FREEZABLE);
		if (status < 0) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			rpc_signal_task(task);
		}
		trace_rpc_task_sync_wake(task, task->tk_action);
	}

	/* Release all resources associated with the task */
	rpc_release_task(task);
out:
	current_restore_flags(pflags, PF_MEMALLOC);
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
	bool is_async = RPC_IS_ASYNC(task);

	rpc_set_active(task);
	rpc_make_runnable(rpciod_workqueue, task);
	if (!is_async) {
		unsigned int pflags = memalloc_nofs_save();
		__rpc_execute(task);
		memalloc_nofs_restore(pflags);
	}
}

static void rpc_async_schedule(struct work_struct *work)
{
	unsigned int pflags = memalloc_nofs_save();

	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
	memalloc_nofs_restore(pflags);
}

/**
 * rpc_malloc - allocate RPC buffer resources
 * @task: RPC task
 *
 * A single memory region is allocated, which is split between the
 * RPC call and RPC reply that this task is being used for. When
 * this RPC is retired, the memory is released by calling rpc_free.
 *
 * To prevent rpciod from hanging, this allocator never sleeps,
 * returning -ENOMEM and suppressing warning if the request cannot
 * be serviced immediately. The caller can arrange to sleep in a
 * way that is safe for rpciod.
 *
 * Most requests are 'small' (under 2KiB) and can be serviced from a
 * mempool, ensuring that NFS reads and writes can always proceed,
 * and that there is good locality of reference for these buffers.
 */
int rpc_malloc(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
	struct rpc_buffer *buf;
	gfp_t gfp = rpc_task_gfp_mask();

	size += sizeof(struct rpc_buffer);
	if (size <= RPC_BUFFER_MAXSIZE) {
		buf = kmem_cache_alloc(rpc_buffer_slabp, gfp);
		/* Reach for the mempool if dynamic allocation fails */
		if (!buf && RPC_IS_ASYNC(task))
			buf = mempool_alloc(rpc_buffer_mempool, GFP_NOWAIT);
	} else
		buf = kmalloc(size, gfp);

	if (!buf)
		return -ENOMEM;

	buf->len = size;
	rqst->rq_buffer = buf->data;
	rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
	return 0;
}
EXPORT_SYMBOL_GPL(rpc_malloc);

/**
 * rpc_free - free RPC buffer resources allocated via rpc_malloc
 * @task: RPC task
 *
 */
void rpc_free(struct rpc_task *task)
{
	void *buffer = task->tk_rqstp->rq_buffer;
	size_t size;
	struct rpc_buffer *buf;

	buf = container_of(buffer, struct rpc_buffer, data);
	size = buf->len;

	if (size <= RPC_BUFFER_MAXSIZE)
		mempool_free(buf, rpc_buffer_mempool);
	else
		kfree(buf);
}
EXPORT_SYMBOL_GPL(rpc_free);

/*
 * Creation and deletion of RPC task structures
 */
static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
{
	memset(task, 0, sizeof(*task));
	atomic_set(&task->tk_count, 1);
	task->tk_flags  = task_setup_data->flags;
	task->tk_ops = task_setup_data->callback_ops;
	task->tk_calldata = task_setup_data->callback_data;
	INIT_LIST_HEAD(&task->tk_task);

	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
	task->tk_owner = current->tgid;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = task_setup_data->workqueue;

	task->tk_xprt = rpc_task_get_xprt(task_setup_data->rpc_client,
			xprt_get(task_setup_data->rpc_xprt));

	task->tk_op_cred = get_rpccred(task_setup_data->rpc_op_cred);

	if (task->tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;

	rpc_init_task_statistics(task);
}

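/*
 * Allocate a task from the slab cache, falling back to a non-blocking
 * mempool allocation if that fails.
 */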
static struct rpc_task *rpc_alloc_task(void)
{
	struct rpc_task *task;

	task = kmem_cache_alloc(rpc_task_slabp, rpc_task_gfp_mask());
	if (task)
		return task;
	return mempool_alloc(rpc_task_mempool, GFP_NOWAIT);
}

/*
 * Create a new task for the specified client.
 */
struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
{
	struct rpc_task	*task = setup_data->task;
	unsigned short flags = 0;

	if (task == NULL) {
		task = rpc_alloc_task();
		if (task == NULL) {
			rpc_release_calldata(setup_data->callback_ops,
					     setup_data->callback_data);
			return ERR_PTR(-ENOMEM);
		}
		flags = RPC_TASK_DYNAMIC;
	}

	rpc_init_task(task, setup_data);
	task->tk_flags |= flags;
	return task;
}

/*
 * rpc_free_task - release rpc task and perform cleanups
 *
 * Note that we free up the rpc_task _after_ rpc_release_calldata()
 * in order to work around a workqueue dependency issue.
 *
 * Tejun Heo states:
 * "Workqueue currently considers two work items to be the same if they're
 * on the same address and won't execute them concurrently - ie. it
 * makes a work item which is queued again while being executed wait
 * for the previous execution to complete.
 *
 * If a work function frees the work item, and then waits for an event
 * which should be performed by another work item and *that* work item
 * recycles the freed work item, it can create a false dependency loop.
 * There really is no reliable way to detect this short of verifying
 * every memory free."
 *
 */
static void rpc_free_task(struct rpc_task *task)
{
	unsigned short tk_flags = task->tk_flags;

	put_rpccred(task->tk_op_cred);
	rpc_release_calldata(task->tk_ops, task->tk_calldata);

	if (tk_flags & RPC_TASK_DYNAMIC)
		mempool_free(task, rpc_task_mempool);
}

static void rpc_async_release(struct work_struct *work)
{
	unsigned int pflags = memalloc_nofs_save();

	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
	memalloc_nofs_restore(pflags);
}

static void rpc_release_resources_task(struct rpc_task *task)
{
	xprt_release(task);
	if (task->tk_msg.rpc_cred) {
		if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
			put_cred(task->tk_msg.rpc_cred);
		task->tk_msg.rpc_cred = NULL;
	}
	rpc_task_release_client(task);
}

static void rpc_final_put_task(struct rpc_task *task,
		struct workqueue_struct *q)
{
	if (q != NULL) {
		INIT_WORK(&task->u.tk_work, rpc_async_release);
		queue_work(q, &task->u.tk_work);
	} else
		rpc_free_task(task);
}

static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
{
	if (atomic_dec_and_test(&task->tk_count)) {
		rpc_release_resources_task(task);
		rpc_final_put_task(task, q);
	}
}

void rpc_put_task(struct rpc_task *task)
{
	rpc_do_put_task(task, NULL);
}
EXPORT_SYMBOL_GPL(rpc_put_task);

void rpc_put_task_async(struct rpc_task *task)
{
	rpc_do_put_task(task, task->tk_workqueue);
}
EXPORT_SYMBOL_GPL(rpc_put_task_async);

static void rpc_release_task(struct rpc_task *task)
{
	WARN_ON_ONCE(RPC_IS_QUEUED(task));

	rpc_release_resources_task(task);

	/*
	 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
	 * so it should be safe to use task->tk_count as a test for whether
	 * or not any other processes still hold references to our rpc_task.
	 */
	if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
		/* Wake up anyone who may be waiting for task completion */
		if (!rpc_complete_task(task))
			return;
	} else {
		if (!atomic_dec_and_test(&task->tk_count))
			return;
	}
	rpc_final_put_task(task, task->tk_workqueue);
}

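/*
 * Pin/unpin the sunrpc module while callers are using rpciod.
 */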
int rpciod_up(void)
{
	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}

void rpciod_down(void)
{
	module_put(THIS_MODULE);
}

/*
 * Start up the rpciod workqueue.
 */
static int rpciod_start(void)
{
	struct workqueue_struct *wq;

	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
	if (!wq)
		goto out_failed;
	rpciod_workqueue = wq;
	wq = alloc_workqueue("xprtiod", WQ_UNBOUND | WQ_MEM_RECLAIM, 0);
	if (!wq)
		goto free_rpciod;
	xprtiod_workqueue = wq;
	return 1;
free_rpciod:
	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
out_failed:
	return 0;
}

static void rpciod_stop(void)
{
	struct workqueue_struct *wq = NULL;

	if (rpciod_workqueue == NULL)
		return;

	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
	wq = xprtiod_workqueue;
	xprtiod_workqueue = NULL;
	destroy_workqueue(wq);
}

void
rpc_destroy_mempool(void)
{
	rpciod_stop();
	mempool_destroy(rpc_buffer_mempool);
	mempool_destroy(rpc_task_mempool);
	kmem_cache_destroy(rpc_task_slabp);
	kmem_cache_destroy(rpc_buffer_slabp);
	rpc_destroy_wait_queue(&delay_queue);
}

int
rpc_init_mempool(void)
{
	/*
	 * The following is not strictly a mempool initialisation,
	 * but there is no harm in doing it here
	 */
	rpc_init_wait_queue(&delay_queue, "delayq");
	if (!rpciod_start())
		goto err_nomem;

	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					     sizeof(struct rpc_task),
					     0, SLAB_HWCACHE_ALIGN,
					     NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					     RPC_BUFFER_MAXSIZE,
					     0, SLAB_HWCACHE_ALIGN,
					     NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
						    rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
						      rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}