1  // SPDX-License-Identifier: GPL-2.0-only
2  /*
3   *  linux/net/sunrpc/xprt.c
4   *
5   *  This is a generic RPC call interface supporting congestion avoidance,
6   *  and asynchronous calls.
7   *
8   *  The interface works like this:
9   *
10   *  -	When a process places a call, it allocates a request slot if
11   *	one is available. Otherwise, it sleeps on the backlog queue
12   *	(xprt_reserve).
13   *  -	Next, the caller puts together the RPC message, stuffs it into
14   *	the request struct, and calls xprt_transmit().
15   *  -	xprt_transmit sends the message and installs the caller on the
16   *	transport's wait list. At the same time, if a reply is expected,
17   *	it installs a timer that is run after the packet's timeout has
18   *	expired.
19   *  -	When a packet arrives, the data_ready handler walks the list of
20   *	pending requests for that transport. If a matching XID is found, the
21   *	caller is woken up, and the timer removed.
22   *  -	When no reply arrives within the timeout interval, the timer is
23   *	fired by the kernel and runs xprt_timer(). It either adjusts the
24   *	timeout values (minor timeout) or wakes up the caller with a status
25   *	of -ETIMEDOUT.
26   *  -	When the caller receives a notification from RPC that a reply arrived,
27   *	it should release the RPC slot, and process the reply.
28   *	If the call timed out, it may choose to retry the operation by
29   *	adjusting the initial timeout value, and simply calling rpc_call
30   *	again.
31   *
32   *  Support for async RPC is done through a set of RPC-specific scheduling
33   *  primitives that `transparently' work for processes as well as async
34   *  tasks that rely on callbacks.
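 *
 *  As a rough illustration only (the real sequencing lives in the
 *  rpc_task state machine in net/sunrpc/clnt.c), a single synchronous
 *  request passes through approximately:
 *
 *	xprt_reserve(task);		  reserve a request slot
 *	xprt_prepare_transmit(task);	  take the transport write lock
 *	xprt_transmit(task);		  send and queue for the reply
 *	xprt_request_wait_receive(task);  sleep until reply or timeout
 *	xprt_release(task);		  free the slot for other callers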
35   *
36   *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
37   *
38   *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
39   */
40  
41  #include <linux/module.h>
42  
43  #include <linux/types.h>
44  #include <linux/interrupt.h>
45  #include <linux/workqueue.h>
46  #include <linux/net.h>
47  #include <linux/ktime.h>
48  
49  #include <linux/sunrpc/clnt.h>
50  #include <linux/sunrpc/metrics.h>
51  #include <linux/sunrpc/bc_xprt.h>
52  #include <linux/rcupdate.h>
53  #include <linux/sched/mm.h>
54  
55  #include <trace/events/sunrpc.h>
56  
57  #include "sunrpc.h"
58  #include "sysfs.h"
59  #include "fail.h"
60  
61  /*
62   * Local variables
63   */
64  
65  #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
66  # define RPCDBG_FACILITY	RPCDBG_XPRT
67  #endif
68  
69  /*
70   * Local functions
71   */
72  static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
73  static __be32	xprt_alloc_xid(struct rpc_xprt *xprt);
74  static void	xprt_destroy(struct rpc_xprt *xprt);
75  static void	xprt_request_init(struct rpc_task *task);
76  static int	xprt_request_prepare(struct rpc_rqst *req, struct xdr_buf *buf);
77  
78  static DEFINE_SPINLOCK(xprt_list_lock);
79  static LIST_HEAD(xprt_list);
80  
81  static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
82  {
83  	unsigned long timeout = jiffies + req->rq_timeout;
84  
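	/*
	 * Wake at the next minor (per-retransmission) deadline, but never
	 * later than the major (whole-request) deadline.
	 */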
85  	if (time_before(timeout, req->rq_majortimeo))
86  		return timeout;
87  	return req->rq_majortimeo;
88  }
89  
90  /**
91   * xprt_register_transport - register a transport implementation
92   * @transport: transport to register
93   *
94   * If a transport implementation is loaded as a kernel module, it can
95   * call this interface to make itself known to the RPC client.
96   *
97   * Returns:
98   * 0:		transport successfully registered
99   * -EEXIST:	transport already registered
100   * -EINVAL:	transport module being unloaded
101   */
102  int xprt_register_transport(struct xprt_class *transport)
103  {
104  	struct xprt_class *t;
105  	int result;
106  
107  	result = -EEXIST;
108  	spin_lock(&xprt_list_lock);
109  	list_for_each_entry(t, &xprt_list, list) {
110  		/* don't register the same transport class twice */
111  		if (t->ident == transport->ident)
112  			goto out;
113  	}
114  
115  	list_add_tail(&transport->list, &xprt_list);
116  	printk(KERN_INFO "RPC: Registered %s transport module.\n",
117  	       transport->name);
118  	result = 0;
119  
120  out:
121  	spin_unlock(&xprt_list_lock);
122  	return result;
123  }
124  EXPORT_SYMBOL_GPL(xprt_register_transport);
125  
126  /**
127   * xprt_unregister_transport - unregister a transport implementation
128   * @transport: transport to unregister
129   *
130   * Returns:
131   * 0:		transport successfully unregistered
132   * -ENOENT:	transport never registered
133   */
134  int xprt_unregister_transport(struct xprt_class *transport)
135  {
136  	struct xprt_class *t;
137  	int result;
138  
139  	result = 0;
140  	spin_lock(&xprt_list_lock);
141  	list_for_each_entry(t, &xprt_list, list) {
142  		if (t == transport) {
143  			printk(KERN_INFO
144  				"RPC: Unregistered %s transport module.\n",
145  				transport->name);
146  			list_del_init(&transport->list);
147  			goto out;
148  		}
149  	}
150  	result = -ENOENT;
151  
152  out:
153  	spin_unlock(&xprt_list_lock);
154  	return result;
155  }
156  EXPORT_SYMBOL_GPL(xprt_unregister_transport);
157  
158  static void
159  xprt_class_release(const struct xprt_class *t)
160  {
161  	module_put(t->owner);
162  }
163  
164  static const struct xprt_class *
165  xprt_class_find_by_ident_locked(int ident)
166  {
167  	const struct xprt_class *t;
168  
169  	list_for_each_entry(t, &xprt_list, list) {
170  		if (t->ident != ident)
171  			continue;
172  		if (!try_module_get(t->owner))
173  			continue;
174  		return t;
175  	}
176  	return NULL;
177  }
178  
179  static const struct xprt_class *
180  xprt_class_find_by_ident(int ident)
181  {
182  	const struct xprt_class *t;
183  
184  	spin_lock(&xprt_list_lock);
185  	t = xprt_class_find_by_ident_locked(ident);
186  	spin_unlock(&xprt_list_lock);
187  	return t;
188  }
189  
190  static const struct xprt_class *
191  xprt_class_find_by_netid_locked(const char *netid)
192  {
193  	const struct xprt_class *t;
194  	unsigned int i;
195  
196  	list_for_each_entry(t, &xprt_list, list) {
197  		for (i = 0; t->netid[i][0] != '\0'; i++) {
198  			if (strcmp(t->netid[i], netid) != 0)
199  				continue;
200  			if (!try_module_get(t->owner))
201  				continue;
202  			return t;
203  		}
204  	}
205  	return NULL;
206  }
207  
208  static const struct xprt_class *
209  xprt_class_find_by_netid(const char *netid)
210  {
211  	const struct xprt_class *t;
212  
213  	spin_lock(&xprt_list_lock);
214  	t = xprt_class_find_by_netid_locked(netid);
215  	if (!t) {
216  		spin_unlock(&xprt_list_lock);
217  		request_module("rpc%s", netid);
218  		spin_lock(&xprt_list_lock);
219  		t = xprt_class_find_by_netid_locked(netid);
220  	}
221  	spin_unlock(&xprt_list_lock);
222  	return t;
223  }
224  
225  /**
226   * xprt_find_transport_ident - convert a netid into a transport identifier
227   * @netid: transport to load
228   *
229   * Returns:
230   * > 0:		transport identifier
231   * -ENOENT:	transport module not available
232   */
233  int xprt_find_transport_ident(const char *netid)
234  {
235  	const struct xprt_class *t;
236  	int ret;
237  
238  	t = xprt_class_find_by_netid(netid);
239  	if (!t)
240  		return -ENOENT;
241  	ret = t->ident;
242  	xprt_class_release(t);
243  	return ret;
244  }
245  EXPORT_SYMBOL_GPL(xprt_find_transport_ident);
246  
247  static void xprt_clear_locked(struct rpc_xprt *xprt)
248  {
249  	xprt->snd_task = NULL;
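	/*
	 * If a close was requested while we held the write lock, hand the
	 * transport straight to the autoclose worker instead of unlocking.
	 */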
250  	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state))
251  		clear_bit_unlock(XPRT_LOCKED, &xprt->state);
252  	else
253  		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
254  }
255  
256  /**
257   * xprt_reserve_xprt - serialize write access to transports
258   * @task: task that is requesting access to the transport
259   * @xprt: pointer to the target transport
260   *
261   * This prevents mixing the payload of separate requests, and prevents
262   * transport connects from colliding with writes.  No congestion control
263   * is provided.
264   */
265  int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
266  {
267  	struct rpc_rqst *req = task->tk_rqstp;
268  
269  	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
270  		if (task == xprt->snd_task)
271  			goto out_locked;
272  		goto out_sleep;
273  	}
274  	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
275  		goto out_unlock;
276  	xprt->snd_task = task;
277  
278  out_locked:
279  	trace_xprt_reserve_xprt(xprt, task);
280  	return 1;
281  
282  out_unlock:
283  	xprt_clear_locked(xprt);
284  out_sleep:
285  	task->tk_status = -EAGAIN;
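	/*
	 * Soft and softconn tasks must not sleep past their own timeout;
	 * everyone else waits indefinitely for the write lock.
	 */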
286  	if (RPC_IS_SOFT(task) || RPC_IS_SOFTCONN(task))
287  		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
288  				xprt_request_timeout(req));
289  	else
290  		rpc_sleep_on(&xprt->sending, task, NULL);
291  	return 0;
292  }
293  EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
294  
295  static bool
296  xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
297  {
298  	return test_bit(XPRT_CWND_WAIT, &xprt->state);
299  }
300  
301  static void
302  xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
303  {
304  	if (!list_empty(&xprt->xmit_queue)) {
305  		/* Peek at head of queue to see if it can make progress */
306  		if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
307  					rq_xmit)->rq_cong)
308  			return;
309  	}
310  	set_bit(XPRT_CWND_WAIT, &xprt->state);
311  }
312  
313  static void
314  xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
315  {
316  	if (!RPCXPRT_CONGESTED(xprt))
317  		clear_bit(XPRT_CWND_WAIT, &xprt->state);
318  }
319  
320  /*
321   * xprt_reserve_xprt_cong - serialize write access to transports
322   * @task: task that is requesting access to the transport
323   *
324   * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
325   * integrated into the decision of whether a request is allowed to be
326   * woken up and given access to the transport.
327   * Note that the lock is only granted if we know there are free slots.
328   */
329  int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
330  {
331  	struct rpc_rqst *req = task->tk_rqstp;
332  
333  	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
334  		if (task == xprt->snd_task)
335  			goto out_locked;
336  		goto out_sleep;
337  	}
338  	if (req == NULL) {
339  		xprt->snd_task = task;
340  		goto out_locked;
341  	}
342  	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
343  		goto out_unlock;
344  	if (!xprt_need_congestion_window_wait(xprt)) {
345  		xprt->snd_task = task;
346  		goto out_locked;
347  	}
348  out_unlock:
349  	xprt_clear_locked(xprt);
350  out_sleep:
351  	task->tk_status = -EAGAIN;
352  	if (RPC_IS_SOFT(task) || RPC_IS_SOFTCONN(task))
353  		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
354  				xprt_request_timeout(req));
355  	else
356  		rpc_sleep_on(&xprt->sending, task, NULL);
357  	return 0;
358  out_locked:
359  	trace_xprt_reserve_cong(xprt, task);
360  	return 1;
361  }
362  EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
363  
364  static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
365  {
366  	int retval;
367  
368  	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
369  		return 1;
370  	spin_lock(&xprt->transport_lock);
371  	retval = xprt->ops->reserve_xprt(xprt, task);
372  	spin_unlock(&xprt->transport_lock);
373  	return retval;
374  }
375  
376  static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
377  {
378  	struct rpc_xprt *xprt = data;
379  
380  	xprt->snd_task = task;
381  	return true;
382  }
383  
384  static void __xprt_lock_write_next(struct rpc_xprt *xprt)
385  {
386  	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
387  		return;
388  	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
389  		goto out_unlock;
390  	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
391  				__xprt_lock_write_func, xprt))
392  		return;
393  out_unlock:
394  	xprt_clear_locked(xprt);
395  }
396  
397  static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
398  {
399  	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
400  		return;
401  	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
402  		goto out_unlock;
403  	if (xprt_need_congestion_window_wait(xprt))
404  		goto out_unlock;
405  	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
406  				__xprt_lock_write_func, xprt))
407  		return;
408  out_unlock:
409  	xprt_clear_locked(xprt);
410  }
411  
412  /**
413   * xprt_release_xprt - allow other requests to use a transport
414   * @xprt: transport with other tasks potentially waiting
415   * @task: task that is releasing access to the transport
416   *
417   * Note that "task" can be NULL.  No congestion control is provided.
418   */
419  void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
420  {
421  	if (xprt->snd_task == task) {
422  		xprt_clear_locked(xprt);
423  		__xprt_lock_write_next(xprt);
424  	}
425  	trace_xprt_release_xprt(xprt, task);
426  }
427  EXPORT_SYMBOL_GPL(xprt_release_xprt);
428  
429  /**
430   * xprt_release_xprt_cong - allow other requests to use a transport
431   * @xprt: transport with other tasks potentially waiting
432   * @task: task that is releasing access to the transport
433   *
434   * Note that "task" can be NULL.  Another task is awoken to use the
435   * transport if the transport's congestion window allows it.
436   */
437  void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
438  {
439  	if (xprt->snd_task == task) {
440  		xprt_clear_locked(xprt);
441  		__xprt_lock_write_next_cong(xprt);
442  	}
443  	trace_xprt_release_cong(xprt, task);
444  }
445  EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
446  
447  void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
448  {
449  	if (xprt->snd_task != task)
450  		return;
451  	spin_lock(&xprt->transport_lock);
452  	xprt->ops->release_xprt(xprt, task);
453  	spin_unlock(&xprt->transport_lock);
454  }
455  
456  /*
457   * Van Jacobson congestion avoidance. Check if the congestion window
458   * overflowed. Put the task to sleep if this is the case.
459   */
460  static int
461  __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
462  {
463  	if (req->rq_cong)
464  		return 1;
465  	trace_xprt_get_cong(xprt, req->rq_task);
466  	if (RPCXPRT_CONGESTED(xprt)) {
467  		xprt_set_congestion_window_wait(xprt);
468  		return 0;
469  	}
470  	req->rq_cong = 1;
471  	xprt->cong += RPC_CWNDSCALE;
472  	return 1;
473  }
474  
475  /*
476   * Adjust the congestion window, and wake up the next task
477   * that has been sleeping due to congestion
478   */
479  static void
480  __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
481  {
482  	if (!req->rq_cong)
483  		return;
484  	req->rq_cong = 0;
485  	xprt->cong -= RPC_CWNDSCALE;
486  	xprt_test_and_clear_congestion_window_wait(xprt);
487  	trace_xprt_put_cong(xprt, req->rq_task);
488  	__xprt_lock_write_next_cong(xprt);
489  }
490  
491  /**
492   * xprt_request_get_cong - Request congestion control credits
493   * @xprt: pointer to transport
494   * @req: pointer to RPC request
495   *
496   * Useful for transports that require congestion control.
497   */
498  bool
499  xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
500  {
501  	bool ret = false;
502  
503  	if (req->rq_cong)
504  		return true;
505  	spin_lock(&xprt->transport_lock);
506  	ret = __xprt_get_cong(xprt, req) != 0;
507  	spin_unlock(&xprt->transport_lock);
508  	return ret;
509  }
510  EXPORT_SYMBOL_GPL(xprt_request_get_cong);
511  
512  /**
513   * xprt_release_rqst_cong - housekeeping when request is complete
514   * @task: RPC request that recently completed
515   *
516   * Useful for transports that require congestion control.
517   */
518  void xprt_release_rqst_cong(struct rpc_task *task)
519  {
520  	struct rpc_rqst *req = task->tk_rqstp;
521  
522  	__xprt_put_cong(req->rq_xprt, req);
523  }
524  EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
525  
526  static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
527  {
528  	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
529  		__xprt_lock_write_next_cong(xprt);
530  }
531  
532  /*
533   * Clear the congestion window wait flag and wake up the next
534   * entry on xprt->sending
535   */
536  static void
537  xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
538  {
539  	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
540  		spin_lock(&xprt->transport_lock);
541  		__xprt_lock_write_next_cong(xprt);
542  		spin_unlock(&xprt->transport_lock);
543  	}
544  }
545  
546  /**
547   * xprt_adjust_cwnd - adjust transport congestion window
548   * @xprt: pointer to xprt
549   * @task: recently completed RPC request used to adjust window
550   * @result: result code of completed RPC request
551   *
552   * The transport code maintains an estimate on the maximum number of out-
553   * standing RPC requests, using a smoothed version of the congestion
554   * avoidance implemented in 44BSD. This is basically the Van Jacobson
555   * congestion algorithm: If a retransmit occurs, the congestion window is
556   * halved; otherwise, it is incremented by 1/cwnd when
557   *
558   *	-	a reply is received and
559   *	-	a full number of requests are outstanding and
560   *	-	the congestion window hasn't been updated recently.
561   */
562  void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
563  {
564  	struct rpc_rqst *req = task->tk_rqstp;
565  	unsigned long cwnd = xprt->cwnd;
566  
567  	if (result >= 0 && cwnd <= xprt->cong) {
568  		/* The (cwnd >> 1) term makes sure
569  		 * the result gets rounded properly. */
570  		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
571  		if (cwnd > RPC_MAXCWND(xprt))
572  			cwnd = RPC_MAXCWND(xprt);
573  		__xprt_lock_write_next_cong(xprt);
574  	} else if (result == -ETIMEDOUT) {
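		/*
		 * The request timed out: halve the window (multiplicative
		 * decrease), but never drop below one credit.
		 */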
575  		cwnd >>= 1;
576  		if (cwnd < RPC_CWNDSCALE)
577  			cwnd = RPC_CWNDSCALE;
578  	}
579  	dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
580  			xprt->cong, xprt->cwnd, cwnd);
581  	xprt->cwnd = cwnd;
582  	__xprt_put_cong(xprt, req);
583  }
584  EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
585  
586  /**
587   * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
588   * @xprt: transport with waiting tasks
589   * @status: result code to plant in each task before waking it
590   *
591   */
592  void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
593  {
594  	if (status < 0)
595  		rpc_wake_up_status(&xprt->pending, status);
596  	else
597  		rpc_wake_up(&xprt->pending);
598  }
599  EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
600  
601  /**
602   * xprt_wait_for_buffer_space - wait for transport output buffer to clear
603   * @xprt: transport
604   *
605   * Note that we only set the timer for the case of RPC_IS_SOFT(), since
606   * we don't in general want to force a socket disconnection due to
607   * an incomplete RPC call transmission.
608   */
609  void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
610  {
611  	set_bit(XPRT_WRITE_SPACE, &xprt->state);
612  }
613  EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
614  
615  static bool
616  xprt_clear_write_space_locked(struct rpc_xprt *xprt)
617  {
618  	if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
619  		__xprt_lock_write_next(xprt);
620  		dprintk("RPC:       write space: waking waiting task on "
621  				"xprt %p\n", xprt);
622  		return true;
623  	}
624  	return false;
625  }
626  
627  /**
628   * xprt_write_space - wake the task waiting for transport output buffer space
629   * @xprt: transport with waiting tasks
630   *
631   * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
632   */
633  bool xprt_write_space(struct rpc_xprt *xprt)
634  {
635  	bool ret;
636  
637  	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
638  		return false;
639  	spin_lock(&xprt->transport_lock);
640  	ret = xprt_clear_write_space_locked(xprt);
641  	spin_unlock(&xprt->transport_lock);
642  	return ret;
643  }
644  EXPORT_SYMBOL_GPL(xprt_write_space);
645  
646  static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
647  {
648  	s64 delta = ktime_to_ns(ktime_get() - abstime);
649  	return likely(delta >= 0) ?
650  		jiffies - nsecs_to_jiffies(delta) :
651  		jiffies + nsecs_to_jiffies(-delta);
652  }
653  
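/*
 * Compute the offset of the major (whole-request) timeout from the
 * current per-retransmission timeout: with exponential backoff the
 * value is doubled once per allowed retry (e.g. to_retries == 3 turns
 * an initial timeout T into 8 * T), with linear backoff to_increment is
 * added per retry.  The result is clamped to to_maxval.
 */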
654  static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req,
655  		const struct rpc_timeout *to)
656  {
657  	unsigned long majortimeo = req->rq_timeout;
658  
659  	if (to->to_exponential)
660  		majortimeo <<= to->to_retries;
661  	else
662  		majortimeo += to->to_increment * to->to_retries;
663  	if (majortimeo > to->to_maxval || majortimeo == 0)
664  		majortimeo = to->to_maxval;
665  	return majortimeo;
666  }
667  
668  static void xprt_reset_majortimeo(struct rpc_rqst *req,
669  		const struct rpc_timeout *to)
670  {
671  	req->rq_majortimeo += xprt_calc_majortimeo(req, to);
672  }
673  
674  static void xprt_reset_minortimeo(struct rpc_rqst *req)
675  {
676  	req->rq_minortimeo += req->rq_timeout;
677  }
678  
679  static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req,
680  		const struct rpc_timeout *to)
681  {
682  	unsigned long time_init;
683  	struct rpc_xprt *xprt = req->rq_xprt;
684  
685  	if (likely(xprt && xprt_connected(xprt)))
686  		time_init = jiffies;
687  	else
688  		time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
689  
690  	req->rq_timeout = to->to_initval;
691  	req->rq_majortimeo = time_init + xprt_calc_majortimeo(req, to);
692  	req->rq_minortimeo = time_init + req->rq_timeout;
693  }
694  
695  /**
696   * xprt_adjust_timeout - adjust timeout values for next retransmit
697   * @req: RPC request containing parameters to use for the adjustment
698   *
699   */
700  int xprt_adjust_timeout(struct rpc_rqst *req)
701  {
702  	struct rpc_xprt *xprt = req->rq_xprt;
703  	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
704  	int status = 0;
705  
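	/*
	 * Inside the major timeout window we only back off the minor
	 * (per-retransmission) timeout; once the major timeout expires we
	 * reset to to_initval, restart the RTT estimator and report
	 * -ETIMEDOUT to the caller.
	 */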
706  	if (time_before(jiffies, req->rq_majortimeo)) {
707  		if (time_before(jiffies, req->rq_minortimeo))
708  			return status;
709  		if (to->to_exponential)
710  			req->rq_timeout <<= 1;
711  		else
712  			req->rq_timeout += to->to_increment;
713  		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
714  			req->rq_timeout = to->to_maxval;
715  		req->rq_retries++;
716  	} else {
717  		req->rq_timeout = to->to_initval;
718  		req->rq_retries = 0;
719  		xprt_reset_majortimeo(req, to);
720  		/* Reset the RTT counters == "slow start" */
721  		spin_lock(&xprt->transport_lock);
722  		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
723  		spin_unlock(&xprt->transport_lock);
724  		status = -ETIMEDOUT;
725  	}
726  	xprt_reset_minortimeo(req);
727  
728  	if (req->rq_timeout == 0) {
729  		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
730  		req->rq_timeout = 5 * HZ;
731  	}
732  	return status;
733  }
734  
735  static void xprt_autoclose(struct work_struct *work)
736  {
737  	struct rpc_xprt *xprt =
738  		container_of(work, struct rpc_xprt, task_cleanup);
739  	unsigned int pflags = memalloc_nofs_save();
740  
741  	trace_xprt_disconnect_auto(xprt);
742  	xprt->connect_cookie++;
743  	smp_mb__before_atomic();
744  	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
745  	xprt->ops->close(xprt);
746  	xprt_release_write(xprt, NULL);
747  	wake_up_bit(&xprt->state, XPRT_LOCKED);
748  	memalloc_nofs_restore(pflags);
749  }
750  
751  /**
752   * xprt_disconnect_done - mark a transport as disconnected
753   * @xprt: transport to flag for disconnect
754   *
755   */
756  void xprt_disconnect_done(struct rpc_xprt *xprt)
757  {
758  	trace_xprt_disconnect_done(xprt);
759  	spin_lock(&xprt->transport_lock);
760  	xprt_clear_connected(xprt);
761  	xprt_clear_write_space_locked(xprt);
762  	xprt_clear_congestion_window_wait_locked(xprt);
763  	xprt_wake_pending_tasks(xprt, -ENOTCONN);
764  	spin_unlock(&xprt->transport_lock);
765  }
766  EXPORT_SYMBOL_GPL(xprt_disconnect_done);
767  
768  /**
769   * xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call
770   * @xprt: transport to disconnect
771   */
772  static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
773  {
774  	if (test_and_set_bit(XPRT_CLOSE_WAIT, &xprt->state))
775  		return;
776  	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
777  		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
778  	else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
779  		rpc_wake_up_queued_task_set_status(&xprt->pending,
780  						   xprt->snd_task, -ENOTCONN);
781  }
782  
783  /**
784   * xprt_force_disconnect - force a transport to disconnect
785   * @xprt: transport to disconnect
786   *
787   */
788  void xprt_force_disconnect(struct rpc_xprt *xprt)
789  {
790  	trace_xprt_disconnect_force(xprt);
791  
792  	/* Don't race with the test_bit() in xprt_clear_locked() */
793  	spin_lock(&xprt->transport_lock);
794  	xprt_schedule_autoclose_locked(xprt);
795  	spin_unlock(&xprt->transport_lock);
796  }
797  EXPORT_SYMBOL_GPL(xprt_force_disconnect);
798  
799  static unsigned int
800  xprt_connect_cookie(struct rpc_xprt *xprt)
801  {
802  	return READ_ONCE(xprt->connect_cookie);
803  }
804  
805  static bool
806  xprt_request_retransmit_after_disconnect(struct rpc_task *task)
807  {
808  	struct rpc_rqst *req = task->tk_rqstp;
809  	struct rpc_xprt *xprt = req->rq_xprt;
810  
811  	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
812  		!xprt_connected(xprt);
813  }
814  
815  /**
816   * xprt_conditional_disconnect - force a transport to disconnect
817   * @xprt: transport to disconnect
818   * @cookie: 'connection cookie'
819   *
820   * This attempts to break the connection if and only if 'cookie' matches
821   * the current transport 'connection cookie'. It ensures that we don't
822   * try to break the connection more than once when we need to retransmit
823   * a batch of RPC requests.
824   *
825   */
826  void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
827  {
828  	/* Don't race with the test_bit() in xprt_clear_locked() */
829  	spin_lock(&xprt->transport_lock);
830  	if (cookie != xprt->connect_cookie)
831  		goto out;
832  	if (test_bit(XPRT_CLOSING, &xprt->state))
833  		goto out;
834  	xprt_schedule_autoclose_locked(xprt);
835  out:
836  	spin_unlock(&xprt->transport_lock);
837  }
838  
839  static bool
840  xprt_has_timer(const struct rpc_xprt *xprt)
841  {
842  	return xprt->idle_timeout != 0;
843  }
844  
845  static void
846  xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
847  	__must_hold(&xprt->transport_lock)
848  {
849  	xprt->last_used = jiffies;
850  	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
851  		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
852  }
853  
854  static void
855  xprt_init_autodisconnect(struct timer_list *t)
856  {
857  	struct rpc_xprt *xprt = from_timer(xprt, t, timer);
858  
859  	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
860  		return;
861  	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
862  	xprt->last_used = jiffies;
863  	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
864  		return;
865  	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
866  }
867  
868  #if IS_ENABLED(CONFIG_FAIL_SUNRPC)
869  static void xprt_inject_disconnect(struct rpc_xprt *xprt)
870  {
871  	if (!fail_sunrpc.ignore_client_disconnect &&
872  	    should_fail(&fail_sunrpc.attr, 1))
873  		xprt->ops->inject_disconnect(xprt);
874  }
875  #else
876  static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
877  {
878  }
879  #endif
880  
881  bool xprt_lock_connect(struct rpc_xprt *xprt,
882  		struct rpc_task *task,
883  		void *cookie)
884  {
885  	bool ret = false;
886  
887  	spin_lock(&xprt->transport_lock);
888  	if (!test_bit(XPRT_LOCKED, &xprt->state))
889  		goto out;
890  	if (xprt->snd_task != task)
891  		goto out;
892  	set_bit(XPRT_SND_IS_COOKIE, &xprt->state);
893  	xprt->snd_task = cookie;
894  	ret = true;
895  out:
896  	spin_unlock(&xprt->transport_lock);
897  	return ret;
898  }
899  EXPORT_SYMBOL_GPL(xprt_lock_connect);
900  
901  void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
902  {
903  	spin_lock(&xprt->transport_lock);
904  	if (xprt->snd_task != cookie)
905  		goto out;
906  	if (!test_bit(XPRT_LOCKED, &xprt->state))
907  		goto out;
908  	xprt->snd_task = NULL;
909  	clear_bit(XPRT_SND_IS_COOKIE, &xprt->state);
910  	xprt->ops->release_xprt(xprt, NULL);
911  	xprt_schedule_autodisconnect(xprt);
912  out:
913  	spin_unlock(&xprt->transport_lock);
914  	wake_up_bit(&xprt->state, XPRT_LOCKED);
915  }
916  EXPORT_SYMBOL_GPL(xprt_unlock_connect);
917  
918  /**
919   * xprt_connect - schedule a transport connect operation
920   * @task: RPC task that is requesting the connect
921   *
922   */
923  void xprt_connect(struct rpc_task *task)
924  {
925  	struct rpc_xprt	*xprt = task->tk_rqstp->rq_xprt;
926  
927  	trace_xprt_connect(xprt);
928  
929  	if (!xprt_bound(xprt)) {
930  		task->tk_status = -EAGAIN;
931  		return;
932  	}
933  	if (!xprt_lock_write(xprt, task))
934  		return;
935  
936  	if (!xprt_connected(xprt) && !test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
937  		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
938  		rpc_sleep_on_timeout(&xprt->pending, task, NULL,
939  				xprt_request_timeout(task->tk_rqstp));
940  
941  		if (test_bit(XPRT_CLOSING, &xprt->state))
942  			return;
943  		if (xprt_test_and_set_connecting(xprt))
944  			return;
945  		/* Race breaker */
946  		if (!xprt_connected(xprt)) {
947  			xprt->stat.connect_start = jiffies;
948  			xprt->ops->connect(xprt, task);
949  		} else {
950  			xprt_clear_connecting(xprt);
951  			task->tk_status = 0;
952  			rpc_wake_up_queued_task(&xprt->pending, task);
953  		}
954  	}
955  	xprt_release_write(xprt, task);
956  }
957  
958  /**
959   * xprt_reconnect_delay - compute the wait before scheduling a connect
960   * @xprt: transport instance
961   *
962   */
963  unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
964  {
965  	unsigned long start, now = jiffies;
966  
967  	start = xprt->stat.connect_start + xprt->reestablish_timeout;
968  	if (time_after(start, now))
969  		return start - now;
970  	return 0;
971  }
972  EXPORT_SYMBOL_GPL(xprt_reconnect_delay);
973  
974  /**
975   * xprt_reconnect_backoff - compute the new re-establish timeout
976   * @xprt: transport instance
977   * @init_to: initial reestablish timeout
978   *
979   */
980  void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
981  {
982  	xprt->reestablish_timeout <<= 1;
983  	if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
984  		xprt->reestablish_timeout = xprt->max_reconnect_timeout;
985  	if (xprt->reestablish_timeout < init_to)
986  		xprt->reestablish_timeout = init_to;
987  }
988  EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
989  
990  enum xprt_xid_rb_cmp {
991  	XID_RB_EQUAL,
992  	XID_RB_LEFT,
993  	XID_RB_RIGHT,
994  };
995  static enum xprt_xid_rb_cmp
996  xprt_xid_cmp(__be32 xid1, __be32 xid2)
997  {
998  	if (xid1 == xid2)
999  		return XID_RB_EQUAL;
1000  	if ((__force u32)xid1 < (__force u32)xid2)
1001  		return XID_RB_LEFT;
1002  	return XID_RB_RIGHT;
1003  }
1004  
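/*
 * The transport's receive queue is a red-black tree keyed on the RPC
 * XID, so matching an incoming reply to its request is O(log n) in the
 * number of outstanding requests rather than a linear search.
 */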
1005  static struct rpc_rqst *
1006  xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
1007  {
1008  	struct rb_node *n = xprt->recv_queue.rb_node;
1009  	struct rpc_rqst *req;
1010  
1011  	while (n != NULL) {
1012  		req = rb_entry(n, struct rpc_rqst, rq_recv);
1013  		switch (xprt_xid_cmp(xid, req->rq_xid)) {
1014  		case XID_RB_LEFT:
1015  			n = n->rb_left;
1016  			break;
1017  		case XID_RB_RIGHT:
1018  			n = n->rb_right;
1019  			break;
1020  		case XID_RB_EQUAL:
1021  			return req;
1022  		}
1023  	}
1024  	return NULL;
1025  }
1026  
1027  static void
1028  xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
1029  {
1030  	struct rb_node **p = &xprt->recv_queue.rb_node;
1031  	struct rb_node *n = NULL;
1032  	struct rpc_rqst *req;
1033  
1034  	while (*p != NULL) {
1035  		n = *p;
1036  		req = rb_entry(n, struct rpc_rqst, rq_recv);
1037  		switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
1038  		case XID_RB_LEFT:
1039  			p = &n->rb_left;
1040  			break;
1041  		case XID_RB_RIGHT:
1042  			p = &n->rb_right;
1043  			break;
1044  		case XID_RB_EQUAL:
1045  			WARN_ON_ONCE(new != req);
1046  			return;
1047  		}
1048  	}
1049  	rb_link_node(&new->rq_recv, n, p);
1050  	rb_insert_color(&new->rq_recv, &xprt->recv_queue);
1051  }
1052  
1053  static void
1054  xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
1055  {
1056  	rb_erase(&req->rq_recv, &xprt->recv_queue);
1057  }
1058  
1059  /**
1060   * xprt_lookup_rqst - find an RPC request corresponding to an XID
1061   * @xprt: transport on which the original request was transmitted
1062   * @xid: RPC XID of incoming reply
1063   *
1064   * Caller holds xprt->queue_lock.
1065   */
1066  struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
1067  {
1068  	struct rpc_rqst *entry;
1069  
1070  	entry = xprt_request_rb_find(xprt, xid);
1071  	if (entry != NULL) {
1072  		trace_xprt_lookup_rqst(xprt, xid, 0);
1073  		entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
1074  		return entry;
1075  	}
1076  
1077  	dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
1078  			ntohl(xid));
1079  	trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
1080  	xprt->stat.bad_xids++;
1081  	return NULL;
1082  }
1083  EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
1084  
1085  static bool
1086  xprt_is_pinned_rqst(struct rpc_rqst *req)
1087  {
1088  	return atomic_read(&req->rq_pin) != 0;
1089  }
1090  
1091  /**
1092   * xprt_pin_rqst - Pin a request on the transport receive list
1093   * @req: Request to pin
1094   *
1095   * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
1096   * so should be holding xprt->queue_lock.
1097   */
1098  void xprt_pin_rqst(struct rpc_rqst *req)
1099  {
1100  	atomic_inc(&req->rq_pin);
1101  }
1102  EXPORT_SYMBOL_GPL(xprt_pin_rqst);
1103  
1104  /**
1105   * xprt_unpin_rqst - Unpin a request on the transport receive list
1106   * @req: Request to unpin
1107   *
1108   * Caller should be holding xprt->queue_lock.
1109   */
1110  void xprt_unpin_rqst(struct rpc_rqst *req)
1111  {
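	/*
	 * Only take the wake-up path when the owning task is known to be
	 * waiting in xprt_wait_on_pinned_rqst().
	 */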
1112  	if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
1113  		atomic_dec(&req->rq_pin);
1114  		return;
1115  	}
1116  	if (atomic_dec_and_test(&req->rq_pin))
1117  		wake_up_var(&req->rq_pin);
1118  }
1119  EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
1120  
1121  static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
1122  {
1123  	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
1124  }
1125  
1126  static bool
1127  xprt_request_data_received(struct rpc_task *task)
1128  {
1129  	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
1130  		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
1131  }
1132  
1133  static bool
1134  xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
1135  {
1136  	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
1137  		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
1138  }
1139  
1140  /**
1141   * xprt_request_enqueue_receive - Add a request to the receive queue
1142   * @task: RPC task
1143   *
1144   */
1145  int
1146  xprt_request_enqueue_receive(struct rpc_task *task)
1147  {
1148  	struct rpc_rqst *req = task->tk_rqstp;
1149  	struct rpc_xprt *xprt = req->rq_xprt;
1150  	int ret;
1151  
1152  	if (!xprt_request_need_enqueue_receive(task, req))
1153  		return 0;
1154  
1155  	ret = xprt_request_prepare(task->tk_rqstp, &req->rq_rcv_buf);
1156  	if (ret)
1157  		return ret;
1158  	spin_lock(&xprt->queue_lock);
1159  
1160  	/* Update the softirq receive buffer */
1161  	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
1162  			sizeof(req->rq_private_buf));
1163  
1164  	/* Add request to the receive list */
1165  	xprt_request_rb_insert(xprt, req);
1166  	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
1167  	spin_unlock(&xprt->queue_lock);
1168  
1169  	/* Turn off autodisconnect */
1170  	del_timer_sync(&xprt->timer);
1171  	return 0;
1172  }
1173  
1174  /**
1175   * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
1176   * @task: RPC task
1177   *
1178   * Caller must hold xprt->queue_lock.
1179   */
1180  static void
1181  xprt_request_dequeue_receive_locked(struct rpc_task *task)
1182  {
1183  	struct rpc_rqst *req = task->tk_rqstp;
1184  
1185  	if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1186  		xprt_request_rb_remove(req->rq_xprt, req);
1187  }
1188  
1189  /**
1190   * xprt_update_rtt - Update RPC RTT statistics
1191   * @task: RPC request that recently completed
1192   *
1193   * Caller holds xprt->queue_lock.
1194   */
1195  void xprt_update_rtt(struct rpc_task *task)
1196  {
1197  	struct rpc_rqst *req = task->tk_rqstp;
1198  	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
1199  	unsigned int timer = task->tk_msg.rpc_proc->p_timer;
1200  	long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));
1201  
1202  	if (timer) {
1203  		if (req->rq_ntrans == 1)
1204  			rpc_update_rtt(rtt, timer, m);
1205  		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
1206  	}
1207  }
1208  EXPORT_SYMBOL_GPL(xprt_update_rtt);
1209  
1210  /**
1211   * xprt_complete_rqst - called when reply processing is complete
1212   * @task: RPC request that recently completed
1213   * @copied: actual number of bytes received from the transport
1214   *
1215   * Caller holds xprt->queue_lock.
1216   */
1217  void xprt_complete_rqst(struct rpc_task *task, int copied)
1218  {
1219  	struct rpc_rqst *req = task->tk_rqstp;
1220  	struct rpc_xprt *xprt = req->rq_xprt;
1221  
1222  	xprt->stat.recvs++;
1223  
1224  	xdr_free_bvec(&req->rq_rcv_buf);
1225  	req->rq_private_buf.bvec = NULL;
1226  	req->rq_private_buf.len = copied;
1227  	/* Ensure all writes are done before we update */
1228  	/* req->rq_reply_bytes_recvd */
1229  	smp_wmb();
1230  	req->rq_reply_bytes_recvd = copied;
1231  	xprt_request_dequeue_receive_locked(task);
1232  	rpc_wake_up_queued_task(&xprt->pending, task);
1233  }
1234  EXPORT_SYMBOL_GPL(xprt_complete_rqst);
1235  
1236  static void xprt_timer(struct rpc_task *task)
1237  {
1238  	struct rpc_rqst *req = task->tk_rqstp;
1239  	struct rpc_xprt *xprt = req->rq_xprt;
1240  
1241  	if (task->tk_status != -ETIMEDOUT)
1242  		return;
1243  
1244  	trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
1245  	if (!req->rq_reply_bytes_recvd) {
1246  		if (xprt->ops->timer)
1247  			xprt->ops->timer(xprt, task);
1248  	} else
1249  		task->tk_status = 0;
1250  }
1251  
1252  /**
1253   * xprt_wait_for_reply_request_def - wait for reply
1254   * @task: pointer to rpc_task
1255   *
1256   * Set a request's retransmit timeout based on the transport's
1257   * default timeout parameters.  Used by transports that don't adjust
1258   * the retransmit timeout based on round-trip time estimation,
1259   * and put the task to sleep on the pending queue.
1260   */
1261  void xprt_wait_for_reply_request_def(struct rpc_task *task)
1262  {
1263  	struct rpc_rqst *req = task->tk_rqstp;
1264  
1265  	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1266  			xprt_request_timeout(req));
1267  }
1268  EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);
1269  
1270  /**
1271   * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
1272   * @task: pointer to rpc_task
1273   *
1274   * Set a request's retransmit timeout using the RTT estimator,
1275   * and put the task to sleep on the pending queue.
1276   */
1277  void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
1278  {
1279  	int timer = task->tk_msg.rpc_proc->p_timer;
1280  	struct rpc_clnt *clnt = task->tk_client;
1281  	struct rpc_rtt *rtt = clnt->cl_rtt;
1282  	struct rpc_rqst *req = task->tk_rqstp;
1283  	unsigned long max_timeout = clnt->cl_timeout->to_maxval;
1284  	unsigned long timeout;
1285  
1286  	timeout = rpc_calc_rto(rtt, timer);
1287  	timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
1288  	if (timeout > max_timeout || timeout == 0)
1289  		timeout = max_timeout;
1290  	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1291  			jiffies + timeout);
1292  }
1293  EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);
1294  
1295  /**
1296   * xprt_request_wait_receive - wait for the reply to an RPC request
1297   * @task: RPC task about to send a request
1298   *
1299   */
1300  void xprt_request_wait_receive(struct rpc_task *task)
1301  {
1302  	struct rpc_rqst *req = task->tk_rqstp;
1303  	struct rpc_xprt *xprt = req->rq_xprt;
1304  
1305  	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1306  		return;
1307  	/*
1308  	 * Sleep on the pending queue if we're expecting a reply.
1309  	 * The spinlock ensures atomicity between the test of
1310  	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
1311  	 */
1312  	spin_lock(&xprt->queue_lock);
1313  	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
1314  		xprt->ops->wait_for_reply_request(task);
1315  		/*
1316  		 * Send an extra queue wakeup call if the
1317  		 * connection was dropped in case the call to
1318  		 * rpc_sleep_on() raced.
1319  		 */
1320  		if (xprt_request_retransmit_after_disconnect(task))
1321  			rpc_wake_up_queued_task_set_status(&xprt->pending,
1322  					task, -ENOTCONN);
1323  	}
1324  	spin_unlock(&xprt->queue_lock);
1325  }
1326  
1327  static bool
1328  xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
1329  {
1330  	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1331  }
1332  
1333  /**
1334   * xprt_request_enqueue_transmit - queue a task for transmission
1335   * @task: pointer to rpc_task
1336   *
1337   * Add a task to the transmission queue.
1338   */
1339  void
1340  xprt_request_enqueue_transmit(struct rpc_task *task)
1341  {
1342  	struct rpc_rqst *pos, *req = task->tk_rqstp;
1343  	struct rpc_xprt *xprt = req->rq_xprt;
1344  	int ret;
1345  
1346  	if (xprt_request_need_enqueue_transmit(task, req)) {
1347  		ret = xprt_request_prepare(task->tk_rqstp, &req->rq_snd_buf);
1348  		if (ret) {
1349  			task->tk_status = ret;
1350  			return;
1351  		}
1352  		req->rq_bytes_sent = 0;
1353  		spin_lock(&xprt->queue_lock);
1354  		/*
1355  		 * Requests that carry congestion control credits are added
1356  		 * to the head of the list to avoid starvation issues.
1357  		 */
1358  		if (req->rq_cong) {
1359  			xprt_clear_congestion_window_wait(xprt);
1360  			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1361  				if (pos->rq_cong)
1362  					continue;
1363  				/* Note: req is added _before_ pos */
1364  				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
1365  				INIT_LIST_HEAD(&req->rq_xmit2);
1366  				goto out;
1367  			}
1368  		} else if (!req->rq_seqno) {
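			/*
			 * No GSS sequence number to preserve: chain this
			 * request behind an earlier one from the same owner
			 * (on rq_xmit2) so they are transmitted together.
			 */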
1369  			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1370  				if (pos->rq_task->tk_owner != task->tk_owner)
1371  					continue;
1372  				list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
1373  				INIT_LIST_HEAD(&req->rq_xmit);
1374  				goto out;
1375  			}
1376  		}
1377  		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
1378  		INIT_LIST_HEAD(&req->rq_xmit2);
1379  out:
1380  		atomic_long_inc(&xprt->xmit_queuelen);
1381  		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1382  		spin_unlock(&xprt->queue_lock);
1383  	}
1384  }
1385  
1386  /**
1387   * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
1388   * @task: pointer to rpc_task
1389   *
1390   * Remove a task from the transmission queue
1391   * Caller must hold xprt->queue_lock
1392   */
1393  static void
1394  xprt_request_dequeue_transmit_locked(struct rpc_task *task)
1395  {
1396  	struct rpc_rqst *req = task->tk_rqstp;
1397  
1398  	if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1399  		return;
1400  	if (!list_empty(&req->rq_xmit)) {
1401  		struct rpc_xprt *xprt = req->rq_xprt;
1402  
1403  		if (list_is_first(&req->rq_xmit, &xprt->xmit_queue) &&
1404  		    xprt->ops->abort_send_request)
1405  			xprt->ops->abort_send_request(req);
1406  
1407  		list_del(&req->rq_xmit);
1408  		if (!list_empty(&req->rq_xmit2)) {
1409  			struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
1410  					struct rpc_rqst, rq_xmit2);
1411  			list_del(&req->rq_xmit2);
1412  			list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
1413  		}
1414  	} else
1415  		list_del(&req->rq_xmit2);
1416  	atomic_long_dec(&req->rq_xprt->xmit_queuelen);
1417  	xdr_free_bvec(&req->rq_snd_buf);
1418  }
1419  
1420  /**
1421   * xprt_request_dequeue_transmit - remove a task from the transmission queue
1422   * @task: pointer to rpc_task
1423   *
1424   * Remove a task from the transmission queue
1425   */
1426  static void
1427  xprt_request_dequeue_transmit(struct rpc_task *task)
1428  {
1429  	struct rpc_rqst *req = task->tk_rqstp;
1430  	struct rpc_xprt *xprt = req->rq_xprt;
1431  
1432  	spin_lock(&xprt->queue_lock);
1433  	xprt_request_dequeue_transmit_locked(task);
1434  	spin_unlock(&xprt->queue_lock);
1435  }
1436  
1437  /**
1438   * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
1439   * @task: pointer to rpc_task
1440   *
1441   * Remove a task from the transmit and receive queues, and ensure that
1442   * it is not pinned by the receive work item.
1443   */
1444  void
1445  xprt_request_dequeue_xprt(struct rpc_task *task)
1446  {
1447  	struct rpc_rqst	*req = task->tk_rqstp;
1448  	struct rpc_xprt *xprt = req->rq_xprt;
1449  
1450  	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
1451  	    test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
1452  	    xprt_is_pinned_rqst(req)) {
1453  		spin_lock(&xprt->queue_lock);
1454  		while (xprt_is_pinned_rqst(req)) {
1455  			set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1456  			spin_unlock(&xprt->queue_lock);
1457  			xprt_wait_on_pinned_rqst(req);
1458  			spin_lock(&xprt->queue_lock);
1459  			clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1460  		}
1461  		xprt_request_dequeue_transmit_locked(task);
1462  		xprt_request_dequeue_receive_locked(task);
1463  		spin_unlock(&xprt->queue_lock);
1464  		xdr_free_bvec(&req->rq_rcv_buf);
1465  	}
1466  }
1467  
1468  /**
1469   * xprt_request_prepare - prepare an encoded request for transport
1470   * @req: pointer to rpc_rqst
1471   * @buf: pointer to send/rcv xdr_buf
1472   *
1473   * Calls into the transport layer to do whatever is needed to prepare
1474   * the request for transmission or receive.
1475   * Returns error, or zero.
1476   */
1477  static int
1478  xprt_request_prepare(struct rpc_rqst *req, struct xdr_buf *buf)
1479  {
1480  	struct rpc_xprt *xprt = req->rq_xprt;
1481  
1482  	if (xprt->ops->prepare_request)
1483  		return xprt->ops->prepare_request(req, buf);
1484  	return 0;
1485  }
1486  
1487  /**
1488   * xprt_request_need_retransmit - Test if a task needs retransmission
1489   * @task: pointer to rpc_task
1490   *
1491   * Test for whether a connection breakage requires the task to retransmit
1492   */
1493  bool
1494  xprt_request_need_retransmit(struct rpc_task *task)
1495  {
1496  	return xprt_request_retransmit_after_disconnect(task);
1497  }
1498  
1499  /**
1500   * xprt_prepare_transmit - reserve the transport before sending a request
1501   * @task: RPC task about to send a request
1502   *
1503   */
1504  bool xprt_prepare_transmit(struct rpc_task *task)
1505  {
1506  	struct rpc_rqst	*req = task->tk_rqstp;
1507  	struct rpc_xprt	*xprt = req->rq_xprt;
1508  
1509  	if (!xprt_lock_write(xprt, task)) {
1510  		/* Race breaker: someone may have transmitted us */
1511  		if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1512  			rpc_wake_up_queued_task_set_status(&xprt->sending,
1513  					task, 0);
1514  		return false;
1515  
1516  	}
1517  	if (atomic_read(&xprt->swapper))
1518  		/* This will be clear in __rpc_execute */
1519  		current->flags |= PF_MEMALLOC;
1520  	return true;
1521  }
1522  
1523  void xprt_end_transmit(struct rpc_task *task)
1524  {
1525  	struct rpc_xprt	*xprt = task->tk_rqstp->rq_xprt;
1526  
1527  	xprt_inject_disconnect(xprt);
1528  	xprt_release_write(xprt, task);
1529  }
1530  
1531  /**
1532   * xprt_request_transmit - send an RPC request on a transport
1533   * @req: pointer to request to transmit
1534   * @snd_task: RPC task that owns the transport lock
1535   *
1536   * This performs the transmission of a single request.
1537   * Note that if the request is not the same as snd_task, then it
1538   * does need to be pinned.
1539   * Returns '0' on success.
1540   */
1541  static int
1542  xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
1543  {
1544  	struct rpc_xprt *xprt = req->rq_xprt;
1545  	struct rpc_task *task = req->rq_task;
1546  	unsigned int connect_cookie;
1547  	int is_retrans = RPC_WAS_SENT(task);
1548  	int status;
1549  
1550  	if (test_bit(XPRT_CLOSE_WAIT, &xprt->state))
1551  		return -ENOTCONN;
1552  
1553  	if (!req->rq_bytes_sent) {
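		/*
		 * Nothing has been sent yet: re-check for an early reply,
		 * verify the message still lies within the RPCSEC_GSS
		 * sequence window, and check for pending signals before
		 * touching the wire.
		 */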
1554  		if (xprt_request_data_received(task)) {
1555  			status = 0;
1556  			goto out_dequeue;
1557  		}
1558  		/* Verify that our message lies in the RPCSEC_GSS window */
1559  		if (rpcauth_xmit_need_reencode(task)) {
1560  			status = -EBADMSG;
1561  			goto out_dequeue;
1562  		}
1563  		if (RPC_SIGNALLED(task)) {
1564  			status = -ERESTARTSYS;
1565  			goto out_dequeue;
1566  		}
1567  	}
1568  
1569  	/*
1570  	 * Update req->rq_ntrans before transmitting to avoid races with
1571  	 * xprt_update_rtt(), which needs to know that it is recording a
1572  	 * reply to the first transmission.
1573  	 */
1574  	req->rq_ntrans++;
1575  
1576  	trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
1577  	connect_cookie = xprt->connect_cookie;
1578  	status = xprt->ops->send_request(req);
1579  	if (status != 0) {
1580  		req->rq_ntrans--;
1581  		trace_xprt_transmit(req, status);
1582  		return status;
1583  	}
1584  
1585  	if (is_retrans) {
1586  		task->tk_client->cl_stats->rpcretrans++;
1587  		trace_xprt_retransmit(req);
1588  	}
1589  
1590  	xprt_inject_disconnect(xprt);
1591  
1592  	task->tk_flags |= RPC_TASK_SENT;
1593  	spin_lock(&xprt->transport_lock);
1594  
1595  	xprt->stat.sends++;
1596  	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
1597  	xprt->stat.bklog_u += xprt->backlog.qlen;
1598  	xprt->stat.sending_u += xprt->sending.qlen;
1599  	xprt->stat.pending_u += xprt->pending.qlen;
1600  	spin_unlock(&xprt->transport_lock);
1601  
1602  	req->rq_connect_cookie = connect_cookie;
1603  out_dequeue:
1604  	trace_xprt_transmit(req, status);
1605  	xprt_request_dequeue_transmit(task);
1606  	rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
1607  	return status;
1608  }
1609  
1610  /**
1611   * xprt_transmit - send an RPC request on a transport
1612   * @task: controlling RPC task
1613   *
1614   * Attempts to drain the transmit queue. On exit, either the transport
1615   * signalled an error that needs to be handled before transmission can
1616   * resume, or @task finished transmitting, and detected that it already
1617   * received a reply.
1618   */
1619  void
1620  xprt_transmit(struct rpc_task *task)
1621  {
1622  	struct rpc_rqst *next, *req = task->tk_rqstp;
1623  	struct rpc_xprt	*xprt = req->rq_xprt;
1624  	int status;
1625  
1626  	spin_lock(&xprt->queue_lock);
1627  	for (;;) {
1628  		next = list_first_entry_or_null(&xprt->xmit_queue,
1629  						struct rpc_rqst, rq_xmit);
1630  		if (!next)
1631  			break;
1632  		xprt_pin_rqst(next);
1633  		spin_unlock(&xprt->queue_lock);
1634  		status = xprt_request_transmit(next, task);
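		/*
		 * An encoding error on some other task's request must not be
		 * reported to @task; only fail for @task's own request.
		 */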
1635  		if (status == -EBADMSG && next != req)
1636  			status = 0;
1637  		spin_lock(&xprt->queue_lock);
1638  		xprt_unpin_rqst(next);
1639  		if (status < 0) {
1640  			if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1641  				task->tk_status = status;
1642  			break;
1643  		}
1644  		/* Was @task transmitted, and has it received a reply? */
1645  		if (xprt_request_data_received(task) &&
1646  		    !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1647  			break;
1648  		cond_resched_lock(&xprt->queue_lock);
1649  	}
1650  	spin_unlock(&xprt->queue_lock);
1651  }
1652  
1653  static void xprt_complete_request_init(struct rpc_task *task)
1654  {
1655  	if (task->tk_rqstp)
1656  		xprt_request_init(task);
1657  }
1658  
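/**
 * xprt_add_backlog - put a task to sleep on the transport's backlog queue
 * @xprt: transport with no free request slots
 * @task: task waiting for a request slot
 *
 * Marks the transport as congested and queues @task until a slot is
 * recycled by xprt_wake_up_backlog().
 */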
1659  void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
1660  {
1661  	set_bit(XPRT_CONGESTED, &xprt->state);
1662  	rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
1663  }
1664  EXPORT_SYMBOL_GPL(xprt_add_backlog);
1665  
1666  static bool __xprt_set_rq(struct rpc_task *task, void *data)
1667  {
1668  	struct rpc_rqst *req = data;
1669  
1670  	if (task->tk_rqstp == NULL) {
1671  		memset(req, 0, sizeof(*req));	/* mark unused */
1672  		task->tk_rqstp = req;
1673  		return true;
1674  	}
1675  	return false;
1676  }
1677  
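/**
 * xprt_wake_up_backlog - hand a released request slot to a backlogged task
 * @xprt: transport whose backlog queue should be checked
 * @req: request slot that is being released
 *
 * Returns true if a waiting task was woken up and given ownership of
 * @req. Returns false, and clears XPRT_CONGESTED, if no task was waiting.
 */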
1678  bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
1679  {
1680  	if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
1681  		clear_bit(XPRT_CONGESTED, &xprt->state);
1682  		return false;
1683  	}
1684  	return true;
1685  }
1686  EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);
1687  
1688  static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
1689  {
1690  	bool ret = false;
1691  
1692  	if (!test_bit(XPRT_CONGESTED, &xprt->state))
1693  		goto out;
1694  	spin_lock(&xprt->reserve_lock);
1695  	if (test_bit(XPRT_CONGESTED, &xprt->state)) {
1696  		xprt_add_backlog(xprt, task);
1697  		ret = true;
1698  	}
1699  	spin_unlock(&xprt->reserve_lock);
1700  out:
1701  	return ret;
1702  }
1703  
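/*
 * Try to grow the slot table by allocating one more rpc_rqst, up to
 * xprt->max_reqs. Called with xprt->reserve_lock held; the lock is
 * dropped around the allocation itself and reacquired before returning.
 */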
1704  static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
1705  {
1706  	struct rpc_rqst *req = ERR_PTR(-EAGAIN);
1707  
1708  	if (xprt->num_reqs >= xprt->max_reqs)
1709  		goto out;
1710  	++xprt->num_reqs;
1711  	spin_unlock(&xprt->reserve_lock);
1712  	req = kzalloc(sizeof(*req), rpc_task_gfp_mask());
1713  	spin_lock(&xprt->reserve_lock);
1714  	if (req != NULL)
1715  		goto out;
1716  	--xprt->num_reqs;
1717  	req = ERR_PTR(-ENOMEM);
1718  out:
1719  	return req;
1720  }
1721  
1722  static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1723  {
1724  	if (xprt->num_reqs > xprt->min_reqs) {
1725  		--xprt->num_reqs;
1726  		kfree(req);
1727  		return true;
1728  	}
1729  	return false;
1730  }
1731  
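/**
 * xprt_alloc_slot - attempt to reserve a request slot for a task
 * @xprt: transport to allocate the slot from
 * @task: task that needs the slot
 *
 * Takes a slot from the free list if one is available, otherwise tries
 * to allocate one dynamically. On failure, @task is either placed on
 * the backlog queue or asked to retry, as indicated by task->tk_status.
 */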
1732  void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
1733  {
1734  	struct rpc_rqst *req;
1735  
1736  	spin_lock(&xprt->reserve_lock);
1737  	if (!list_empty(&xprt->free)) {
1738  		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
1739  		list_del(&req->rq_list);
1740  		goto out_init_req;
1741  	}
1742  	req = xprt_dynamic_alloc_slot(xprt);
1743  	if (!IS_ERR(req))
1744  		goto out_init_req;
1745  	switch (PTR_ERR(req)) {
1746  	case -ENOMEM:
1747  		dprintk("RPC:       dynamic allocation of request slot "
1748  				"failed! Retrying\n");
1749  		task->tk_status = -ENOMEM;
1750  		break;
1751  	case -EAGAIN:
1752  		xprt_add_backlog(xprt, task);
1753  		dprintk("RPC:       waiting for request slot\n");
1754  		fallthrough;
1755  	default:
1756  		task->tk_status = -EAGAIN;
1757  	}
1758  	spin_unlock(&xprt->reserve_lock);
1759  	return;
1760  out_init_req:
1761  	xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
1762  				     xprt->num_reqs);
1763  	spin_unlock(&xprt->reserve_lock);
1764  
1765  	task->tk_status = 0;
1766  	task->tk_rqstp = req;
1767  }
1768  EXPORT_SYMBOL_GPL(xprt_alloc_slot);
1769  
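/**
 * xprt_free_slot - release a request slot back to the transport
 * @xprt: transport that owns @req
 * @req: request slot being released
 *
 * The slot is handed directly to a backlogged task if one is waiting,
 * freed if it was dynamically allocated above xprt->min_reqs, or
 * otherwise returned to the transport's free list.
 */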
1770  void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1771  {
1772  	spin_lock(&xprt->reserve_lock);
1773  	if (!xprt_wake_up_backlog(xprt, req) &&
1774  	    !xprt_dynamic_free_slot(xprt, req)) {
1775  		memset(req, 0, sizeof(*req));	/* mark unused */
1776  		list_add(&req->rq_list, &xprt->free);
1777  	}
1778  	spin_unlock(&xprt->reserve_lock);
1779  }
1780  EXPORT_SYMBOL_GPL(xprt_free_slot);
1781  
1782  static void xprt_free_all_slots(struct rpc_xprt *xprt)
1783  {
1784  	struct rpc_rqst *req;
1785  	while (!list_empty(&xprt->free)) {
1786  		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
1787  		list_del(&req->rq_list);
1788  		kfree(req);
1789  	}
1790  }
1791  
1792  static DEFINE_IDA(rpc_xprt_ids);
1793  
1794  void xprt_cleanup_ids(void)
1795  {
1796  	ida_destroy(&rpc_xprt_ids);
1797  }
1798  
1799  static int xprt_alloc_id(struct rpc_xprt *xprt)
1800  {
1801  	int id;
1802  
1803  	id = ida_alloc(&rpc_xprt_ids, GFP_KERNEL);
1804  	if (id < 0)
1805  		return id;
1806  
1807  	xprt->id = id;
1808  	return 0;
1809  }
1810  
1811  static void xprt_free_id(struct rpc_xprt *xprt)
1812  {
1813  	ida_free(&rpc_xprt_ids, xprt->id);
1814  }
1815  
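/**
 * xprt_alloc - allocate an rpc_xprt and preallocate its request slots
 * @net: network namespace that the transport belongs to
 * @size: size of the structure to allocate (may be larger than
 *	  struct rpc_xprt to hold transport-private data)
 * @num_prealloc: number of request slots to preallocate
 * @max_alloc: upper bound on the number of slots (raised to
 *	       @num_prealloc if smaller)
 *
 * Returns the newly allocated transport, or NULL on failure.
 */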
1816  struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
1817  		unsigned int num_prealloc,
1818  		unsigned int max_alloc)
1819  {
1820  	struct rpc_xprt *xprt;
1821  	struct rpc_rqst *req;
1822  	int i;
1823  
1824  	xprt = kzalloc(size, GFP_KERNEL);
1825  	if (xprt == NULL)
1826  		goto out;
1827  
1828  	xprt_alloc_id(xprt);
1829  	xprt_init(xprt, net);
1830  
1831  	for (i = 0; i < num_prealloc; i++) {
1832  		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
1833  		if (!req)
1834  			goto out_free;
1835  		list_add(&req->rq_list, &xprt->free);
1836  	}
1837  	xprt->max_reqs = max_t(unsigned int, max_alloc, num_prealloc);
1838  	xprt->min_reqs = num_prealloc;
1839  	xprt->num_reqs = num_prealloc;
1840  
1841  	return xprt;
1842  
1843  out_free:
1844  	xprt_free(xprt);
1845  out:
1846  	return NULL;
1847  }
1848  EXPORT_SYMBOL_GPL(xprt_alloc);
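/*
 * Illustrative sketch only (not part of this file): a transport
 * implementation's setup routine typically embeds struct rpc_xprt in
 * its own private structure and sizes the slot table when calling
 * xprt_alloc(). The structure name and slot-table constants below are
 * hypothetical.
 *
 *	struct example_xprt {
 *		struct rpc_xprt	xprt;	/- conventionally placed first -/
 *		/- transport-private fields follow -/
 *	};
 *
 *	static struct rpc_xprt *example_setup(struct xprt_create *args)
 *	{
 *		struct rpc_xprt *xprt;
 *
 *		xprt = xprt_alloc(args->net, sizeof(struct example_xprt),
 *				  EXAMPLE_MIN_SLOTS, EXAMPLE_MAX_SLOTS);
 *		if (!xprt)
 *			return ERR_PTR(-ENOMEM);
 *		/- fill in xprt->ops, timeouts, addresses, etc. -/
 *		return xprt;
 *	}
 *
 * (Nested comment markers are written as "/-" and "-/" here so the
 * enclosing block comment stays well-formed.)
 */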
1849  
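/**
 * xprt_free - free an rpc_xprt allocated with xprt_alloc()
 * @xprt: transport to free
 *
 * Releases the slot table, the transport id, the sysfs entry and the
 * netns reference, then frees the structure after an RCU grace period.
 */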
1850  void xprt_free(struct rpc_xprt *xprt)
1851  {
1852  	put_net_track(xprt->xprt_net, &xprt->ns_tracker);
1853  	xprt_free_all_slots(xprt);
1854  	xprt_free_id(xprt);
1855  	rpc_sysfs_xprt_destroy(xprt);
1856  	kfree_rcu(xprt, rcu);
1857  }
1858  EXPORT_SYMBOL_GPL(xprt_free);
1859  
1860  static void
1861  xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
1862  {
1863  	req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
1864  }
1865  
1866  static __be32
1867  xprt_alloc_xid(struct rpc_xprt *xprt)
1868  {
1869  	__be32 xid;
1870  
1871  	spin_lock(&xprt->reserve_lock);
1872  	xid = (__force __be32)xprt->xid++;
1873  	spin_unlock(&xprt->reserve_lock);
1874  	return xid;
1875  }
1876  
1877  static void
1878  xprt_init_xid(struct rpc_xprt *xprt)
1879  {
1880  	xprt->xid = get_random_u32();
1881  }
1882  
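/*
 * Initialise a request slot that has just been attached to @task:
 * assign a fresh XID, reset the send and receive buffers, and set up
 * the connect cookie and major timeout.
 */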
1883  static void
1884  xprt_request_init(struct rpc_task *task)
1885  {
1886  	struct rpc_xprt *xprt = task->tk_xprt;
1887  	struct rpc_rqst	*req = task->tk_rqstp;
1888  
1889  	req->rq_task	= task;
1890  	req->rq_xprt    = xprt;
1891  	req->rq_buffer  = NULL;
1892  	req->rq_xid	= xprt_alloc_xid(xprt);
1893  	xprt_init_connect_cookie(req, xprt);
1894  	req->rq_snd_buf.len = 0;
1895  	req->rq_snd_buf.buflen = 0;
1896  	req->rq_rcv_buf.len = 0;
1897  	req->rq_rcv_buf.buflen = 0;
1898  	req->rq_snd_buf.bvec = NULL;
1899  	req->rq_rcv_buf.bvec = NULL;
1900  	req->rq_release_snd_buf = NULL;
1901  	xprt_init_majortimeo(task, req, task->tk_client->cl_timeout);
1902  
1903  	trace_xprt_reserve(req);
1904  }
1905  
1906  static void
1907  xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
1908  {
1909  	xprt->ops->alloc_slot(xprt, task);
1910  	if (task->tk_rqstp != NULL)
1911  		xprt_request_init(task);
1912  }
1913  
1914  /**
1915   * xprt_reserve - allocate an RPC request slot
1916   * @task: RPC task requesting a slot allocation
1917   *
1918   * If the transport is marked as being congested, or if no more
1919   * slots are available, place the task on the transport's
1920   * backlog queue.
1921   */
1922  void xprt_reserve(struct rpc_task *task)
1923  {
1924  	struct rpc_xprt *xprt = task->tk_xprt;
1925  
1926  	task->tk_status = 0;
1927  	if (task->tk_rqstp != NULL)
1928  		return;
1929  
1930  	task->tk_status = -EAGAIN;
1931  	if (!xprt_throttle_congested(xprt, task))
1932  		xprt_do_reserve(xprt, task);
1933  }
1934  
1935  /**
1936   * xprt_retry_reserve - allocate an RPC request slot
1937   * @task: RPC task requesting a slot allocation
1938   *
1939   * If no more slots are available, place the task on the transport's
1940   * backlog queue.
1941   * Note that the only difference from xprt_reserve() is that this
1942   * function ignores the value of the XPRT_CONGESTED flag.
1943   */
1944  void xprt_retry_reserve(struct rpc_task *task)
1945  {
1946  	struct rpc_xprt *xprt = task->tk_xprt;
1947  
1948  	task->tk_status = 0;
1949  	if (task->tk_rqstp != NULL)
1950  		return;
1951  
1952  	task->tk_status = -EAGAIN;
1953  	xprt_do_reserve(xprt, task);
1954  }
1955  
1956  /**
1957   * xprt_release - release an RPC request slot
1958   * @task: task which is finished with the slot
1959   *
1960   */
1961  void xprt_release(struct rpc_task *task)
1962  {
1963  	struct rpc_xprt	*xprt;
1964  	struct rpc_rqst	*req = task->tk_rqstp;
1965  
1966  	if (req == NULL) {
1967  		if (task->tk_client) {
1968  			xprt = task->tk_xprt;
1969  			xprt_release_write(xprt, task);
1970  		}
1971  		return;
1972  	}
1973  
1974  	xprt = req->rq_xprt;
1975  	xprt_request_dequeue_xprt(task);
1976  	spin_lock(&xprt->transport_lock);
1977  	xprt->ops->release_xprt(xprt, task);
1978  	if (xprt->ops->release_request)
1979  		xprt->ops->release_request(task);
1980  	xprt_schedule_autodisconnect(xprt);
1981  	spin_unlock(&xprt->transport_lock);
1982  	if (req->rq_buffer)
1983  		xprt->ops->buf_free(task);
1984  	if (req->rq_cred != NULL)
1985  		put_rpccred(req->rq_cred);
1986  	if (req->rq_release_snd_buf)
1987  		req->rq_release_snd_buf(req);
1988  
1989  	task->tk_rqstp = NULL;
1990  	if (likely(!bc_prealloc(req)))
1991  		xprt->ops->free_slot(xprt, req);
1992  	else
1993  		xprt_free_bc_request(req);
1994  }
1995  
1996  #ifdef CONFIG_SUNRPC_BACKCHANNEL
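/**
 * xprt_init_bc_request - prepare a backchannel request for sending a reply
 * @req: preallocated backchannel rpc_rqst
 * @task: RPC task that will transmit the reply
 * @to: timeout parameters to apply while the reply waits to be sent
 */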
1997  void
1998  xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task,
1999  		const struct rpc_timeout *to)
2000  {
2001  	struct xdr_buf *xbufp = &req->rq_snd_buf;
2002  
2003  	task->tk_rqstp = req;
2004  	req->rq_task = task;
2005  	xprt_init_connect_cookie(req, req->rq_xprt);
2006  	/*
2007  	 * Set up the xdr_buf length.
2008  	 * This also indicates that the buffer is XDR encoded already.
2009  	 */
2010  	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
2011  		xbufp->tail[0].iov_len;
2012  	/*
2013  	 * Backchannel Replies are sent with !RPC_TASK_SOFT and
2014  	 * RPC_TASK_NO_RETRANS_TIMEOUT. The major timeout setting
2015  	 * affects only how long each Reply waits to be sent when
2016  	 * a transport connection cannot be established.
2017  	 */
2018  	xprt_init_majortimeo(task, req, to);
2019  }
2020  #endif
2021  
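/*
 * Common initialisation for a newly allocated rpc_xprt: locks, request
 * queues, wait queues, the initial XID and congestion window, and a
 * tracked reference on the owning network namespace.
 */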
2022  static void xprt_init(struct rpc_xprt *xprt, struct net *net)
2023  {
2024  	kref_init(&xprt->kref);
2025  
2026  	spin_lock_init(&xprt->transport_lock);
2027  	spin_lock_init(&xprt->reserve_lock);
2028  	spin_lock_init(&xprt->queue_lock);
2029  
2030  	INIT_LIST_HEAD(&xprt->free);
2031  	xprt->recv_queue = RB_ROOT;
2032  	INIT_LIST_HEAD(&xprt->xmit_queue);
2033  #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2034  	spin_lock_init(&xprt->bc_pa_lock);
2035  	INIT_LIST_HEAD(&xprt->bc_pa_list);
2036  #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2037  	INIT_LIST_HEAD(&xprt->xprt_switch);
2038  
2039  	xprt->last_used = jiffies;
2040  	xprt->cwnd = RPC_INITCWND;
2041  	xprt->bind_index = 0;
2042  
2043  	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
2044  	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
2045  	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
2046  	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
2047  
2048  	xprt_init_xid(xprt);
2049  
2050  	xprt->xprt_net = get_net_track(net, &xprt->ns_tracker, GFP_KERNEL);
2051  }
2052  
2053  /**
2054   * xprt_create_transport - create an RPC transport
2055   * @args: rpc transport creation arguments
2056   *
2057   */
2058  struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
2059  {
2060  	struct rpc_xprt	*xprt;
2061  	const struct xprt_class *t;
2062  
2063  	t = xprt_class_find_by_ident(args->ident);
2064  	if (!t) {
2065  		dprintk("RPC: transport (%d) not supported\n", args->ident);
2066  		return ERR_PTR(-EIO);
2067  	}
2068  
2069  	xprt = t->setup(args);
2070  	xprt_class_release(t);
2071  
2072  	if (IS_ERR(xprt))
2073  		goto out;
2074  	if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
2075  		xprt->idle_timeout = 0;
2076  	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
2077  	if (xprt_has_timer(xprt))
2078  		timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
2079  	else
2080  		timer_setup(&xprt->timer, NULL, 0);
2081  
2082  	if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
2083  		xprt_destroy(xprt);
2084  		return ERR_PTR(-EINVAL);
2085  	}
2086  	xprt->servername = kstrdup(args->servername, GFP_KERNEL);
2087  	if (xprt->servername == NULL) {
2088  		xprt_destroy(xprt);
2089  		return ERR_PTR(-ENOMEM);
2090  	}
2091  
2092  	rpc_xprt_debugfs_register(xprt);
2093  
2094  	trace_xprt_create(xprt);
2095  out:
2096  	return xprt;
2097  }
2098  
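/*
 * Final transport teardown, run from the system workqueue on behalf of
 * xprt_destroy() so that receive work running on rpciod can be flushed
 * safely.
 */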
2099  static void xprt_destroy_cb(struct work_struct *work)
2100  {
2101  	struct rpc_xprt *xprt =
2102  		container_of(work, struct rpc_xprt, task_cleanup);
2103  
2104  	trace_xprt_destroy(xprt);
2105  
2106  	rpc_xprt_debugfs_unregister(xprt);
2107  	rpc_destroy_wait_queue(&xprt->binding);
2108  	rpc_destroy_wait_queue(&xprt->pending);
2109  	rpc_destroy_wait_queue(&xprt->sending);
2110  	rpc_destroy_wait_queue(&xprt->backlog);
2111  	kfree(xprt->servername);
2112  	/*
2113  	 * Destroy any existing back channel
2114  	 */
2115  	xprt_destroy_backchannel(xprt, UINT_MAX);
2116  
2117  	/*
2118  	 * Tear down transport state and free the rpc_xprt
2119  	 */
2120  	xprt->ops->destroy(xprt);
2121  }
2122  
2123  /**
2124   * xprt_destroy - destroy an RPC transport, killing off all requests.
2125   * @xprt: transport to destroy
2126   *
2127   */
2128  static void xprt_destroy(struct rpc_xprt *xprt)
2129  {
2130  	/*
2131  	 * Exclude transport connect/disconnect handlers and autoclose
2132  	 */
2133  	wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);
2134  
2135  	/*
2136  	 * xprt_schedule_autodisconnect() can run after XPRT_LOCKED
2137  	 * is cleared.  We use ->transport_lock to ensure the mod_timer()
2138  	 * can only run *before* del_timer_sync(), never after.
2139  	 */
2140  	spin_lock(&xprt->transport_lock);
2141  	del_timer_sync(&xprt->timer);
2142  	spin_unlock(&xprt->transport_lock);
2143  
2144  	/*
2145  	 * Destroy sockets etc from the system workqueue so they can
2146  	 * safely flush receive work running on rpciod.
2147  	 */
2148  	INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
2149  	schedule_work(&xprt->task_cleanup);
2150  }
2151  
2152  static void xprt_destroy_kref(struct kref *kref)
2153  {
2154  	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
2155  }
2156  
2157  /**
2158   * xprt_get - return a reference to an RPC transport.
2159   * @xprt: pointer to the transport
2160   *
2161   */
2162  struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
2163  {
2164  	if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
2165  		return xprt;
2166  	return NULL;
2167  }
2168  EXPORT_SYMBOL_GPL(xprt_get);
2169  
2170  /**
2171   * xprt_put - release a reference to an RPC transport.
2172   * @xprt: pointer to the transport
2173   *
2174   */
2175  void xprt_put(struct rpc_xprt *xprt)
2176  {
2177  	if (xprt != NULL)
2178  		kref_put(&xprt->kref, xprt_destroy_kref);
2179  }
2180  EXPORT_SYMBOL_GPL(xprt_put);
2181  
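/*
 * Mark @xprt offline and decrement the count of active transports in
 * @xps. Counterpart to xprt_set_online_locked() below.
 */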
2182  void xprt_set_offline_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
2183  {
2184  	if (!test_and_set_bit(XPRT_OFFLINE, &xprt->state)) {
2185  		spin_lock(&xps->xps_lock);
2186  		xps->xps_nactive--;
2187  		spin_unlock(&xps->xps_lock);
2188  	}
2189  }
2190  
2191  void xprt_set_online_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
2192  {
2193  	if (test_and_clear_bit(XPRT_OFFLINE, &xprt->state)) {
2194  		spin_lock(&xps->xps_lock);
2195  		xps->xps_nactive++;
2196  		spin_unlock(&xps->xps_lock);
2197  	}
2198  }
2199  
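/*
 * Begin removing @xprt from @xps: mark it for removal and force a
 * disconnect. The transport is dropped from the switch immediately if
 * it is already disconnected and has no requests queued or pending.
 */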
2200  void xprt_delete_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
2201  {
2202  	if (test_and_set_bit(XPRT_REMOVE, &xprt->state))
2203  		return;
2204  
2205  	xprt_force_disconnect(xprt);
2206  	if (!test_bit(XPRT_CONNECTED, &xprt->state))
2207  		return;
2208  
2209  	if (!xprt->sending.qlen && !xprt->pending.qlen &&
2210  	    !xprt->backlog.qlen && !atomic_long_read(&xprt->queuelen))
2211  		rpc_xprt_switch_remove_xprt(xps, xprt, true);
2212  }
2213