1  // SPDX-License-Identifier: GPL-2.0-only
2  /*
3   *  linux/net/sunrpc/clnt.c
4   *
5   *  This file contains the high-level RPC interface.
6   *  It is modeled as a finite state machine to support both synchronous
7   *  and asynchronous requests.
8   *
9   *  -	RPC header generation and argument serialization.
10   *  -	Credential refresh.
11   *  -	TCP connect handling.
12   *  -	Retry of operation when it is suspected the operation failed because
13   *	of uid squashing on the server, or when the credentials were stale
14   *	and need to be refreshed, or when a packet was damaged in transit.
15   *	This may have to be moved to the VFS layer.
16   *
17   *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
18   *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
19   */
20  
21  
22  #include <linux/module.h>
23  #include <linux/types.h>
24  #include <linux/kallsyms.h>
25  #include <linux/mm.h>
26  #include <linux/namei.h>
27  #include <linux/mount.h>
28  #include <linux/slab.h>
29  #include <linux/rcupdate.h>
30  #include <linux/utsname.h>
31  #include <linux/workqueue.h>
32  #include <linux/in.h>
33  #include <linux/in6.h>
34  #include <linux/un.h>
35  
36  #include <linux/sunrpc/clnt.h>
37  #include <linux/sunrpc/addr.h>
38  #include <linux/sunrpc/rpc_pipe_fs.h>
39  #include <linux/sunrpc/metrics.h>
40  #include <linux/sunrpc/bc_xprt.h>
41  #include <trace/events/sunrpc.h>
42  
43  #include "sunrpc.h"
44  #include "sysfs.h"
45  #include "netns.h"
46  
47  #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
48  # define RPCDBG_FACILITY	RPCDBG_CALL
49  #endif
50  
/*
 * rpc_shutdown_client() sleeps on this queue until the client's task list
 * is empty; rpc_release_client() wakes it.
 */
static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);

/*
 * Forward declarations for the call_* handlers.  The file header describes
 * the RPC interface as a finite state machine; these are presumably its
 * individual states.
 */
static void call_start(struct rpc_task *task);
static void call_reserve(struct rpc_task *task);
static void call_reserveresult(struct rpc_task *task);
static void call_allocate(struct rpc_task *task);
static void call_encode(struct rpc_task *task);
static void call_decode(struct rpc_task *task);
static void call_bind(struct rpc_task *task);
static void call_bind_status(struct rpc_task *task);
static void call_transmit(struct rpc_task *task);
static void call_status(struct rpc_task *task);
static void call_transmit_status(struct rpc_task *task);
static void call_refresh(struct rpc_task *task);
static void call_refreshresult(struct rpc_task *task);
static void call_connect(struct rpc_task *task);
static void call_connect_status(struct rpc_task *task);

/* Forward declarations for helpers used before their definition. */
static int rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr);
static int rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr);
static int rpc_ping(struct rpc_clnt *clnt);
static int rpc_ping_noreply(struct rpc_clnt *clnt);
static void rpc_check_timeout(struct rpc_task *task);
76  
rpc_register_client(struct rpc_clnt * clnt)77  static void rpc_register_client(struct rpc_clnt *clnt)
78  {
79  	struct net *net = rpc_net_ns(clnt);
80  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
81  
82  	spin_lock(&sn->rpc_client_lock);
83  	list_add(&clnt->cl_clients, &sn->all_clients);
84  	spin_unlock(&sn->rpc_client_lock);
85  }
86  
rpc_unregister_client(struct rpc_clnt * clnt)87  static void rpc_unregister_client(struct rpc_clnt *clnt)
88  {
89  	struct net *net = rpc_net_ns(clnt);
90  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
91  
92  	spin_lock(&sn->rpc_client_lock);
93  	list_del(&clnt->cl_clients);
94  	spin_unlock(&sn->rpc_client_lock);
95  }
96  
/*
 * Remove the pipefs directory of @clnt.  Unlike rpc_clnt_remove_pipedir(),
 * this variant does not look up or pin the pipefs superblock itself.
 */
static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
{
	rpc_remove_client_dir(clnt);
}
101  
rpc_clnt_remove_pipedir(struct rpc_clnt * clnt)102  static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
103  {
104  	struct net *net = rpc_net_ns(clnt);
105  	struct super_block *pipefs_sb;
106  
107  	pipefs_sb = rpc_get_sb_net(net);
108  	if (pipefs_sb) {
109  		if (pipefs_sb == clnt->pipefs_sb)
110  			__rpc_clnt_remove_pipedir(clnt);
111  		rpc_put_sb_net(net);
112  	}
113  }
114  
rpc_setup_pipedir_sb(struct super_block * sb,struct rpc_clnt * clnt)115  static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
116  				    struct rpc_clnt *clnt)
117  {
118  	static uint32_t clntid;
119  	const char *dir_name = clnt->cl_program->pipe_dir_name;
120  	char name[15];
121  	struct dentry *dir, *dentry;
122  
123  	dir = rpc_d_lookup_sb(sb, dir_name);
124  	if (dir == NULL) {
125  		pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
126  		return dir;
127  	}
128  	for (;;) {
129  		snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
130  		name[sizeof(name) - 1] = '\0';
131  		dentry = rpc_create_client_dir(dir, name, clnt);
132  		if (!IS_ERR(dentry))
133  			break;
134  		if (dentry == ERR_PTR(-EEXIST))
135  			continue;
136  		printk(KERN_INFO "RPC: Couldn't create pipefs entry"
137  				" %s/%s, error %ld\n",
138  				dir_name, name, PTR_ERR(dentry));
139  		break;
140  	}
141  	dput(dir);
142  	return dentry;
143  }
144  
145  static int
rpc_setup_pipedir(struct super_block * pipefs_sb,struct rpc_clnt * clnt)146  rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
147  {
148  	struct dentry *dentry;
149  
150  	clnt->pipefs_sb = pipefs_sb;
151  
152  	if (clnt->cl_program->pipe_dir_name != NULL) {
153  		dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
154  		if (IS_ERR(dentry))
155  			return PTR_ERR(dentry);
156  	}
157  	return 0;
158  }
159  
rpc_clnt_skip_event(struct rpc_clnt * clnt,unsigned long event)160  static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
161  {
162  	if (clnt->cl_program->pipe_dir_name == NULL)
163  		return 1;
164  
165  	switch (event) {
166  	case RPC_PIPEFS_MOUNT:
167  		if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
168  			return 1;
169  		if (refcount_read(&clnt->cl_count) == 0)
170  			return 1;
171  		break;
172  	case RPC_PIPEFS_UMOUNT:
173  		if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
174  			return 1;
175  		break;
176  	}
177  	return 0;
178  }
179  
__rpc_clnt_handle_event(struct rpc_clnt * clnt,unsigned long event,struct super_block * sb)180  static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
181  				   struct super_block *sb)
182  {
183  	struct dentry *dentry;
184  
185  	switch (event) {
186  	case RPC_PIPEFS_MOUNT:
187  		dentry = rpc_setup_pipedir_sb(sb, clnt);
188  		if (!dentry)
189  			return -ENOENT;
190  		if (IS_ERR(dentry))
191  			return PTR_ERR(dentry);
192  		break;
193  	case RPC_PIPEFS_UMOUNT:
194  		__rpc_clnt_remove_pipedir(clnt);
195  		break;
196  	default:
197  		printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
198  		return -ENOTSUPP;
199  	}
200  	return 0;
201  }
202  
__rpc_pipefs_event(struct rpc_clnt * clnt,unsigned long event,struct super_block * sb)203  static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
204  				struct super_block *sb)
205  {
206  	int error = 0;
207  
208  	for (;; clnt = clnt->cl_parent) {
209  		if (!rpc_clnt_skip_event(clnt, event))
210  			error = __rpc_clnt_handle_event(clnt, event, sb);
211  		if (error || clnt == clnt->cl_parent)
212  			break;
213  	}
214  	return error;
215  }
216  
rpc_get_client_for_event(struct net * net,int event)217  static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
218  {
219  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
220  	struct rpc_clnt *clnt;
221  
222  	spin_lock(&sn->rpc_client_lock);
223  	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
224  		if (rpc_clnt_skip_event(clnt, event))
225  			continue;
226  		spin_unlock(&sn->rpc_client_lock);
227  		return clnt;
228  	}
229  	spin_unlock(&sn->rpc_client_lock);
230  	return NULL;
231  }
232  
rpc_pipefs_event(struct notifier_block * nb,unsigned long event,void * ptr)233  static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
234  			    void *ptr)
235  {
236  	struct super_block *sb = ptr;
237  	struct rpc_clnt *clnt;
238  	int error = 0;
239  
240  	while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
241  		error = __rpc_pipefs_event(clnt, event, sb);
242  		if (error)
243  			break;
244  	}
245  	return error;
246  }
247  
248  static struct notifier_block rpc_clients_block = {
249  	.notifier_call	= rpc_pipefs_event,
250  	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
251  };
252  
rpc_clients_notifier_register(void)253  int rpc_clients_notifier_register(void)
254  {
255  	return rpc_pipefs_notifier_register(&rpc_clients_block);
256  }
257  
rpc_clients_notifier_unregister(void)258  void rpc_clients_notifier_unregister(void)
259  {
260  	return rpc_pipefs_notifier_unregister(&rpc_clients_block);
261  }
262  
rpc_clnt_set_transport(struct rpc_clnt * clnt,struct rpc_xprt * xprt,const struct rpc_timeout * timeout)263  static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
264  		struct rpc_xprt *xprt,
265  		const struct rpc_timeout *timeout)
266  {
267  	struct rpc_xprt *old;
268  
269  	spin_lock(&clnt->cl_lock);
270  	old = rcu_dereference_protected(clnt->cl_xprt,
271  			lockdep_is_held(&clnt->cl_lock));
272  
273  	if (!xprt_bound(xprt))
274  		clnt->cl_autobind = 1;
275  
276  	clnt->cl_timeout = timeout;
277  	rcu_assign_pointer(clnt->cl_xprt, xprt);
278  	spin_unlock(&clnt->cl_lock);
279  
280  	return old;
281  }
282  
rpc_clnt_set_nodename(struct rpc_clnt * clnt,const char * nodename)283  static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
284  {
285  	ssize_t copied;
286  
287  	copied = strscpy(clnt->cl_nodename,
288  			 nodename, sizeof(clnt->cl_nodename));
289  
290  	clnt->cl_nodelen = copied < 0
291  				? sizeof(clnt->cl_nodename) - 1
292  				: copied;
293  }
294  
/*
 * Make @clnt visible to the rest of the system: register it with debugfs,
 * set up its pipefs directory when a pipefs superblock is available, add
 * it to the per-net client list, and create its auth handle for
 * @pseudoflavor (with @client_name as the target name).
 *
 * On failure everything done here is undone again, and the client's sysfs
 * object is destroyed as well.
 *
 * Returns 0 on success or a negative errno.
 */
static int rpc_client_register(struct rpc_clnt *clnt,
			       rpc_authflavor_t pseudoflavor,
			       const char *client_name)
{
	struct rpc_auth_create_args auth_args = {
		.pseudoflavor = pseudoflavor,
		.target_name = client_name,
	};
	struct net *net = rpc_net_ns(clnt);
	struct super_block *sb;
	struct rpc_auth *auth;
	int err;

	rpc_clnt_debugfs_register(clnt);

	sb = rpc_get_sb_net(net);
	if (sb) {
		err = rpc_setup_pipedir(sb, clnt);
		if (err)
			goto out_put_sb;
	}

	rpc_register_client(clnt);
	if (sb)
		rpc_put_sb_net(net);

	auth = rpcauth_create(&auth_args, clnt);
	if (!IS_ERR(auth))
		return 0;

	dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
			pseudoflavor);
	err = PTR_ERR(auth);

	/*
	 * The superblock was already released above, so look it up again
	 * before tearing down the registration and the pipefs directory.
	 */
	sb = rpc_get_sb_net(net);
	rpc_unregister_client(clnt);
	__rpc_clnt_remove_pipedir(clnt);
out_put_sb:
	if (sb)
		rpc_put_sb_net(net);
	rpc_sysfs_client_destroy(clnt);
	rpc_clnt_debugfs_unregister(clnt);
	return err;
}
340  
341  static DEFINE_IDA(rpc_clids);
342  
rpc_cleanup_clids(void)343  void rpc_cleanup_clids(void)
344  {
345  	ida_destroy(&rpc_clids);
346  }
347  
rpc_alloc_clid(struct rpc_clnt * clnt)348  static int rpc_alloc_clid(struct rpc_clnt *clnt)
349  {
350  	int clid;
351  
352  	clid = ida_alloc(&rpc_clids, GFP_KERNEL);
353  	if (clid < 0)
354  		return clid;
355  	clnt->cl_clid = clid;
356  	return 0;
357  }
358  
rpc_free_clid(struct rpc_clnt * clnt)359  static void rpc_free_clid(struct rpc_clnt *clnt)
360  {
361  	ida_free(&rpc_clids, clnt->cl_clid);
362  }
363  
rpc_new_client(const struct rpc_create_args * args,struct rpc_xprt_switch * xps,struct rpc_xprt * xprt,struct rpc_clnt * parent)364  static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
365  		struct rpc_xprt_switch *xps,
366  		struct rpc_xprt *xprt,
367  		struct rpc_clnt *parent)
368  {
369  	const struct rpc_program *program = args->program;
370  	const struct rpc_version *version;
371  	struct rpc_clnt *clnt = NULL;
372  	const struct rpc_timeout *timeout;
373  	const char *nodename = args->nodename;
374  	int err;
375  
376  	err = rpciod_up();
377  	if (err)
378  		goto out_no_rpciod;
379  
380  	err = -EINVAL;
381  	if (args->version >= program->nrvers)
382  		goto out_err;
383  	version = program->version[args->version];
384  	if (version == NULL)
385  		goto out_err;
386  
387  	err = -ENOMEM;
388  	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
389  	if (!clnt)
390  		goto out_err;
391  	clnt->cl_parent = parent ? : clnt;
392  	clnt->cl_xprtsec = args->xprtsec;
393  
394  	err = rpc_alloc_clid(clnt);
395  	if (err)
396  		goto out_no_clid;
397  
398  	clnt->cl_cred	  = get_cred(args->cred);
399  	clnt->cl_procinfo = version->procs;
400  	clnt->cl_maxproc  = version->nrprocs;
401  	clnt->cl_prog     = args->prognumber ? : program->number;
402  	clnt->cl_vers     = version->number;
403  	clnt->cl_stats    = args->stats ? : program->stats;
404  	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
405  	rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
406  	err = -ENOMEM;
407  	if (clnt->cl_metrics == NULL)
408  		goto out_no_stats;
409  	clnt->cl_program  = program;
410  	INIT_LIST_HEAD(&clnt->cl_tasks);
411  	spin_lock_init(&clnt->cl_lock);
412  
413  	timeout = xprt->timeout;
414  	if (args->timeout != NULL) {
415  		memcpy(&clnt->cl_timeout_default, args->timeout,
416  				sizeof(clnt->cl_timeout_default));
417  		timeout = &clnt->cl_timeout_default;
418  	}
419  
420  	rpc_clnt_set_transport(clnt, xprt, timeout);
421  	xprt->main = true;
422  	xprt_iter_init(&clnt->cl_xpi, xps);
423  	xprt_switch_put(xps);
424  
425  	clnt->cl_rtt = &clnt->cl_rtt_default;
426  	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
427  
428  	refcount_set(&clnt->cl_count, 1);
429  
430  	if (nodename == NULL)
431  		nodename = utsname()->nodename;
432  	/* save the nodename */
433  	rpc_clnt_set_nodename(clnt, nodename);
434  
435  	rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
436  	err = rpc_client_register(clnt, args->authflavor, args->client_name);
437  	if (err)
438  		goto out_no_path;
439  	if (parent)
440  		refcount_inc(&parent->cl_count);
441  
442  	trace_rpc_clnt_new(clnt, xprt, args);
443  	return clnt;
444  
445  out_no_path:
446  	rpc_free_iostats(clnt->cl_metrics);
447  out_no_stats:
448  	put_cred(clnt->cl_cred);
449  	rpc_free_clid(clnt);
450  out_no_clid:
451  	kfree(clnt);
452  out_err:
453  	rpciod_down();
454  out_no_rpciod:
455  	xprt_switch_put(xps);
456  	xprt_put(xprt);
457  	trace_rpc_clnt_new_err(program->name, args->servername, err);
458  	return ERR_PTR(err);
459  }
460  
rpc_create_xprt(struct rpc_create_args * args,struct rpc_xprt * xprt)461  static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
462  					struct rpc_xprt *xprt)
463  {
464  	struct rpc_clnt *clnt = NULL;
465  	struct rpc_xprt_switch *xps;
466  
467  	if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
468  		WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
469  		xps = args->bc_xprt->xpt_bc_xps;
470  		xprt_switch_get(xps);
471  	} else {
472  		xps = xprt_switch_alloc(xprt, GFP_KERNEL);
473  		if (xps == NULL) {
474  			xprt_put(xprt);
475  			return ERR_PTR(-ENOMEM);
476  		}
477  		if (xprt->bc_xprt) {
478  			xprt_switch_get(xps);
479  			xprt->bc_xprt->xpt_bc_xps = xps;
480  		}
481  	}
482  	clnt = rpc_new_client(args, xps, xprt, NULL);
483  	if (IS_ERR(clnt))
484  		return clnt;
485  
486  	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
487  		int err = rpc_ping(clnt);
488  		if (err != 0) {
489  			rpc_shutdown_client(clnt);
490  			return ERR_PTR(err);
491  		}
492  	} else if (args->flags & RPC_CLNT_CREATE_CONNECTED) {
493  		int err = rpc_ping_noreply(clnt);
494  		if (err != 0) {
495  			rpc_shutdown_client(clnt);
496  			return ERR_PTR(err);
497  		}
498  	}
499  
500  	clnt->cl_softrtry = 1;
501  	if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
502  		clnt->cl_softrtry = 0;
503  		if (args->flags & RPC_CLNT_CREATE_SOFTERR)
504  			clnt->cl_softerr = 1;
505  	}
506  
507  	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
508  		clnt->cl_autobind = 1;
509  	if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
510  		clnt->cl_noretranstimeo = 1;
511  	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
512  		clnt->cl_discrtry = 1;
513  	if (!(args->flags & RPC_CLNT_CREATE_QUIET))
514  		clnt->cl_chatty = 1;
515  
516  	return clnt;
517  }
518  
519  /**
520   * rpc_create - create an RPC client and transport with one call
521   * @args: rpc_clnt create argument structure
522   *
523   * Creates and initializes an RPC transport and an RPC client.
524   *
525   * It can ping the server in order to determine if it is up, and to see if
526   * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
527   * this behavior so asynchronous tasks can also use rpc_create.
528   */
rpc_create(struct rpc_create_args * args)529  struct rpc_clnt *rpc_create(struct rpc_create_args *args)
530  {
531  	struct rpc_xprt *xprt;
532  	struct xprt_create xprtargs = {
533  		.net = args->net,
534  		.ident = args->protocol,
535  		.srcaddr = args->saddress,
536  		.dstaddr = args->address,
537  		.addrlen = args->addrsize,
538  		.servername = args->servername,
539  		.bc_xprt = args->bc_xprt,
540  		.xprtsec = args->xprtsec,
541  		.connect_timeout = args->connect_timeout,
542  		.reconnect_timeout = args->reconnect_timeout,
543  	};
544  	char servername[RPC_MAXNETNAMELEN];
545  	struct rpc_clnt *clnt;
546  	int i;
547  
548  	if (args->bc_xprt) {
549  		WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
550  		xprt = args->bc_xprt->xpt_bc_xprt;
551  		if (xprt) {
552  			xprt_get(xprt);
553  			return rpc_create_xprt(args, xprt);
554  		}
555  	}
556  
557  	if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
558  		xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
559  	if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
560  		xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
561  	/*
562  	 * If the caller chooses not to specify a hostname, whip
563  	 * up a string representation of the passed-in address.
564  	 */
565  	if (xprtargs.servername == NULL) {
566  		struct sockaddr_un *sun =
567  				(struct sockaddr_un *)args->address;
568  		struct sockaddr_in *sin =
569  				(struct sockaddr_in *)args->address;
570  		struct sockaddr_in6 *sin6 =
571  				(struct sockaddr_in6 *)args->address;
572  
573  		servername[0] = '\0';
574  		switch (args->address->sa_family) {
575  		case AF_LOCAL:
576  			if (sun->sun_path[0])
577  				snprintf(servername, sizeof(servername), "%s",
578  					 sun->sun_path);
579  			else
580  				snprintf(servername, sizeof(servername), "@%s",
581  					 sun->sun_path+1);
582  			break;
583  		case AF_INET:
584  			snprintf(servername, sizeof(servername), "%pI4",
585  				 &sin->sin_addr.s_addr);
586  			break;
587  		case AF_INET6:
588  			snprintf(servername, sizeof(servername), "%pI6",
589  				 &sin6->sin6_addr);
590  			break;
591  		default:
592  			/* caller wants default server name, but
593  			 * address family isn't recognized. */
594  			return ERR_PTR(-EINVAL);
595  		}
596  		xprtargs.servername = servername;
597  	}
598  
599  	xprt = xprt_create_transport(&xprtargs);
600  	if (IS_ERR(xprt))
601  		return (struct rpc_clnt *)xprt;
602  
603  	/*
604  	 * By default, kernel RPC client connects from a reserved port.
605  	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
606  	 * but it is always enabled for rpciod, which handles the connect
607  	 * operation.
608  	 */
609  	xprt->resvport = 1;
610  	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
611  		xprt->resvport = 0;
612  	xprt->reuseport = 0;
613  	if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
614  		xprt->reuseport = 1;
615  
616  	clnt = rpc_create_xprt(args, xprt);
617  	if (IS_ERR(clnt) || args->nconnect <= 1)
618  		return clnt;
619  
620  	for (i = 0; i < args->nconnect - 1; i++) {
621  		if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
622  			break;
623  	}
624  	return clnt;
625  }
626  EXPORT_SYMBOL_GPL(rpc_create);
627  
628  /*
629   * This function clones the RPC client structure. It allows us to share the
630   * same transport while varying parameters such as the authentication
631   * flavour.
632   */
__rpc_clone_client(struct rpc_create_args * args,struct rpc_clnt * clnt)633  static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
634  					   struct rpc_clnt *clnt)
635  {
636  	struct rpc_xprt_switch *xps;
637  	struct rpc_xprt *xprt;
638  	struct rpc_clnt *new;
639  	int err;
640  
641  	err = -ENOMEM;
642  	rcu_read_lock();
643  	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
644  	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
645  	rcu_read_unlock();
646  	if (xprt == NULL || xps == NULL) {
647  		xprt_put(xprt);
648  		xprt_switch_put(xps);
649  		goto out_err;
650  	}
651  	args->servername = xprt->servername;
652  	args->nodename = clnt->cl_nodename;
653  
654  	new = rpc_new_client(args, xps, xprt, clnt);
655  	if (IS_ERR(new))
656  		return new;
657  
658  	/* Turn off autobind on clones */
659  	new->cl_autobind = 0;
660  	new->cl_softrtry = clnt->cl_softrtry;
661  	new->cl_softerr = clnt->cl_softerr;
662  	new->cl_noretranstimeo = clnt->cl_noretranstimeo;
663  	new->cl_discrtry = clnt->cl_discrtry;
664  	new->cl_chatty = clnt->cl_chatty;
665  	new->cl_principal = clnt->cl_principal;
666  	new->cl_max_connect = clnt->cl_max_connect;
667  	return new;
668  
669  out_err:
670  	trace_rpc_clnt_clone_err(clnt, err);
671  	return ERR_PTR(err);
672  }
673  
674  /**
675   * rpc_clone_client - Clone an RPC client structure
676   *
677   * @clnt: RPC client whose parameters are copied
678   *
679   * Returns a fresh RPC client or an ERR_PTR.
680   */
rpc_clone_client(struct rpc_clnt * clnt)681  struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
682  {
683  	struct rpc_create_args args = {
684  		.program	= clnt->cl_program,
685  		.prognumber	= clnt->cl_prog,
686  		.version	= clnt->cl_vers,
687  		.authflavor	= clnt->cl_auth->au_flavor,
688  		.cred		= clnt->cl_cred,
689  		.stats		= clnt->cl_stats,
690  	};
691  	return __rpc_clone_client(&args, clnt);
692  }
693  EXPORT_SYMBOL_GPL(rpc_clone_client);
694  
695  /**
696   * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
697   *
698   * @clnt: RPC client whose parameters are copied
699   * @flavor: security flavor for new client
700   *
701   * Returns a fresh RPC client or an ERR_PTR.
702   */
703  struct rpc_clnt *
rpc_clone_client_set_auth(struct rpc_clnt * clnt,rpc_authflavor_t flavor)704  rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
705  {
706  	struct rpc_create_args args = {
707  		.program	= clnt->cl_program,
708  		.prognumber	= clnt->cl_prog,
709  		.version	= clnt->cl_vers,
710  		.authflavor	= flavor,
711  		.cred		= clnt->cl_cred,
712  		.stats		= clnt->cl_stats,
713  	};
714  	return __rpc_clone_client(&args, clnt);
715  }
716  EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
717  
718  /**
719   * rpc_switch_client_transport: switch the RPC transport on the fly
720   * @clnt: pointer to a struct rpc_clnt
721   * @args: pointer to the new transport arguments
722   * @timeout: pointer to the new timeout parameters
723   *
724   * This function allows the caller to switch the RPC transport for the
725   * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
726   * server, for instance.  It assumes that the caller has ensured that
727   * there are no active RPC tasks by using some form of locking.
728   *
729   * Returns zero if "clnt" is now using the new xprt.  Otherwise a
730   * negative errno is returned, and "clnt" continues to use the old
731   * xprt.
732   */
rpc_switch_client_transport(struct rpc_clnt * clnt,struct xprt_create * args,const struct rpc_timeout * timeout)733  int rpc_switch_client_transport(struct rpc_clnt *clnt,
734  		struct xprt_create *args,
735  		const struct rpc_timeout *timeout)
736  {
737  	const struct rpc_timeout *old_timeo;
738  	rpc_authflavor_t pseudoflavor;
739  	struct rpc_xprt_switch *xps, *oldxps;
740  	struct rpc_xprt *xprt, *old;
741  	struct rpc_clnt *parent;
742  	int err;
743  
744  	args->xprtsec = clnt->cl_xprtsec;
745  	xprt = xprt_create_transport(args);
746  	if (IS_ERR(xprt))
747  		return PTR_ERR(xprt);
748  
749  	xps = xprt_switch_alloc(xprt, GFP_KERNEL);
750  	if (xps == NULL) {
751  		xprt_put(xprt);
752  		return -ENOMEM;
753  	}
754  
755  	pseudoflavor = clnt->cl_auth->au_flavor;
756  
757  	old_timeo = clnt->cl_timeout;
758  	old = rpc_clnt_set_transport(clnt, xprt, timeout);
759  	oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
760  
761  	rpc_unregister_client(clnt);
762  	__rpc_clnt_remove_pipedir(clnt);
763  	rpc_sysfs_client_destroy(clnt);
764  	rpc_clnt_debugfs_unregister(clnt);
765  
766  	/*
767  	 * A new transport was created.  "clnt" therefore
768  	 * becomes the root of a new cl_parent tree.  clnt's
769  	 * children, if it has any, still point to the old xprt.
770  	 */
771  	parent = clnt->cl_parent;
772  	clnt->cl_parent = clnt;
773  
774  	/*
775  	 * The old rpc_auth cache cannot be re-used.  GSS
776  	 * contexts in particular are between a single
777  	 * client and server.
778  	 */
779  	err = rpc_client_register(clnt, pseudoflavor, NULL);
780  	if (err)
781  		goto out_revert;
782  
783  	synchronize_rcu();
784  	if (parent != clnt)
785  		rpc_release_client(parent);
786  	xprt_switch_put(oldxps);
787  	xprt_put(old);
788  	trace_rpc_clnt_replace_xprt(clnt);
789  	return 0;
790  
791  out_revert:
792  	xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
793  	rpc_clnt_set_transport(clnt, old, old_timeo);
794  	clnt->cl_parent = parent;
795  	rpc_client_register(clnt, pseudoflavor, NULL);
796  	xprt_switch_put(xps);
797  	xprt_put(xprt);
798  	trace_rpc_clnt_replace_xprt_err(clnt);
799  	return err;
800  }
801  EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
802  
rpc_clnt_xprt_switch_get(struct rpc_clnt * clnt)803  static struct rpc_xprt_switch *rpc_clnt_xprt_switch_get(struct rpc_clnt *clnt)
804  {
805  	struct rpc_xprt_switch *xps;
806  
807  	rcu_read_lock();
808  	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
809  	rcu_read_unlock();
810  
811  	return xps;
812  }
813  
814  static
_rpc_clnt_xprt_iter_init(struct rpc_clnt * clnt,struct rpc_xprt_iter * xpi,void func (struct rpc_xprt_iter * xpi,struct rpc_xprt_switch * xps))815  int _rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi,
816  			     void func(struct rpc_xprt_iter *xpi, struct rpc_xprt_switch *xps))
817  {
818  	struct rpc_xprt_switch *xps;
819  
820  	xps = rpc_clnt_xprt_switch_get(clnt);
821  	if (xps == NULL)
822  		return -EAGAIN;
823  	func(xpi, xps);
824  	xprt_switch_put(xps);
825  	return 0;
826  }
827  
828  static
rpc_clnt_xprt_iter_init(struct rpc_clnt * clnt,struct rpc_xprt_iter * xpi)829  int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
830  {
831  	return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listall);
832  }
833  
834  static
rpc_clnt_xprt_iter_offline_init(struct rpc_clnt * clnt,struct rpc_xprt_iter * xpi)835  int rpc_clnt_xprt_iter_offline_init(struct rpc_clnt *clnt,
836  				    struct rpc_xprt_iter *xpi)
837  {
838  	return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listoffline);
839  }
840  
841  /**
842   * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
843   * @clnt: pointer to client
844   * @fn: function to apply
845   * @data: void pointer to function data
846   *
847   * Iterates through the list of RPC transports currently attached to the
848   * client and applies the function fn(clnt, xprt, data).
849   *
850   * On error, the iteration stops, and the function returns the error value.
851   */
rpc_clnt_iterate_for_each_xprt(struct rpc_clnt * clnt,int (* fn)(struct rpc_clnt *,struct rpc_xprt *,void *),void * data)852  int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
853  		int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
854  		void *data)
855  {
856  	struct rpc_xprt_iter xpi;
857  	int ret;
858  
859  	ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
860  	if (ret)
861  		return ret;
862  	for (;;) {
863  		struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
864  
865  		if (!xprt)
866  			break;
867  		ret = fn(clnt, xprt, data);
868  		xprt_put(xprt);
869  		if (ret < 0)
870  			break;
871  	}
872  	xprt_iter_destroy(&xpi);
873  	return ret;
874  }
875  EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
876  
877  /*
878   * Kill all tasks for the given client.
879   * XXX: kill their descendants as well?
880   */
rpc_killall_tasks(struct rpc_clnt * clnt)881  void rpc_killall_tasks(struct rpc_clnt *clnt)
882  {
883  	struct rpc_task	*rovr;
884  
885  
886  	if (list_empty(&clnt->cl_tasks))
887  		return;
888  
889  	/*
890  	 * Spin lock all_tasks to prevent changes...
891  	 */
892  	trace_rpc_clnt_killall(clnt);
893  	spin_lock(&clnt->cl_lock);
894  	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
895  		rpc_signal_task(rovr);
896  	spin_unlock(&clnt->cl_lock);
897  }
898  EXPORT_SYMBOL_GPL(rpc_killall_tasks);
899  
900  /**
901   * rpc_cancel_tasks - try to cancel a set of RPC tasks
902   * @clnt: Pointer to RPC client
903   * @error: RPC task error value to set
904   * @fnmatch: Pointer to selector function
905   * @data: User data
906   *
907   * Uses @fnmatch to define a set of RPC tasks that are to be cancelled.
908   * The argument @error must be a negative error value.
909   */
rpc_cancel_tasks(struct rpc_clnt * clnt,int error,bool (* fnmatch)(const struct rpc_task *,const void *),const void * data)910  unsigned long rpc_cancel_tasks(struct rpc_clnt *clnt, int error,
911  			       bool (*fnmatch)(const struct rpc_task *,
912  					       const void *),
913  			       const void *data)
914  {
915  	struct rpc_task *task;
916  	unsigned long count = 0;
917  
918  	if (list_empty(&clnt->cl_tasks))
919  		return 0;
920  	/*
921  	 * Spin lock all_tasks to prevent changes...
922  	 */
923  	spin_lock(&clnt->cl_lock);
924  	list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
925  		if (!RPC_IS_ACTIVATED(task))
926  			continue;
927  		if (!fnmatch(task, data))
928  			continue;
929  		rpc_task_try_cancel(task, error);
930  		count++;
931  	}
932  	spin_unlock(&clnt->cl_lock);
933  	return count;
934  }
935  EXPORT_SYMBOL_GPL(rpc_cancel_tasks);
936  
/*
 * Per-transport callback for rpc_clnt_disconnect(): force a disconnect
 * of @xprt if it is currently connected.  @clnt and @dummy are unused.
 * Always returns 0 so the iteration continues.
 */
static int rpc_clnt_disconnect_xprt(struct rpc_clnt *clnt,
				    struct rpc_xprt *xprt, void *dummy)
{
	if (!xprt_connected(xprt))
		return 0;

	xprt_force_disconnect(xprt);
	return 0;
}
944  
rpc_clnt_disconnect(struct rpc_clnt * clnt)945  void rpc_clnt_disconnect(struct rpc_clnt *clnt)
946  {
947  	rpc_clnt_iterate_for_each_xprt(clnt, rpc_clnt_disconnect_xprt, NULL);
948  }
949  EXPORT_SYMBOL_GPL(rpc_clnt_disconnect);
950  
951  /*
952   * Properly shut down an RPC client, terminating all outstanding
953   * requests.
954   */
rpc_shutdown_client(struct rpc_clnt * clnt)955  void rpc_shutdown_client(struct rpc_clnt *clnt)
956  {
957  	might_sleep();
958  
959  	trace_rpc_clnt_shutdown(clnt);
960  
961  	while (!list_empty(&clnt->cl_tasks)) {
962  		rpc_killall_tasks(clnt);
963  		wait_event_timeout(destroy_wait,
964  			list_empty(&clnt->cl_tasks), 1*HZ);
965  	}
966  
967  	rpc_release_client(clnt);
968  }
969  EXPORT_SYMBOL_GPL(rpc_shutdown_client);
970  
/*
 * Deferred final teardown of an RPC client.
 *
 * Runs as the cl_work work item queued by rpc_free_client() and ends by
 * freeing the rpc_clnt structure itself.
 */
static void rpc_free_client_work(struct work_struct *work)
{
	struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);

	trace_rpc_clnt_free(clnt);

	/* These might block on processes that might allocate memory,
	 * so they cannot be called in rpciod, so they are handled separately
	 * here.
	 */
	rpc_sysfs_client_destroy(clnt);
	rpc_clnt_debugfs_unregister(clnt);
	rpc_free_clid(clnt);
	rpc_clnt_remove_pipedir(clnt);
	/* Drop the reference on the client's transport */
	xprt_put(rcu_dereference_raw(clnt->cl_xprt));

	kfree(clnt);
	/* NOTE(review): presumably balances an rpciod_up() taken when the client was created - confirm */
	rpciod_down();
}
/*
 * First stage of freeing an RPC client.
 *
 * Unregisters the client, releases its iostats, transport iterator and
 * credential, then queues rpc_free_client_work() to finish the job.
 *
 * Returns the parent client when cl_parent differs from @clnt,
 * otherwise NULL.
 */
static struct rpc_clnt *
rpc_free_client(struct rpc_clnt *clnt)
{
	struct rpc_clnt *parent = NULL;

	trace_rpc_clnt_release(clnt);
	/* cl_parent == clnt means there is no separate parent to hand back */
	if (clnt->cl_parent != clnt)
		parent = clnt->cl_parent;
	rpc_unregister_client(clnt);
	rpc_free_iostats(clnt->cl_metrics);
	clnt->cl_metrics = NULL;
	xprt_iter_destroy(&clnt->cl_xpi);
	put_cred(clnt->cl_cred);

	/* The remaining teardown is deferred to a work item */
	INIT_WORK(&clnt->cl_work, rpc_free_client_work);
	schedule_work(&clnt->cl_work);
	return parent;
}
1011  
1012  /*
1013   * Free an RPC client
1014   */
1015  static struct rpc_clnt *
rpc_free_auth(struct rpc_clnt * clnt)1016  rpc_free_auth(struct rpc_clnt *clnt)
1017  {
1018  	/*
1019  	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
1020  	 *       release remaining GSS contexts. This mechanism ensures
1021  	 *       that it can do so safely.
1022  	 */
1023  	if (clnt->cl_auth != NULL) {
1024  		rpcauth_release(clnt->cl_auth);
1025  		clnt->cl_auth = NULL;
1026  	}
1027  	if (refcount_dec_and_test(&clnt->cl_count))
1028  		return rpc_free_client(clnt);
1029  	return NULL;
1030  }
1031  
1032  /*
1033   * Release reference to the RPC client
1034   */
1035  void
rpc_release_client(struct rpc_clnt * clnt)1036  rpc_release_client(struct rpc_clnt *clnt)
1037  {
1038  	do {
1039  		if (list_empty(&clnt->cl_tasks))
1040  			wake_up(&destroy_wait);
1041  		if (refcount_dec_not_one(&clnt->cl_count))
1042  			break;
1043  		clnt = rpc_free_auth(clnt);
1044  	} while (clnt != NULL);
1045  }
1046  EXPORT_SYMBOL_GPL(rpc_release_client);
1047  
1048  /**
1049   * rpc_bind_new_program - bind a new RPC program to an existing client
1050   * @old: old rpc_client
1051   * @program: rpc program to set
1052   * @vers: rpc program version
1053   *
1054   * Clones the rpc client and sets up a new RPC program. This is mainly
1055   * of use for enabling different RPC programs to share the same transport.
1056   * The Sun NFSv2/v3 ACL protocol can do this.
1057   */
rpc_bind_new_program(struct rpc_clnt * old,const struct rpc_program * program,u32 vers)1058  struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
1059  				      const struct rpc_program *program,
1060  				      u32 vers)
1061  {
1062  	struct rpc_create_args args = {
1063  		.program	= program,
1064  		.prognumber	= program->number,
1065  		.version	= vers,
1066  		.authflavor	= old->cl_auth->au_flavor,
1067  		.cred		= old->cl_cred,
1068  		.stats		= old->cl_stats,
1069  		.timeout	= old->cl_timeout,
1070  	};
1071  	struct rpc_clnt *clnt;
1072  	int err;
1073  
1074  	clnt = __rpc_clone_client(&args, old);
1075  	if (IS_ERR(clnt))
1076  		goto out;
1077  	err = rpc_ping(clnt);
1078  	if (err != 0) {
1079  		rpc_shutdown_client(clnt);
1080  		clnt = ERR_PTR(err);
1081  	}
1082  out:
1083  	return clnt;
1084  }
1085  EXPORT_SYMBOL_GPL(rpc_bind_new_program);
1086  
1087  struct rpc_xprt *
rpc_task_get_xprt(struct rpc_clnt * clnt,struct rpc_xprt * xprt)1088  rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1089  {
1090  	struct rpc_xprt_switch *xps;
1091  
1092  	if (!xprt)
1093  		return NULL;
1094  	rcu_read_lock();
1095  	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1096  	atomic_long_inc(&xps->xps_queuelen);
1097  	rcu_read_unlock();
1098  	atomic_long_inc(&xprt->queuelen);
1099  
1100  	return xprt;
1101  }
1102  
1103  static void
rpc_task_release_xprt(struct rpc_clnt * clnt,struct rpc_xprt * xprt)1104  rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1105  {
1106  	struct rpc_xprt_switch *xps;
1107  
1108  	atomic_long_dec(&xprt->queuelen);
1109  	rcu_read_lock();
1110  	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1111  	atomic_long_dec(&xps->xps_queuelen);
1112  	rcu_read_unlock();
1113  
1114  	xprt_put(xprt);
1115  }
1116  
rpc_task_release_transport(struct rpc_task * task)1117  void rpc_task_release_transport(struct rpc_task *task)
1118  {
1119  	struct rpc_xprt *xprt = task->tk_xprt;
1120  
1121  	if (xprt) {
1122  		task->tk_xprt = NULL;
1123  		if (task->tk_client)
1124  			rpc_task_release_xprt(task->tk_client, xprt);
1125  		else
1126  			xprt_put(xprt);
1127  	}
1128  }
1129  EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1130  
rpc_task_release_client(struct rpc_task * task)1131  void rpc_task_release_client(struct rpc_task *task)
1132  {
1133  	struct rpc_clnt *clnt = task->tk_client;
1134  
1135  	rpc_task_release_transport(task);
1136  	if (clnt != NULL) {
1137  		/* Remove from client task list */
1138  		spin_lock(&clnt->cl_lock);
1139  		list_del(&task->tk_task);
1140  		spin_unlock(&clnt->cl_lock);
1141  		task->tk_client = NULL;
1142  
1143  		rpc_release_client(clnt);
1144  	}
1145  }
1146  
1147  static struct rpc_xprt *
rpc_task_get_first_xprt(struct rpc_clnt * clnt)1148  rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1149  {
1150  	struct rpc_xprt *xprt;
1151  
1152  	rcu_read_lock();
1153  	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1154  	rcu_read_unlock();
1155  	return rpc_task_get_xprt(clnt, xprt);
1156  }
1157  
1158  static struct rpc_xprt *
rpc_task_get_next_xprt(struct rpc_clnt * clnt)1159  rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1160  {
1161  	return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1162  }
1163  
1164  static
rpc_task_set_transport(struct rpc_task * task,struct rpc_clnt * clnt)1165  void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1166  {
1167  	if (task->tk_xprt) {
1168  		if (!(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
1169  		      (task->tk_flags & RPC_TASK_MOVEABLE)))
1170  			return;
1171  		xprt_release(task);
1172  		xprt_put(task->tk_xprt);
1173  	}
1174  	if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1175  		task->tk_xprt = rpc_task_get_first_xprt(clnt);
1176  	else
1177  		task->tk_xprt = rpc_task_get_next_xprt(clnt);
1178  }
1179  
/*
 * Attach @task to @clnt.
 *
 * Selects a transport for the task, takes a reference on the client,
 * copies the client's soft-retry, soft-error and no-retransmit-timeout
 * settings into the task flags, and links the task onto the client's
 * task list.
 */
static
void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
	rpc_task_set_transport(task, clnt);
	task->tk_client = clnt;
	/* Reference dropped again in rpc_task_release_client() */
	refcount_inc(&clnt->cl_count);
	if (clnt->cl_softrtry)
		task->tk_flags |= RPC_TASK_SOFT;
	if (clnt->cl_softerr)
		task->tk_flags |= RPC_TASK_TIMEOUT;
	if (clnt->cl_noretranstimeo)
		task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
	/* Add to the client's list of all tasks */
	spin_lock(&clnt->cl_lock);
	list_add_tail(&task->tk_task, &clnt->cl_tasks);
	spin_unlock(&clnt->cl_lock);
}
1197  
1198  static void
rpc_task_set_rpc_message(struct rpc_task * task,const struct rpc_message * msg)1199  rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1200  {
1201  	if (msg != NULL) {
1202  		task->tk_msg.rpc_proc = msg->rpc_proc;
1203  		task->tk_msg.rpc_argp = msg->rpc_argp;
1204  		task->tk_msg.rpc_resp = msg->rpc_resp;
1205  		task->tk_msg.rpc_cred = msg->rpc_cred;
1206  		if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1207  			get_cred(task->tk_msg.rpc_cred);
1208  	}
1209  }
1210  
/*
 * Default callback for async RPC calls.
 *
 * Intentionally empty: it only serves as the rpc_call_done hook of
 * rpc_default_ops.
 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
}
1218  
/*
 * Minimal call ops: only rpc_call_done is set, to the no-op
 * rpc_default_callback().  Used by rpc_call_sync() and rpc_run_bc_task().
 */
static const struct rpc_call_ops rpc_default_ops = {
	.rpc_call_done = rpc_default_callback,
};
1222  
/**
 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
 * @task_setup_data: pointer to task initialisation data
 *
 * Returns the new task, or the ERR_PTR value from rpc_new_task() when
 * the task could not be created.  On success tk_count is incremented
 * once before rpc_execute() is called; rpc_call_sync() and
 * rpc_call_async() balance that with rpc_put_task().
 */
struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
{
	struct rpc_task *task;

	task = rpc_new_task(task_setup_data);
	if (IS_ERR(task))
		return task;

	/*
	 * Non-async tasks do not take their own credential reference:
	 * rpc_task_set_rpc_message() skips get_cred() when this flag is set.
	 */
	if (!RPC_IS_ASYNC(task))
		task->tk_flags |= RPC_TASK_CRED_NOREF;

	rpc_task_set_client(task, task_setup_data->rpc_client);
	rpc_task_set_rpc_message(task, task_setup_data->rpc_message);

	/* No initial action chosen yet: start at call_start */
	if (task->tk_action == NULL)
		rpc_call_start(task);

	atomic_inc(&task->tk_count);
	rpc_execute(task);
	return task;
}
1248  EXPORT_SYMBOL_GPL(rpc_run_task);
1249  
1250  /**
1251   * rpc_call_sync - Perform a synchronous RPC call
1252   * @clnt: pointer to RPC client
1253   * @msg: RPC call parameters
1254   * @flags: RPC call flags
1255   */
rpc_call_sync(struct rpc_clnt * clnt,const struct rpc_message * msg,int flags)1256  int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1257  {
1258  	struct rpc_task	*task;
1259  	struct rpc_task_setup task_setup_data = {
1260  		.rpc_client = clnt,
1261  		.rpc_message = msg,
1262  		.callback_ops = &rpc_default_ops,
1263  		.flags = flags,
1264  	};
1265  	int status;
1266  
1267  	WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1268  	if (flags & RPC_TASK_ASYNC) {
1269  		rpc_release_calldata(task_setup_data.callback_ops,
1270  			task_setup_data.callback_data);
1271  		return -EINVAL;
1272  	}
1273  
1274  	task = rpc_run_task(&task_setup_data);
1275  	if (IS_ERR(task))
1276  		return PTR_ERR(task);
1277  	status = task->tk_status;
1278  	rpc_put_task(task);
1279  	return status;
1280  }
1281  EXPORT_SYMBOL_GPL(rpc_call_sync);
1282  
1283  /**
1284   * rpc_call_async - Perform an asynchronous RPC call
1285   * @clnt: pointer to RPC client
1286   * @msg: RPC call parameters
1287   * @flags: RPC call flags
1288   * @tk_ops: RPC call ops
1289   * @data: user call data
1290   */
1291  int
rpc_call_async(struct rpc_clnt * clnt,const struct rpc_message * msg,int flags,const struct rpc_call_ops * tk_ops,void * data)1292  rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1293  	       const struct rpc_call_ops *tk_ops, void *data)
1294  {
1295  	struct rpc_task	*task;
1296  	struct rpc_task_setup task_setup_data = {
1297  		.rpc_client = clnt,
1298  		.rpc_message = msg,
1299  		.callback_ops = tk_ops,
1300  		.callback_data = data,
1301  		.flags = flags|RPC_TASK_ASYNC,
1302  	};
1303  
1304  	task = rpc_run_task(&task_setup_data);
1305  	if (IS_ERR(task))
1306  		return PTR_ERR(task);
1307  	rpc_put_task(task);
1308  	return 0;
1309  }
1310  EXPORT_SYMBOL_GPL(rpc_call_async);
1311  
1312  #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1313  static void call_bc_encode(struct rpc_task *task);
1314  
/**
 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 * rpc_execute against it
 * @req: RPC request
 * @timeout: timeout values to use for this task
 *
 * Returns the new task, or the ERR_PTR value from rpc_new_task().  In
 * the error case @req is freed with xprt_free_bc_request().
 */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
		struct rpc_timeout *timeout)
{
	struct rpc_task *task;
	struct rpc_task_setup task_setup_data = {
		.callback_ops = &rpc_default_ops,
		.flags = RPC_TASK_SOFTCONN |
			RPC_TASK_NO_RETRANS_TIMEOUT,
	};

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);
	/*
	 * Create an rpc_task to send the data
	 */
	task = rpc_new_task(&task_setup_data);
	if (IS_ERR(task)) {
		xprt_free_bc_request(req);
		return task;
	}

	xprt_init_bc_request(req, task, timeout);

	/* Backchannel tasks start directly at the encode state */
	task->tk_action = call_bc_encode;
	/* One extra reference is taken; the count is expected to be exactly 2 now */
	atomic_inc(&task->tk_count);
	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
	rpc_execute(task);

	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
1351  #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1352  
1353  /**
1354   * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1355   * @req: RPC request to prepare
1356   * @pages: vector of struct page pointers
1357   * @base: offset in first page where receive should start, in bytes
1358   * @len: expected size of the upper layer data payload, in bytes
1359   * @hdrsize: expected size of upper layer reply header, in XDR words
1360   *
1361   */
rpc_prepare_reply_pages(struct rpc_rqst * req,struct page ** pages,unsigned int base,unsigned int len,unsigned int hdrsize)1362  void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1363  			     unsigned int base, unsigned int len,
1364  			     unsigned int hdrsize)
1365  {
1366  	hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1367  
1368  	xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1369  	trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1370  }
1371  EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1372  
/*
 * Point the task's state machine at its initial state, call_start.
 */
void
rpc_call_start(struct rpc_task *task)
{
	task->tk_action = call_start;
}
1378  EXPORT_SYMBOL_GPL(rpc_call_start);
1379  
1380  /**
1381   * rpc_peeraddr - extract remote peer address from clnt's xprt
1382   * @clnt: RPC client structure
1383   * @buf: target buffer
1384   * @bufsize: length of target buffer
1385   *
1386   * Returns the number of bytes that are actually in the stored address.
1387   */
rpc_peeraddr(struct rpc_clnt * clnt,struct sockaddr * buf,size_t bufsize)1388  size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1389  {
1390  	size_t bytes;
1391  	struct rpc_xprt *xprt;
1392  
1393  	rcu_read_lock();
1394  	xprt = rcu_dereference(clnt->cl_xprt);
1395  
1396  	bytes = xprt->addrlen;
1397  	if (bytes > bufsize)
1398  		bytes = bufsize;
1399  	memcpy(buf, &xprt->addr, bytes);
1400  	rcu_read_unlock();
1401  
1402  	return bytes;
1403  }
1404  EXPORT_SYMBOL_GPL(rpc_peeraddr);
1405  
1406  /**
1407   * rpc_peeraddr2str - return remote peer address in printable format
1408   * @clnt: RPC client structure
1409   * @format: address format
1410   *
1411   * NB: the lifetime of the memory referenced by the returned pointer is
1412   * the same as the rpc_xprt itself.  As long as the caller uses this
1413   * pointer, it must hold the RCU read lock.
1414   */
rpc_peeraddr2str(struct rpc_clnt * clnt,enum rpc_display_format_t format)1415  const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1416  			     enum rpc_display_format_t format)
1417  {
1418  	struct rpc_xprt *xprt;
1419  
1420  	xprt = rcu_dereference(clnt->cl_xprt);
1421  
1422  	if (xprt->address_strings[format] != NULL)
1423  		return xprt->address_strings[format];
1424  	else
1425  		return "unprintable";
1426  }
1427  EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1428  
/*
 * IPv4 wildcard address used by rpc_sockname() and rpc_anyaddr().
 * Note: despite the "loopback" in the name, this holds INADDR_ANY.
 */
static const struct sockaddr_in rpc_inaddr_loopback = {
	.sin_family		= AF_INET,
	.sin_addr.s_addr	= htonl(INADDR_ANY),
};
1433  
/*
 * IPv6 wildcard address used by rpc_sockname() and rpc_anyaddr().
 * Note: despite the "loopback" in the name, this holds the IPv6 ANY
 * address (IN6ADDR_ANY_INIT).
 */
static const struct sockaddr_in6 rpc_in6addr_loopback = {
	.sin6_family		= AF_INET6,
	.sin6_addr		= IN6ADDR_ANY_INIT,
};
1438  
/*
 * Try a getsockname() on a connected datagram socket.  Using a
 * connected datagram socket prevents leaving a socket in TIME_WAIT.
 * This conserves the ephemeral port number space.
 *
 * @net:   network namespace in which the socket is created
 * @sap:   remote address to connect to; its family selects the socket
 *         family (only AF_INET and AF_INET6 are supported)
 * @salen: length of @sap, in bytes
 * @buf:   receives the local address chosen for the connection
 *
 * Returns zero and fills in "buf" if successful; otherwise, a
 * negative errno is returned.
 *
 * NOTE(review): no length for "buf" is passed in, so the caller has to
 * supply a buffer large enough for the address family - confirm callers.
 */
static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
			struct sockaddr *buf)
{
	struct socket *sock;
	int err;

	err = __sock_create(net, sap->sa_family,
				SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
	if (err < 0) {
		dprintk("RPC:       can't create UDP socket (%d)\n", err);
		goto out;
	}

	/* Bind to the wildcard address of the matching family */
	switch (sap->sa_family) {
	case AF_INET:
		err = kernel_bind(sock,
				(struct sockaddr *)&rpc_inaddr_loopback,
				sizeof(rpc_inaddr_loopback));
		break;
	case AF_INET6:
		err = kernel_bind(sock,
				(struct sockaddr *)&rpc_in6addr_loopback,
				sizeof(rpc_in6addr_loopback));
		break;
	default:
		err = -EAFNOSUPPORT;
		goto out_release;
	}
	if (err < 0) {
		dprintk("RPC:       can't bind UDP socket (%d)\n", err);
		goto out_release;
	}

	err = kernel_connect(sock, sap, salen, 0);
	if (err < 0) {
		dprintk("RPC:       can't connect UDP socket (%d)\n", err);
		goto out_release;
	}

	err = kernel_getsockname(sock, buf);
	if (err < 0) {
		dprintk("RPC:       getsockname failed (%d)\n", err);
		goto out_release;
	}

	/* Any non-negative getsockname result is reported as plain success */
	err = 0;
	/* The scope id of an IPv6 result is always cleared */
	if (buf->sa_family == AF_INET6) {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
		sin6->sin6_scope_id = 0;
	}
	dprintk("RPC:       %s succeeded\n", __func__);

out_release:
	sock_release(sock);
out:
	return err;
}
1504  
1505  /*
1506   * Scraping a connected socket failed, so we don't have a useable
1507   * local address.  Fallback: generate an address that will prevent
1508   * the server from calling us back.
1509   *
1510   * Returns zero and fills in "buf" if successful; otherwise, a
1511   * negative errno is returned.
1512   */
rpc_anyaddr(int family,struct sockaddr * buf,size_t buflen)1513  static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1514  {
1515  	switch (family) {
1516  	case AF_INET:
1517  		if (buflen < sizeof(rpc_inaddr_loopback))
1518  			return -EINVAL;
1519  		memcpy(buf, &rpc_inaddr_loopback,
1520  				sizeof(rpc_inaddr_loopback));
1521  		break;
1522  	case AF_INET6:
1523  		if (buflen < sizeof(rpc_in6addr_loopback))
1524  			return -EINVAL;
1525  		memcpy(buf, &rpc_in6addr_loopback,
1526  				sizeof(rpc_in6addr_loopback));
1527  		break;
1528  	default:
1529  		dprintk("RPC:       %s: address family not supported\n",
1530  			__func__);
1531  		return -EAFNOSUPPORT;
1532  	}
1533  	dprintk("RPC:       %s: succeeded\n", __func__);
1534  	return 0;
1535  }
1536  
1537  /**
1538   * rpc_localaddr - discover local endpoint address for an RPC client
1539   * @clnt: RPC client structure
1540   * @buf: target buffer
1541   * @buflen: size of target buffer, in bytes
1542   *
1543   * Returns zero and fills in "buf" and "buflen" if successful;
1544   * otherwise, a negative errno is returned.
1545   *
1546   * This works even if the underlying transport is not currently connected,
1547   * or if the upper layer never previously provided a source address.
1548   *
1549   * The result of this function call is transient: multiple calls in
1550   * succession may give different results, depending on how local
1551   * networking configuration changes over time.
1552   */
rpc_localaddr(struct rpc_clnt * clnt,struct sockaddr * buf,size_t buflen)1553  int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1554  {
1555  	struct sockaddr_storage address;
1556  	struct sockaddr *sap = (struct sockaddr *)&address;
1557  	struct rpc_xprt *xprt;
1558  	struct net *net;
1559  	size_t salen;
1560  	int err;
1561  
1562  	rcu_read_lock();
1563  	xprt = rcu_dereference(clnt->cl_xprt);
1564  	salen = xprt->addrlen;
1565  	memcpy(sap, &xprt->addr, salen);
1566  	net = get_net(xprt->xprt_net);
1567  	rcu_read_unlock();
1568  
1569  	rpc_set_port(sap, 0);
1570  	err = rpc_sockname(net, sap, salen, buf);
1571  	put_net(net);
1572  	if (err != 0)
1573  		/* Couldn't discover local address, return ANYADDR */
1574  		return rpc_anyaddr(sap->sa_family, buf, buflen);
1575  	return 0;
1576  }
1577  EXPORT_SYMBOL_GPL(rpc_localaddr);
1578  
/*
 * Pass send and receive buffer sizes on to the client's transport.
 *
 * Does nothing when the transport's ops do not provide a
 * set_buffer_size method.
 */
void
rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
{
	struct rpc_xprt *xprt;

	rcu_read_lock();
	xprt = rcu_dereference(clnt->cl_xprt);
	/* set_buffer_size is optional */
	if (xprt->ops->set_buffer_size)
		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
	rcu_read_unlock();
}
1590  EXPORT_SYMBOL_GPL(rpc_setbufsize);
1591  
1592  /**
1593   * rpc_net_ns - Get the network namespace for this RPC client
1594   * @clnt: RPC client to query
1595   *
1596   */
rpc_net_ns(struct rpc_clnt * clnt)1597  struct net *rpc_net_ns(struct rpc_clnt *clnt)
1598  {
1599  	struct net *ret;
1600  
1601  	rcu_read_lock();
1602  	ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1603  	rcu_read_unlock();
1604  	return ret;
1605  }
1606  EXPORT_SYMBOL_GPL(rpc_net_ns);
1607  
1608  /**
1609   * rpc_max_payload - Get maximum payload size for a transport, in bytes
1610   * @clnt: RPC client to query
1611   *
1612   * For stream transports, this is one RPC record fragment (see RFC
1613   * 1831), as we don't support multi-record requests yet.  For datagram
1614   * transports, this is the size of an IP packet minus the IP, UDP, and
1615   * RPC header sizes.
1616   */
rpc_max_payload(struct rpc_clnt * clnt)1617  size_t rpc_max_payload(struct rpc_clnt *clnt)
1618  {
1619  	size_t ret;
1620  
1621  	rcu_read_lock();
1622  	ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1623  	rcu_read_unlock();
1624  	return ret;
1625  }
1626  EXPORT_SYMBOL_GPL(rpc_max_payload);
1627  
1628  /**
1629   * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1630   * @clnt: RPC client to query
1631   */
rpc_max_bc_payload(struct rpc_clnt * clnt)1632  size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1633  {
1634  	struct rpc_xprt *xprt;
1635  	size_t ret;
1636  
1637  	rcu_read_lock();
1638  	xprt = rcu_dereference(clnt->cl_xprt);
1639  	ret = xprt->ops->bc_maxpayload(xprt);
1640  	rcu_read_unlock();
1641  	return ret;
1642  }
1643  EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1644  
rpc_num_bc_slots(struct rpc_clnt * clnt)1645  unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1646  {
1647  	struct rpc_xprt *xprt;
1648  	unsigned int ret;
1649  
1650  	rcu_read_lock();
1651  	xprt = rcu_dereference(clnt->cl_xprt);
1652  	ret = xprt->ops->bc_num_slots(xprt);
1653  	rcu_read_unlock();
1654  	return ret;
1655  }
1656  EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1657  
1658  /**
1659   * rpc_force_rebind - force transport to check that remote port is unchanged
1660   * @clnt: client to rebind
1661   *
1662   */
rpc_force_rebind(struct rpc_clnt * clnt)1663  void rpc_force_rebind(struct rpc_clnt *clnt)
1664  {
1665  	if (clnt->cl_autobind) {
1666  		rcu_read_lock();
1667  		xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1668  		rcu_read_unlock();
1669  	}
1670  }
1671  EXPORT_SYMBOL_GPL(rpc_force_rebind);
1672  
/*
 * Common helper for restarting a call: clears both status fields of the
 * task and sets @action as its next state.  Always returns 1.
 */
static int
__rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
{
	task->tk_status = 0;
	task->tk_rpc_status = 0;
	task->tk_action = action;
	return 1;
}
1681  
/*
 * Restart an (async) RPC call. Usually called from within the
 * exit handler.
 *
 * The task's status fields are cleared and its next state is set to
 * call_start.  Always returns 1.
 */
int
rpc_restart_call(struct rpc_task *task)
{
	return __rpc_restart_call(task, call_start);
}
1691  EXPORT_SYMBOL_GPL(rpc_restart_call);
1692  
1693  /*
1694   * Restart an (async) RPC call from the call_prepare state.
1695   * Usually called from within the exit handler.
1696   */
1697  int
rpc_restart_call_prepare(struct rpc_task * task)1698  rpc_restart_call_prepare(struct rpc_task *task)
1699  {
1700  	if (task->tk_ops->rpc_call_prepare != NULL)
1701  		return __rpc_restart_call(task, rpc_prepare_task);
1702  	return rpc_restart_call(task);
1703  }
1704  EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1705  
1706  const char
rpc_proc_name(const struct rpc_task * task)1707  *rpc_proc_name(const struct rpc_task *task)
1708  {
1709  	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1710  
1711  	if (proc) {
1712  		if (proc->p_name)
1713  			return proc->p_name;
1714  		else
1715  			return "NULL";
1716  	} else
1717  		return "no proc";
1718  }
1719  
/*
 * Terminate a call with an error: trace it, record @rpc_status as the
 * task's RPC status and exit the task with @tk_status.
 */
static void
__rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
{
	trace_rpc_call_rpcerror(task, tk_status, rpc_status);
	rpc_task_set_rpc_status(task, rpc_status);
	rpc_exit(task, tk_status);
}
1727  
/*
 * Terminate a call, using @status as both the task status and the RPC
 * status.
 */
static void
rpc_call_rpcerror(struct rpc_task *task, int status)
{
	__rpc_call_rpcerror(task, status, status);
}
1733  
/*
 * 0.  Initial state
 *
 *     Other FSM states can be visited zero or more times, but
 *     this state is visited exactly once for each RPC.
 *
 *     Fails the call with -EIO if the client is marked as shut down.
 *     Otherwise bumps the per-procedure and per-client call counters,
 *     selects a transport and moves on to call_reserve.
 */
static void
call_start(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	int idx = task->tk_msg.rpc_proc->p_statidx;

	trace_rpc_request(task);

	/* Refuse to start calls on a client that has been shut down */
	if (task->tk_client->cl_shutdown) {
		rpc_call_rpcerror(task, -EIO);
		return;
	}

	/* Increment call count (version might not be valid for ping) */
	if (clnt->cl_program->version[clnt->cl_vers])
		clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
	clnt->cl_stats->rpccnt++;
	task->tk_action = call_reserve;
	rpc_task_set_transport(task, clnt);
}
1760  
/*
 * 1.	Reserve an RPC call slot
 *
 *	Clears the task status, queues call_reserveresult as the next
 *	state and asks the transport layer for a slot.
 */
static void
call_reserve(struct rpc_task *task)
{
	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_reserve(task);
}
1771  
1772  static void call_retry_reserve(struct rpc_task *task);
1773  
1774  /*
1775   * 1b.	Grok the result of xprt_reserve()
1776   */
1777  static void
call_reserveresult(struct rpc_task * task)1778  call_reserveresult(struct rpc_task *task)
1779  {
1780  	int status = task->tk_status;
1781  
1782  	/*
1783  	 * After a call to xprt_reserve(), we must have either
1784  	 * a request slot or else an error status.
1785  	 */
1786  	task->tk_status = 0;
1787  	if (status >= 0) {
1788  		if (task->tk_rqstp) {
1789  			task->tk_action = call_refresh;
1790  			return;
1791  		}
1792  
1793  		rpc_call_rpcerror(task, -EIO);
1794  		return;
1795  	}
1796  
1797  	switch (status) {
1798  	case -ENOMEM:
1799  		rpc_delay(task, HZ >> 2);
1800  		fallthrough;
1801  	case -EAGAIN:	/* woken up; retry */
1802  		task->tk_action = call_retry_reserve;
1803  		return;
1804  	default:
1805  		rpc_call_rpcerror(task, status);
1806  	}
1807  }
1808  
/*
 * 1c.	Retry reserving an RPC call slot
 *
 *	Same as call_reserve, except that the slot is requested through
 *	xprt_retry_reserve().
 */
static void
call_retry_reserve(struct rpc_task *task)
{
	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_retry_reserve(task);
}
1819  
/*
 * 2.	Bind and/or refresh the credentials
 *
 *	Queues call_refreshresult as the next state, counts the refresh
 *	in the client's statistics and starts the credential refresh.
 */
static void
call_refresh(struct rpc_task *task)
{
	task->tk_action = call_refreshresult;
	task->tk_status = 0;
	task->tk_client->cl_stats->rpcauthrefresh++;
	rpcauth_refreshcred(task);
}
1831  
/*
 * 2a.	Process the results of a credential refresh
 *
 *	An up-to-date credential moves the task on to call_allocate.
 *	Retryable failures go back to call_refresh while tk_cred_retry
 *	lasts; -ENOMEM retries after a short delay without using up a
 *	retry.  Everything else ends the call with an error.
 */
static void
call_refreshresult(struct rpc_task *task)
{
	int status = task->tk_status;

	task->tk_status = 0;
	/* Default next state: try the refresh again */
	task->tk_action = call_refresh;
	switch (status) {
	case 0:
		if (rpcauth_uptodatecred(task)) {
			task->tk_action = call_allocate;
			return;
		}
		/* Use rate-limiting and a max number of retries if refresh
		 * had status 0 but failed to update the cred.
		 */
		fallthrough;
	case -ETIMEDOUT:
		rpc_delay(task, 3*HZ);
		fallthrough;
	case -EAGAIN:
		/* Reported as -EACCES if the retries are exhausted below */
		status = -EACCES;
		fallthrough;
	case -EKEYEXPIRED:
		/* Out of retries: leave the switch and fail the call */
		if (!task->tk_cred_retry)
			break;
		task->tk_cred_retry--;
		trace_rpc_retry_refresh_status(task);
		return;
	case -ENOMEM:
		rpc_delay(task, HZ >> 4);
		return;
	}
	/* Reached for exhausted retries and for any status not listed above */
	trace_rpc_refresh_status(task);
	rpc_call_rpcerror(task, status);
}
1871  
/*
 * 2b.	Allocate the buffer. For details, see sched.c:rpc_malloc.
 *	(Note: buffer memory is freed in xprt_release).
 *
 *	The next state is call_encode.  On -ENOMEM the state is retried
 *	after a short delay, unless the task is synchronous and a fatal
 *	signal is pending, in which case the call ends with -ERESTARTSYS.
 */
static void
call_allocate(struct rpc_task *task)
{
	const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
	int status;

	task->tk_status = 0;
	task->tk_action = call_encode;

	/* A buffer is already attached to the request: nothing to allocate */
	if (req->rq_buffer)
		return;

	/*
	 * Calculate the size (in quads) of the RPC call
	 * and reply headers, and convert both values
	 * to byte sizes.
	 */
	req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
			   proc->p_arglen;
	req->rq_callsize <<= 2;
	/*
	 * Note: the reply buffer must at minimum allocate enough space
	 * for the 'struct accepted_reply' from RFC5531.
	 */
	req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
			max_t(size_t, proc->p_replen, 2);
	req->rq_rcvsize <<= 2;

	status = xprt->ops->buf_alloc(task);
	trace_rpc_buf_alloc(task, status);
	if (status == 0)
		return;
	/* Anything other than -ENOMEM is fatal for the call */
	if (status != -ENOMEM) {
		rpc_call_rpcerror(task, status);
		return;
	}

	/* Out of memory: come back to this state after a short delay */
	if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
		task->tk_action = call_allocate;
		rpc_delay(task, HZ>>4);
		return;
	}

	/* Synchronous task with a fatal signal pending: give up */
	rpc_call_rpcerror(task, -ERESTARTSYS);
}
1924  
1925  static int
rpc_task_need_encode(struct rpc_task * task)1926  rpc_task_need_encode(struct rpc_task *task)
1927  {
1928  	return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1929  		(!(task->tk_flags & RPC_TASK_SENT) ||
1930  		 !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1931  		 xprt_request_need_retransmit(task));
1932  }
1933  
1934  static void
rpc_xdr_encode(struct rpc_task * task)1935  rpc_xdr_encode(struct rpc_task *task)
1936  {
1937  	struct rpc_rqst	*req = task->tk_rqstp;
1938  	struct xdr_stream xdr;
1939  
1940  	xdr_buf_init(&req->rq_snd_buf,
1941  		     req->rq_buffer,
1942  		     req->rq_callsize);
1943  	xdr_buf_init(&req->rq_rcv_buf,
1944  		     req->rq_rbuffer,
1945  		     req->rq_rcvsize);
1946  
1947  	req->rq_reply_bytes_recvd = 0;
1948  	req->rq_snd_buf.head[0].iov_len = 0;
1949  	xdr_init_encode(&xdr, &req->rq_snd_buf,
1950  			req->rq_snd_buf.head[0].iov_base, req);
1951  	if (rpc_encode_header(task, &xdr))
1952  		return;
1953  
1954  	task->tk_status = rpcauth_wrap_req(task, &xdr);
1955  }
1956  
/*
 * 3.	Encode arguments of an RPC call
 *
 *	When encoding is needed, the request is encoded, queued for the
 *	reply (if one is expected) and queued for transmission.  The next
 *	state is then call_bind, call_connect or call_transmit, depending
 *	on whether the transport is bound and connected.  Encoding errors
 *	either delay the task, send it back to call_refresh or end the call.
 */
static void
call_encode(struct rpc_task *task)
{
	/* No (re-)encode required: only pick the next state */
	if (!rpc_task_need_encode(task))
		goto out;

	/* Dequeue task from the receive queue while we're encoding */
	xprt_request_dequeue_xprt(task);
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	rpc_xdr_encode(task);
	/* Add task to reply queue before transmission to avoid races */
	if (task->tk_status == 0 && rpc_reply_expected(task))
		task->tk_status = xprt_request_enqueue_receive(task);
	/* Did the encode result in an error condition? */
	if (task->tk_status != 0) {
		/* Was the error nonfatal? */
		switch (task->tk_status) {
		case -EAGAIN:
		case -ENOMEM:
			/* tk_action is not changed here; only a delay is scheduled */
			rpc_delay(task, HZ >> 4);
			break;
		case -EKEYEXPIRED:
			/* Expired key: refresh the credential while retries remain */
			if (!task->tk_cred_retry) {
				rpc_call_rpcerror(task, task->tk_status);
			} else {
				task->tk_action = call_refresh;
				task->tk_cred_retry--;
				trace_rpc_retry_refresh_status(task);
			}
			break;
		default:
			rpc_call_rpcerror(task, task->tk_status);
		}
		return;
	}

	xprt_request_enqueue_transmit(task);
out:
	task->tk_action = call_transmit;
	/* Check that the connection is OK */
	if (!xprt_bound(task->tk_xprt))
		task->tk_action = call_bind;
	else if (!xprt_connected(task->tk_xprt))
		task->tk_action = call_connect;
}
2005  
2006  /*
2007   * Helpers to check if the task was already transmitted, and
2008   * to take action when that is the case.
2009   */
2010  static bool
rpc_task_transmitted(struct rpc_task * task)2011  rpc_task_transmitted(struct rpc_task *task)
2012  {
2013  	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
2014  }
2015  
/*
 * Shortcut used by the bind, connect and transmit states when
 * rpc_task_transmitted() is true: call xprt_end_transmit() and route
 * the task straight to call_transmit_status.
 */
static void
rpc_task_handle_transmitted(struct rpc_task *task)
{
	xprt_end_transmit(task);
	task->tk_action = call_transmit_status;
}
2022  
2023  /*
2024   * 4.	Get the server port number if not yet set
2025   */
2026  static void
call_bind(struct rpc_task * task)2027  call_bind(struct rpc_task *task)
2028  {
2029  	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2030  
2031  	if (rpc_task_transmitted(task)) {
2032  		rpc_task_handle_transmitted(task);
2033  		return;
2034  	}
2035  
2036  	if (xprt_bound(xprt)) {
2037  		task->tk_action = call_connect;
2038  		return;
2039  	}
2040  
2041  	task->tk_action = call_bind_status;
2042  	if (!xprt_prepare_transmit(task))
2043  		return;
2044  
2045  	xprt->ops->rpcbind(task);
2046  }
2047  
/*
 * 4a.	Sort out bind result
 *
 * A non-negative status, or a transport that turns out to be bound,
 * moves the task on to call_connect. Errors are sorted into those that
 * loop back to call_bind (some after a delay) and those that fail the
 * task; the default failure code is -EIO.
 */
static void
call_bind_status(struct rpc_task *task)
{
	struct rpc_xprt *transport = task->tk_rqstp->rq_xprt;
	int error = -EIO;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	if (task->tk_status >= 0) {
		task->tk_action = call_connect;
		return;
	}
	if (xprt_bound(transport)) {
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	/* Cases that 'break' retry the bind; the others jump to out_fail */
	switch (task->tk_status) {
	case -ENOMEM:
	case -ENOBUFS:
		rpc_delay(task, HZ >> 2);
		break;
	case -EACCES:
		trace_rpcb_prog_unavail_err(task);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			error = -EOPNOTSUPP;
			goto out_fail;
		}
		rpc_delay(task, 3*HZ);
		break;
	case -EAGAIN:
		break;
	case -ETIMEDOUT:
		trace_rpcb_timeout_err(task);
		break;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		trace_rpcb_bind_version_err(task);
		goto out_fail;
	case -EPROTONOSUPPORT:
		trace_rpcb_bind_version_err(task);
		break;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPIPE:
		trace_rpcb_unreachable_err(task);
		if (RPC_IS_SOFTCONN(task)) {
			/* soft-connect tasks report the original error */
			error = task->tk_status;
			goto out_fail;
		}
		rpc_delay(task, 5*HZ);
		break;
	default:
		trace_rpcb_unrecognized_err(task);
		goto out_fail;
	}

	/* Retry the bind, subject to the timeout check */
	task->tk_status = 0;
	task->tk_action = call_bind;
	rpc_check_timeout(task);
	return;

out_fail:
	rpc_call_rpcerror(task, error);
}
2127  
2128  /*
2129   * 4b.	Connect to the RPC server
2130   */
2131  static void
call_connect(struct rpc_task * task)2132  call_connect(struct rpc_task *task)
2133  {
2134  	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2135  
2136  	if (rpc_task_transmitted(task)) {
2137  		rpc_task_handle_transmitted(task);
2138  		return;
2139  	}
2140  
2141  	if (xprt_connected(xprt)) {
2142  		task->tk_action = call_transmit;
2143  		return;
2144  	}
2145  
2146  	task->tk_action = call_connect_status;
2147  	if (task->tk_status < 0)
2148  		return;
2149  	if (task->tk_flags & RPC_TASK_NOCONNECT) {
2150  		rpc_call_rpcerror(task, -ENOTCONN);
2151  		return;
2152  	}
2153  	if (!xprt_prepare_transmit(task))
2154  		return;
2155  	xprt_connect(task);
2156  }
2157  
/*
 * 4c.	Sort out connect result
 *
 * Looks at the status left behind by the connect attempt and selects the
 * next state:
 *  - success, or a transport that is connected anyway: call_transmit;
 *  - retryable errors: back to call_bind, after rpc_check_timeout();
 *  - a movable task on a transport flagged XPRT_REMOVE: release the
 *    request and restart from call_start with tk_xprt cleared;
 *  - anything else (including all soft-connect failures handled below):
 *    fail the task with the original status.
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	struct rpc_clnt *clnt = task->tk_client;
	/* Snapshot: task->tk_status is cleared before the switch below */
	int status = task->tk_status;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	trace_rpc_connect_status(task);

	if (task->tk_status == 0) {
		clnt->cl_stats->netreconn++;
		goto out_next;
	}
	/* Error reported, but the transport is connected: carry on */
	if (xprt_connected(xprt)) {
		task->tk_status = 0;
		goto out_next;
	}

	task->tk_status = 0;
	/* A 'break' out of this switch fails the task with 'status' */
	switch (status) {
	case -ECONNREFUSED:
	case -ECONNRESET:
		/* A positive refusal suggests a rebind is needed. */
		if (RPC_IS_SOFTCONN(task))
			break;
		if (clnt->cl_autobind) {
			rpc_force_rebind(clnt);
			goto out_retry;
		}
		fallthrough;
	case -ECONNABORTED:
	case -ENETDOWN:
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -EPIPE:
	case -EPROTO:
		xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
					    task->tk_rqstp->rq_connect_cookie);
		if (RPC_IS_SOFTCONN(task))
			break;
		/* retry with existing socket, after a delay */
		rpc_delay(task, 3*HZ);
		fallthrough;
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EAGAIN:
	case -ETIMEDOUT:
		/*
		 * The task may be moved and its transport is marked for
		 * removal: if the client's switch holds more than one
		 * transport, give up the request and restart from call_start.
		 */
		if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
		    (task->tk_flags & RPC_TASK_MOVEABLE) &&
		    test_bit(XPRT_REMOVE, &xprt->state)) {
			struct rpc_xprt *saved = task->tk_xprt;
			struct rpc_xprt_switch *xps;

			xps = rpc_clnt_xprt_switch_get(clnt);
			if (xps->xps_nxprts > 1) {
				long value;

				xprt_release(task);
				/* Last queued user gone: drop the transport from the switch */
				value = atomic_long_dec_return(&xprt->queuelen);
				if (value == 0)
					rpc_xprt_switch_remove_xprt(xps, saved,
								    true);
				xprt_put(saved);
				task->tk_xprt = NULL;
				task->tk_action = call_start;
			}
			xprt_switch_put(xps);
			/* Migrated: keep call_start as the next state */
			if (!task->tk_xprt)
				goto out;
		}
		goto out_retry;
	case -ENOBUFS:
		rpc_delay(task, HZ >> 2);
		goto out_retry;
	}
	rpc_call_rpcerror(task, status);
	return;
out_next:
	task->tk_action = call_transmit;
	return;
out_retry:
	/* Check for timeouts before looping back to call_bind */
	task->tk_action = call_bind;
out:
	rpc_check_timeout(task);
}
2252  
2253  /*
2254   * 5.	Transmit the RPC request, and wait for reply
2255   */
2256  static void
call_transmit(struct rpc_task * task)2257  call_transmit(struct rpc_task *task)
2258  {
2259  	if (rpc_task_transmitted(task)) {
2260  		rpc_task_handle_transmitted(task);
2261  		return;
2262  	}
2263  
2264  	task->tk_action = call_transmit_status;
2265  	if (!xprt_prepare_transmit(task))
2266  		return;
2267  	task->tk_status = 0;
2268  	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2269  		if (!xprt_connected(task->tk_xprt)) {
2270  			task->tk_status = -ENOTCONN;
2271  			return;
2272  		}
2273  		xprt_transmit(task);
2274  	}
2275  	xprt_end_transmit(task);
2276  }
2277  
/*
 * 5a.	Handle cleanup after a transmission
 *
 * Defaults to call_status as the next state. A transmitted request goes
 * on to wait for its reply; otherwise the error in tk_status decides
 * whether to re-encode, retransmit, rebind, fail, or simply pass the
 * error on to call_status.
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (rpc_task_transmitted(task)) {
		task->tk_status = 0;
		xprt_request_wait_receive(task);
		return;
	}

	switch (task->tk_status) {
	case -EBADMSG:
		/* The request has to be encoded again */
		task->tk_status = 0;
		task->tk_action = call_encode;
		break;
	/*
	 * Special cases: if we've been waiting on the
	 * socket's write_space() callback, or if the
	 * socket just returned a connection error,
	 * then hold onto the transport lock.
	 */
	case -ENOMEM:
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		fallthrough;
	case -EBADSLT:
	case -EAGAIN:
		task->tk_action = call_transmit;
		task->tk_status = 0;
		break;
	case -ECONNREFUSED:
		if (RPC_IS_SOFTCONN(task)) {
			if (!task->tk_msg.rpc_proc->p_proc)
				trace_xprt_ping(task->tk_xprt,
						task->tk_status);
			rpc_call_rpcerror(task, task->tk_status);
			return;
		}
		fallthrough;
	case -ECONNRESET:
	case -ECONNABORTED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		/* Connection trouble: go back through bind/connect */
		task->tk_action = call_bind;
		task->tk_status = 0;
		break;
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
	default:
		/* Status is left in place for call_status */
		break;
	}
	rpc_check_timeout(task);
}
2344  
2345  #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2346  static void call_bc_transmit(struct rpc_task *task);
2347  static void call_bc_transmit_status(struct rpc_task *task);
2348  
/*
 * Backchannel entry state: no encoding is performed here; the request
 * is handed to xprt_request_enqueue_transmit() and the task moves on
 * to call_bc_transmit.
 */
static void
call_bc_encode(struct rpc_task *task)
{
	xprt_request_enqueue_transmit(task);
	task->tk_action = call_bc_transmit;
}
2355  
2356  /*
2357   * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
2358   * addition, disconnect on connectivity errors.
2359   */
2360  static void
call_bc_transmit(struct rpc_task * task)2361  call_bc_transmit(struct rpc_task *task)
2362  {
2363  	task->tk_action = call_bc_transmit_status;
2364  	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2365  		if (!xprt_prepare_transmit(task))
2366  			return;
2367  		task->tk_status = 0;
2368  		xprt_transmit(task);
2369  	}
2370  	xprt_end_transmit(task);
2371  }
2372  
/*
 * Evaluate the result of sending a backchannel reply.
 *
 * Transient resource errors loop back to call_bc_transmit. Every other
 * outcome ends the task via rpc_exit_task; a timeout additionally
 * disconnects the transport.
 */
static void
call_bc_transmit_status(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;

	if (rpc_task_transmitted(task))
		task->tk_status = 0;

	switch (task->tk_status) {
	case -ENOMEM:
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		fallthrough;
	case -EBADSLT:
	case -EAGAIN:
		/* Try the transmission again */
		task->tk_status = 0;
		task->tk_action = call_bc_transmit;
		return;
	case 0:
		/* Success */
	case -ENETDOWN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ECONNRESET:
	case -ECONNREFUSED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		/* Nothing to report for these; just finish the task */
		break;
	case -ETIMEDOUT:
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * forechannel reestablish the connection.  The server will
		 * have to retransmit the backchannel request and we'll
		 * reprocess it.  Since these ops are idempotent, there's no
		 * need to cache our reply at this time.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		xprt_conditional_disconnect(rqst->rq_xprt,
			rqst->rq_connect_cookie);
		break;
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		break;
	}
	task->tk_action = rpc_exit_task;
}
2427  #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2428  
/*
 * 6.	Sort out the RPC call status
 *
 * A non-negative status moves on to call_decode. Negative statuses are
 * either retried from call_encode (subject to rpc_check_timeout()) or
 * turned into a task failure carrying the original status.
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*client = task->tk_client;
	int		err;

	if (!task->tk_msg.rpc_proc->p_proc)
		trace_xprt_ping(task->tk_xprt, task->tk_status);

	err = task->tk_status;
	if (err >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;

	/* Cases that 'break' lead to a retry; the rest jump to out_fail */
	switch (err) {
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task))
			goto out_fail;
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		break;
	case -EADDRINUSE:
		rpc_delay(task, 3*HZ);
		break;
	case -ETIMEDOUT:
	case -EPIPE:
	case -EAGAIN:
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
		rpc_force_rebind(client);
		break;
	case -ENFILE:
	case -ENOBUFS:
	case -ENOMEM:
		rpc_delay(task, HZ>>2);
		break;
	case -EIO:
		/* shutdown or soft timeout */
		goto out_fail;
	default:
		if (client->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       client->cl_program->name, -err);
		goto out_fail;
	}

	task->tk_action = call_encode;
	rpc_check_timeout(task);
	return;

out_fail:
	rpc_call_rpcerror(task, err);
}
2497  
2498  static bool
rpc_check_connected(const struct rpc_rqst * req)2499  rpc_check_connected(const struct rpc_rqst *req)
2500  {
2501  	/* No allocated request or transport? return true */
2502  	if (!req || !req->rq_xprt)
2503  		return true;
2504  	return xprt_connected(req->rq_xprt);
2505  }
2506  
2507  static void
rpc_check_timeout(struct rpc_task * task)2508  rpc_check_timeout(struct rpc_task *task)
2509  {
2510  	struct rpc_clnt	*clnt = task->tk_client;
2511  
2512  	if (RPC_SIGNALLED(task))
2513  		return;
2514  
2515  	if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2516  		return;
2517  
2518  	trace_rpc_timeout_status(task);
2519  	task->tk_timeouts++;
2520  
2521  	if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2522  		rpc_call_rpcerror(task, -ETIMEDOUT);
2523  		return;
2524  	}
2525  
2526  	if (RPC_IS_SOFT(task)) {
2527  		/*
2528  		 * Once a "no retrans timeout" soft tasks (a.k.a NFSv4) has
2529  		 * been sent, it should time out only if the transport
2530  		 * connection gets terminally broken.
2531  		 */
2532  		if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2533  		    rpc_check_connected(task->tk_rqstp))
2534  			return;
2535  
2536  		if (clnt->cl_chatty) {
2537  			pr_notice_ratelimited(
2538  				"%s: server %s not responding, timed out\n",
2539  				clnt->cl_program->name,
2540  				task->tk_xprt->servername);
2541  		}
2542  		if (task->tk_flags & RPC_TASK_TIMEOUT)
2543  			rpc_call_rpcerror(task, -ETIMEDOUT);
2544  		else
2545  			__rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2546  		return;
2547  	}
2548  
2549  	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2550  		task->tk_flags |= RPC_CALL_MAJORSEEN;
2551  		if (clnt->cl_chatty) {
2552  			pr_notice_ratelimited(
2553  				"%s: server %s not responding, still trying\n",
2554  				clnt->cl_program->name,
2555  				task->tk_xprt->servername);
2556  		}
2557  	}
2558  	rpc_force_rebind(clnt);
2559  	/*
2560  	 * Did our request time out due to an RPCSEC_GSS out-of-sequence
2561  	 * event? RFC2203 requires the server to drop all such requests.
2562  	 */
2563  	rpcauth_invalcred(task);
2564  }
2565  
2566  /*
2567   * 7.	Decode the RPC reply
2568   */
2569  static void
call_decode(struct rpc_task * task)2570  call_decode(struct rpc_task *task)
2571  {
2572  	struct rpc_clnt	*clnt = task->tk_client;
2573  	struct rpc_rqst	*req = task->tk_rqstp;
2574  	struct xdr_stream xdr;
2575  	int err;
2576  
2577  	if (!task->tk_msg.rpc_proc->p_decode) {
2578  		task->tk_action = rpc_exit_task;
2579  		return;
2580  	}
2581  
2582  	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2583  		if (clnt->cl_chatty) {
2584  			pr_notice_ratelimited("%s: server %s OK\n",
2585  				clnt->cl_program->name,
2586  				task->tk_xprt->servername);
2587  		}
2588  		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2589  	}
2590  
2591  	/*
2592  	 * Did we ever call xprt_complete_rqst()? If not, we should assume
2593  	 * the message is incomplete.
2594  	 */
2595  	err = -EAGAIN;
2596  	if (!req->rq_reply_bytes_recvd)
2597  		goto out;
2598  
2599  	/* Ensure that we see all writes made by xprt_complete_rqst()
2600  	 * before it changed req->rq_reply_bytes_recvd.
2601  	 */
2602  	smp_rmb();
2603  
2604  	req->rq_rcv_buf.len = req->rq_private_buf.len;
2605  	trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2606  
2607  	/* Check that the softirq receive buffer is valid */
2608  	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2609  				sizeof(req->rq_rcv_buf)) != 0);
2610  
2611  	xdr_init_decode(&xdr, &req->rq_rcv_buf,
2612  			req->rq_rcv_buf.head[0].iov_base, req);
2613  	err = rpc_decode_header(task, &xdr);
2614  out:
2615  	switch (err) {
2616  	case 0:
2617  		task->tk_action = rpc_exit_task;
2618  		task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2619  		xdr_finish_decode(&xdr);
2620  		return;
2621  	case -EAGAIN:
2622  		task->tk_status = 0;
2623  		if (task->tk_client->cl_discrtry)
2624  			xprt_conditional_disconnect(req->rq_xprt,
2625  						    req->rq_connect_cookie);
2626  		task->tk_action = call_encode;
2627  		rpc_check_timeout(task);
2628  		break;
2629  	case -EKEYREJECTED:
2630  		task->tk_action = call_reserve;
2631  		rpc_check_timeout(task);
2632  		rpcauth_invalcred(task);
2633  		/* Ensure we obtain a new XID if we retry! */
2634  		xprt_release(task);
2635  	}
2636  }
2637  
2638  static int
rpc_encode_header(struct rpc_task * task,struct xdr_stream * xdr)2639  rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2640  {
2641  	struct rpc_clnt *clnt = task->tk_client;
2642  	struct rpc_rqst	*req = task->tk_rqstp;
2643  	__be32 *p;
2644  	int error;
2645  
2646  	error = -EMSGSIZE;
2647  	p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2648  	if (!p)
2649  		goto out_fail;
2650  	*p++ = req->rq_xid;
2651  	*p++ = rpc_call;
2652  	*p++ = cpu_to_be32(RPC_VERSION);
2653  	*p++ = cpu_to_be32(clnt->cl_prog);
2654  	*p++ = cpu_to_be32(clnt->cl_vers);
2655  	*p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2656  
2657  	error = rpcauth_marshcred(task, xdr);
2658  	if (error < 0)
2659  		goto out_fail;
2660  	return 0;
2661  out_fail:
2662  	trace_rpc_bad_callhdr(task);
2663  	rpc_call_rpcerror(task, error);
2664  	return error;
2665  }
2666  
/*
 * Parse and validate the header of an RPC reply.
 *
 * Returns:
 *   0              the call was accepted and executed successfully;
 *   -EAGAIN        the reply should be retried (tk_action is set to
 *                  call_encode and a garbage-retry credit is consumed);
 *   -EKEYREJECTED  the credential was rejected and a credential retry
 *                  is still available (tk_cred_retry is decremented);
 *   other < 0      the task has been failed via rpc_call_rpcerror()
 *                  with the returned error.
 */
static noinline int
rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
{
	struct rpc_clnt *clnt = task->tk_client;
	int error;
	__be32 *p;

	/* RFC-1014 says that the representation of XDR data must be a
	 * multiple of four bytes
	 * - if it isn't pointer subtraction in the NFS client may give
	 *   undefined results
	 */
	if (task->tk_rqstp->rq_rcv_buf.len & 3)
		goto out_unparsable;

	/* Three words: XID, message type, reply status */
	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
	if (!p)
		goto out_unparsable;
	p++;	/* skip XID */
	if (*p++ != rpc_reply)
		goto out_unparsable;
	if (*p++ != rpc_msg_accepted)
		goto out_msg_denied;

	error = rpcauth_checkverf(task, xdr);
	if (error) {
		struct rpc_cred *cred = task->tk_rqstp->rq_cred;

		/* Verifier failed and the cred is not up to date: refresh it */
		if (!test_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags)) {
			rpcauth_invalcred(task);
			if (!task->tk_cred_retry)
				goto out_err;
			task->tk_cred_retry--;
			trace_rpc__stale_creds(task);
			return -EKEYREJECTED;
		}
		goto out_verifier;
	}

	/* Accept status of the call */
	p = xdr_inline_decode(xdr, sizeof(*p));
	if (!p)
		goto out_unparsable;
	switch (*p) {
	case rpc_success:
		return 0;
	case rpc_prog_unavail:
		trace_rpc__prog_unavail(task);
		error = -EPFNOSUPPORT;
		goto out_err;
	case rpc_prog_mismatch:
		trace_rpc__prog_mismatch(task);
		error = -EPROTONOSUPPORT;
		goto out_err;
	case rpc_proc_unavail:
		trace_rpc__proc_unavail(task);
		error = -EOPNOTSUPP;
		goto out_err;
	case rpc_garbage_args:
	case rpc_system_err:
		trace_rpc__garbage_args(task);
		error = -EIO;
		break;
	default:
		goto out_unparsable;
	}

	/* Count the garbage reply; retry while garbage-retry credits remain */
out_garbage:
	clnt->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		task->tk_action = call_encode;
		return -EAGAIN;
	}
	/* Fail the task with 'error' and hand the same code to the caller */
out_err:
	rpc_call_rpcerror(task, error);
	return error;

	/* Malformed reply: treated as garbage with -EIO */
out_unparsable:
	trace_rpc__unparsable(task);
	error = -EIO;
	goto out_garbage;

	/* Verifier check failed although the cred is up to date */
out_verifier:
	trace_rpc_bad_verifier(task);
	switch (error) {
	case -EPROTONOSUPPORT:
		goto out_err;
	case -EACCES:
		/* Re-encode with a fresh cred */
		fallthrough;
	default:
		goto out_garbage;
	}

	/* The server denied the call: decode the reject and auth status */
out_msg_denied:
	error = -EACCES;
	p = xdr_inline_decode(xdr, sizeof(*p));
	if (!p)
		goto out_unparsable;
	switch (*p++) {
	case rpc_auth_error:
		break;
	case rpc_mismatch:
		trace_rpc__mismatch(task);
		error = -EPROTONOSUPPORT;
		goto out_err;
	default:
		goto out_unparsable;
	}

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (!p)
		goto out_unparsable;
	/* A 'break' below fails the task with -EACCES */
	switch (*p++) {
	case rpc_autherr_rejectedcred:
	case rpc_autherr_rejectedverf:
	case rpcsec_gsserr_credproblem:
	case rpcsec_gsserr_ctxproblem:
		rpcauth_invalcred(task);
		if (!task->tk_cred_retry)
			break;
		task->tk_cred_retry--;
		trace_rpc__stale_creds(task);
		return -EKEYREJECTED;
	case rpc_autherr_badcred:
	case rpc_autherr_badverf:
		/* possibly garbled cred/verf? */
		if (!task->tk_garb_retry)
			break;
		task->tk_garb_retry--;
		trace_rpc__bad_creds(task);
		task->tk_action = call_encode;
		return -EAGAIN;
	case rpc_autherr_tooweak:
		trace_rpc__auth_tooweak(task);
		pr_warn("RPC: server %s requires stronger authentication.\n",
			task->tk_xprt->servername);
		break;
	default:
		goto out_unparsable;
	}
	goto out_err;
}
2810  
/* Argument encoder of the NULL procedure: there is nothing to encode. */
static void
rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
		    const void *obj)
{
}
2815  
/* Result decoder of the NULL procedure: nothing to decode, always 0. */
static int
rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
		    void *obj)
{
	return 0;
}
2821  
/*
 * Procedure descriptor for the NULL procedure with a reply: both the
 * encoder and the decoder are no-ops. All other fields, including the
 * procedure number, are left zero-initialised.
 */
static const struct rpc_procinfo rpcproc_null = {
	.p_encode = rpcproc_encode_null,
	.p_decode = rpcproc_decode_null,
};
2826  
/*
 * NULL procedure descriptor without a decoder. With p_decode unset,
 * call_decode() finishes the task without looking at a reply.
 */
static const struct rpc_procinfo rpcproc_null_noreply = {
	.p_encode = rpcproc_encode_null,
};
2830  
/*
 * rpc_call_prepare callback used for NULL calls: clear
 * RPC_TASK_NO_RETRANS_TIMEOUT on the task, then start the call.
 * @data is unused.
 */
static void
rpc_null_call_prepare(struct rpc_task *task, void *data)
{
	task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
	rpc_call_start(task);
}
2837  
/*
 * Default callbacks for NULL calls; rpc_call_null_helper() falls back
 * to these when its caller passes no ops of its own.
 */
static const struct rpc_call_ops rpc_null_ops = {
	.rpc_call_prepare = rpc_null_call_prepare,
	.rpc_call_done = rpc_default_callback,
};
2842  
2843  static
rpc_call_null_helper(struct rpc_clnt * clnt,struct rpc_xprt * xprt,struct rpc_cred * cred,int flags,const struct rpc_call_ops * ops,void * data)2844  struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2845  		struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2846  		const struct rpc_call_ops *ops, void *data)
2847  {
2848  	struct rpc_message msg = {
2849  		.rpc_proc = &rpcproc_null,
2850  	};
2851  	struct rpc_task_setup task_setup_data = {
2852  		.rpc_client = clnt,
2853  		.rpc_xprt = xprt,
2854  		.rpc_message = &msg,
2855  		.rpc_op_cred = cred,
2856  		.callback_ops = ops ?: &rpc_null_ops,
2857  		.callback_data = data,
2858  		.flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2859  			 RPC_TASK_NULLCREDS,
2860  	};
2861  
2862  	return rpc_run_task(&task_setup_data);
2863  }
2864  
rpc_call_null(struct rpc_clnt * clnt,struct rpc_cred * cred,int flags)2865  struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2866  {
2867  	return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2868  }
2869  EXPORT_SYMBOL_GPL(rpc_call_null);
2870  
rpc_ping(struct rpc_clnt * clnt)2871  static int rpc_ping(struct rpc_clnt *clnt)
2872  {
2873  	struct rpc_task	*task;
2874  	int status;
2875  
2876  	if (clnt->cl_auth->au_ops->ping)
2877  		return clnt->cl_auth->au_ops->ping(clnt);
2878  
2879  	task = rpc_call_null_helper(clnt, NULL, NULL, 0, NULL, NULL);
2880  	if (IS_ERR(task))
2881  		return PTR_ERR(task);
2882  	status = task->tk_status;
2883  	rpc_put_task(task);
2884  	return status;
2885  }
2886  
rpc_ping_noreply(struct rpc_clnt * clnt)2887  static int rpc_ping_noreply(struct rpc_clnt *clnt)
2888  {
2889  	struct rpc_message msg = {
2890  		.rpc_proc = &rpcproc_null_noreply,
2891  	};
2892  	struct rpc_task_setup task_setup_data = {
2893  		.rpc_client = clnt,
2894  		.rpc_message = &msg,
2895  		.callback_ops = &rpc_null_ops,
2896  		.flags = RPC_TASK_SOFT | RPC_TASK_SOFTCONN | RPC_TASK_NULLCREDS,
2897  	};
2898  	struct rpc_task	*task;
2899  	int status;
2900  
2901  	task = rpc_run_task(&task_setup_data);
2902  	if (IS_ERR(task))
2903  		return PTR_ERR(task);
2904  	status = task->tk_status;
2905  	rpc_put_task(task);
2906  	return status;
2907  }
2908  
/*
 * Callback data for the asynchronous "test, then add transport" NULL
 * call. Both references are dropped in rpc_cb_add_xprt_release().
 */
struct rpc_cb_add_xprt_calldata {
	struct rpc_xprt_switch *xps;	/* switch that receives the transport */
	struct rpc_xprt *xprt;		/* transport being tested */
};
2913  
rpc_cb_add_xprt_done(struct rpc_task * task,void * calldata)2914  static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2915  {
2916  	struct rpc_cb_add_xprt_calldata *data = calldata;
2917  
2918  	if (task->tk_status == 0)
2919  		rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2920  }
2921  
rpc_cb_add_xprt_release(void * calldata)2922  static void rpc_cb_add_xprt_release(void *calldata)
2923  {
2924  	struct rpc_cb_add_xprt_calldata *data = calldata;
2925  
2926  	xprt_put(data->xprt);
2927  	xprt_switch_put(data->xps);
2928  	kfree(data);
2929  }
2930  
/*
 * Callbacks for the asynchronous transport test started by
 * rpc_clnt_test_and_add_xprt(): prepare as a NULL call, add the
 * transport on success, and release the call data at the end.
 */
static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
	.rpc_call_prepare = rpc_null_call_prepare,
	.rpc_call_done = rpc_cb_add_xprt_done,
	.rpc_release = rpc_cb_add_xprt_release,
};
2936  
2937  /**
2938   * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2939   * @clnt: pointer to struct rpc_clnt
2940   * @xps: pointer to struct rpc_xprt_switch,
2941   * @xprt: pointer struct rpc_xprt
2942   * @in_max_connect: pointer to the max_connect value for the passed in xprt transport
2943   */
rpc_clnt_test_and_add_xprt(struct rpc_clnt * clnt,struct rpc_xprt_switch * xps,struct rpc_xprt * xprt,void * in_max_connect)2944  int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2945  		struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2946  		void *in_max_connect)
2947  {
2948  	struct rpc_cb_add_xprt_calldata *data;
2949  	struct rpc_task *task;
2950  	int max_connect = clnt->cl_max_connect;
2951  
2952  	if (in_max_connect)
2953  		max_connect = *(int *)in_max_connect;
2954  	if (xps->xps_nunique_destaddr_xprts + 1 > max_connect) {
2955  		rcu_read_lock();
2956  		pr_warn("SUNRPC: reached max allowed number (%d) did not add "
2957  			"transport to server: %s\n", max_connect,
2958  			rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
2959  		rcu_read_unlock();
2960  		return -EINVAL;
2961  	}
2962  
2963  	data = kmalloc(sizeof(*data), GFP_KERNEL);
2964  	if (!data)
2965  		return -ENOMEM;
2966  	data->xps = xprt_switch_get(xps);
2967  	data->xprt = xprt_get(xprt);
2968  	if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2969  		rpc_cb_add_xprt_release(data);
2970  		goto success;
2971  	}
2972  
2973  	task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2974  			&rpc_cb_add_xprt_call_ops, data);
2975  	if (IS_ERR(task))
2976  		return PTR_ERR(task);
2977  
2978  	data->xps->xps_nunique_destaddr_xprts++;
2979  	rpc_put_task(task);
2980  success:
2981  	return 1;
2982  }
2983  EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2984  
rpc_clnt_add_xprt_helper(struct rpc_clnt * clnt,struct rpc_xprt * xprt,struct rpc_add_xprt_test * data)2985  static int rpc_clnt_add_xprt_helper(struct rpc_clnt *clnt,
2986  				    struct rpc_xprt *xprt,
2987  				    struct rpc_add_xprt_test *data)
2988  {
2989  	struct rpc_task *task;
2990  	int status = -EADDRINUSE;
2991  
2992  	/* Test the connection */
2993  	task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2994  	if (IS_ERR(task))
2995  		return PTR_ERR(task);
2996  
2997  	status = task->tk_status;
2998  	rpc_put_task(task);
2999  
3000  	if (status < 0)
3001  		return status;
3002  
3003  	/* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
3004  	data->add_xprt_test(clnt, xprt, data->data);
3005  
3006  	return 0;
3007  }
3008  
3009  /**
3010   * rpc_clnt_setup_test_and_add_xprt()
3011   *
3012   * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
3013   *   1) caller of the test function must dereference the rpc_xprt_switch
3014   *   and the rpc_xprt.
3015   *   2) test function must call rpc_xprt_switch_add_xprt, usually in
3016   *   the rpc_call_done routine.
3017   *
3018   * Upon success (return of 1), the test function adds the new
3019   * transport to the rpc_clnt xprt switch
3020   *
3021   * @clnt: struct rpc_clnt to get the new transport
3022   * @xps:  the rpc_xprt_switch to hold the new transport
3023   * @xprt: the rpc_xprt to test
3024   * @data: a struct rpc_add_xprt_test pointer that holds the test function
3025   *        and test function call data
3026   */
rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt * clnt,struct rpc_xprt_switch * xps,struct rpc_xprt * xprt,void * data)3027  int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
3028  				     struct rpc_xprt_switch *xps,
3029  				     struct rpc_xprt *xprt,
3030  				     void *data)
3031  {
3032  	int status = -EADDRINUSE;
3033  
3034  	xprt = xprt_get(xprt);
3035  	xprt_switch_get(xps);
3036  
3037  	if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
3038  		goto out_err;
3039  
3040  	status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
3041  	if (status < 0)
3042  		goto out_err;
3043  
3044  	status = 1;
3045  out_err:
3046  	xprt_put(xprt);
3047  	xprt_switch_put(xps);
3048  	if (status < 0)
3049  		pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not "
3050  			"added\n", status,
3051  			xprt->address_strings[RPC_DISPLAY_ADDR]);
3052  	/* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
3053  	return status;
3054  }
3055  EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
3056  
3057  /**
3058   * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
3059   * @clnt: pointer to struct rpc_clnt
3060   * @xprtargs: pointer to struct xprt_create
3061   * @setup: callback to test and/or set up the connection
3062   * @data: pointer to setup function data
3063   *
3064   * Creates a new transport using the parameters set in args and
3065   * adds it to clnt.
3066   * If ping is set, then test that connectivity succeeds before
3067   * adding the new transport.
3068   *
3069   */
rpc_clnt_add_xprt(struct rpc_clnt * clnt,struct xprt_create * xprtargs,int (* setup)(struct rpc_clnt *,struct rpc_xprt_switch *,struct rpc_xprt *,void *),void * data)3070  int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
3071  		struct xprt_create *xprtargs,
3072  		int (*setup)(struct rpc_clnt *,
3073  			struct rpc_xprt_switch *,
3074  			struct rpc_xprt *,
3075  			void *),
3076  		void *data)
3077  {
3078  	struct rpc_xprt_switch *xps;
3079  	struct rpc_xprt *xprt;
3080  	unsigned long connect_timeout;
3081  	unsigned long reconnect_timeout;
3082  	unsigned char resvport, reuseport;
3083  	int ret = 0, ident;
3084  
3085  	rcu_read_lock();
3086  	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3087  	xprt = xprt_iter_xprt(&clnt->cl_xpi);
3088  	if (xps == NULL || xprt == NULL) {
3089  		rcu_read_unlock();
3090  		xprt_switch_put(xps);
3091  		return -EAGAIN;
3092  	}
3093  	resvport = xprt->resvport;
3094  	reuseport = xprt->reuseport;
3095  	connect_timeout = xprt->connect_timeout;
3096  	reconnect_timeout = xprt->max_reconnect_timeout;
3097  	ident = xprt->xprt_class->ident;
3098  	rcu_read_unlock();
3099  
3100  	if (!xprtargs->ident)
3101  		xprtargs->ident = ident;
3102  	xprtargs->xprtsec = clnt->cl_xprtsec;
3103  	xprt = xprt_create_transport(xprtargs);
3104  	if (IS_ERR(xprt)) {
3105  		ret = PTR_ERR(xprt);
3106  		goto out_put_switch;
3107  	}
3108  	xprt->resvport = resvport;
3109  	xprt->reuseport = reuseport;
3110  
3111  	if (xprtargs->connect_timeout)
3112  		connect_timeout = xprtargs->connect_timeout;
3113  	if (xprtargs->reconnect_timeout)
3114  		reconnect_timeout = xprtargs->reconnect_timeout;
3115  	if (xprt->ops->set_connect_timeout != NULL)
3116  		xprt->ops->set_connect_timeout(xprt,
3117  				connect_timeout,
3118  				reconnect_timeout);
3119  
3120  	rpc_xprt_switch_set_roundrobin(xps);
3121  	if (setup) {
3122  		ret = setup(clnt, xps, xprt, data);
3123  		if (ret != 0)
3124  			goto out_put_xprt;
3125  	}
3126  	rpc_xprt_switch_add_xprt(xps, xprt);
3127  out_put_xprt:
3128  	xprt_put(xprt);
3129  out_put_switch:
3130  	xprt_switch_put(xps);
3131  	return ret;
3132  }
3133  EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
3134  
rpc_xprt_probe_trunked(struct rpc_clnt * clnt,struct rpc_xprt * xprt,struct rpc_add_xprt_test * data)3135  static int rpc_xprt_probe_trunked(struct rpc_clnt *clnt,
3136  				  struct rpc_xprt *xprt,
3137  				  struct rpc_add_xprt_test *data)
3138  {
3139  	struct rpc_xprt *main_xprt;
3140  	int status = 0;
3141  
3142  	xprt_get(xprt);
3143  
3144  	rcu_read_lock();
3145  	main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3146  	status = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3147  				   (struct sockaddr *)&main_xprt->addr);
3148  	rcu_read_unlock();
3149  	xprt_put(main_xprt);
3150  	if (status || !test_bit(XPRT_OFFLINE, &xprt->state))
3151  		goto out;
3152  
3153  	status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
3154  out:
3155  	xprt_put(xprt);
3156  	return status;
3157  }
3158  
3159  /* rpc_clnt_probe_trunked_xprt -- probe offlined transport for session trunking
3160   * @clnt rpc_clnt structure
3161   *
3162   * For each offlined transport found in the rpc_clnt structure call
3163   * the function rpc_xprt_probe_trunked() which will determine if this
3164   * transport still belongs to the trunking group.
3165   */
rpc_clnt_probe_trunked_xprts(struct rpc_clnt * clnt,struct rpc_add_xprt_test * data)3166  void rpc_clnt_probe_trunked_xprts(struct rpc_clnt *clnt,
3167  				  struct rpc_add_xprt_test *data)
3168  {
3169  	struct rpc_xprt_iter xpi;
3170  	int ret;
3171  
3172  	ret = rpc_clnt_xprt_iter_offline_init(clnt, &xpi);
3173  	if (ret)
3174  		return;
3175  	for (;;) {
3176  		struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
3177  
3178  		if (!xprt)
3179  			break;
3180  		ret = rpc_xprt_probe_trunked(clnt, xprt, data);
3181  		xprt_put(xprt);
3182  		if (ret < 0)
3183  			break;
3184  		xprt_iter_rewind(&xpi);
3185  	}
3186  	xprt_iter_destroy(&xpi);
3187  }
3188  EXPORT_SYMBOL_GPL(rpc_clnt_probe_trunked_xprts);
3189  
rpc_xprt_offline(struct rpc_clnt * clnt,struct rpc_xprt * xprt,void * data)3190  static int rpc_xprt_offline(struct rpc_clnt *clnt,
3191  			    struct rpc_xprt *xprt,
3192  			    void *data)
3193  {
3194  	struct rpc_xprt *main_xprt;
3195  	struct rpc_xprt_switch *xps;
3196  	int err = 0;
3197  
3198  	xprt_get(xprt);
3199  
3200  	rcu_read_lock();
3201  	main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3202  	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3203  	err = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3204  				(struct sockaddr *)&main_xprt->addr);
3205  	rcu_read_unlock();
3206  	xprt_put(main_xprt);
3207  	if (err)
3208  		goto out;
3209  
3210  	if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) {
3211  		err = -EINTR;
3212  		goto out;
3213  	}
3214  	xprt_set_offline_locked(xprt, xps);
3215  
3216  	xprt_release_write(xprt, NULL);
3217  out:
3218  	xprt_put(xprt);
3219  	xprt_switch_put(xps);
3220  	return err;
3221  }
3222  
3223  /* rpc_clnt_manage_trunked_xprts -- offline trunked transports
3224   * @clnt rpc_clnt structure
3225   *
3226   * For each active transport found in the rpc_clnt structure call
3227   * the function rpc_xprt_offline() which will identify trunked transports
3228   * and will mark them offline.
3229   */
rpc_clnt_manage_trunked_xprts(struct rpc_clnt * clnt)3230  void rpc_clnt_manage_trunked_xprts(struct rpc_clnt *clnt)
3231  {
3232  	rpc_clnt_iterate_for_each_xprt(clnt, rpc_xprt_offline, NULL);
3233  }
3234  EXPORT_SYMBOL_GPL(rpc_clnt_manage_trunked_xprts);
3235  
/*
 * Argument bundle that rpc_set_connect_timeout() passes through the
 * void *data parameter of rpc_xprt_set_connect_timeout().
 */
struct connect_timeout_data {
	/* forwarded as the second argument of ops->set_connect_timeout() */
	unsigned long connect_timeout;
	/* forwarded as the third argument of ops->set_connect_timeout() */
	unsigned long reconnect_timeout;
};
3240  
3241  static int
rpc_xprt_set_connect_timeout(struct rpc_clnt * clnt,struct rpc_xprt * xprt,void * data)3242  rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
3243  		struct rpc_xprt *xprt,
3244  		void *data)
3245  {
3246  	struct connect_timeout_data *timeo = data;
3247  
3248  	if (xprt->ops->set_connect_timeout)
3249  		xprt->ops->set_connect_timeout(xprt,
3250  				timeo->connect_timeout,
3251  				timeo->reconnect_timeout);
3252  	return 0;
3253  }
3254  
3255  void
rpc_set_connect_timeout(struct rpc_clnt * clnt,unsigned long connect_timeout,unsigned long reconnect_timeout)3256  rpc_set_connect_timeout(struct rpc_clnt *clnt,
3257  		unsigned long connect_timeout,
3258  		unsigned long reconnect_timeout)
3259  {
3260  	struct connect_timeout_data timeout = {
3261  		.connect_timeout = connect_timeout,
3262  		.reconnect_timeout = reconnect_timeout,
3263  	};
3264  	rpc_clnt_iterate_for_each_xprt(clnt,
3265  			rpc_xprt_set_connect_timeout,
3266  			&timeout);
3267  }
3268  EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
3269  
/*
 * rpc_clnt_xprt_set_online - mark @xprt online in the switch of @clnt
 *
 * Obtains the client's transport switch with rpc_clnt_xprt_switch_get(),
 * passes it together with @xprt to xprt_set_online_locked(), and then
 * releases the switch with xprt_switch_put().
 */
void rpc_clnt_xprt_set_online(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
{
	struct rpc_xprt_switch *xpswitch = rpc_clnt_xprt_switch_get(clnt);

	xprt_set_online_locked(xprt, xpswitch);
	xprt_switch_put(xpswitch);
}
3278  
rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt * clnt,struct rpc_xprt * xprt)3279  void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3280  {
3281  	struct rpc_xprt_switch *xps;
3282  
3283  	if (rpc_clnt_xprt_switch_has_addr(clnt,
3284  		(const struct sockaddr *)&xprt->addr)) {
3285  		return rpc_clnt_xprt_set_online(clnt, xprt);
3286  	}
3287  
3288  	xps = rpc_clnt_xprt_switch_get(clnt);
3289  	rpc_xprt_switch_add_xprt(xps, xprt);
3290  	xprt_switch_put(xps);
3291  }
3292  EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
3293  
rpc_clnt_xprt_switch_remove_xprt(struct rpc_clnt * clnt,struct rpc_xprt * xprt)3294  void rpc_clnt_xprt_switch_remove_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3295  {
3296  	struct rpc_xprt_switch *xps;
3297  
3298  	rcu_read_lock();
3299  	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3300  	rpc_xprt_switch_remove_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
3301  				    xprt, 0);
3302  	xps->xps_nunique_destaddr_xprts--;
3303  	rcu_read_unlock();
3304  }
3305  EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_remove_xprt);
3306  
rpc_clnt_xprt_switch_has_addr(struct rpc_clnt * clnt,const struct sockaddr * sap)3307  bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
3308  				   const struct sockaddr *sap)
3309  {
3310  	struct rpc_xprt_switch *xps;
3311  	bool ret;
3312  
3313  	rcu_read_lock();
3314  	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3315  	ret = rpc_xprt_switch_has_addr(xps, sap);
3316  	rcu_read_unlock();
3317  	return ret;
3318  }
3319  EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
3320  
3321  #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
rpc_show_header(void)3322  static void rpc_show_header(void)
3323  {
3324  	printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
3325  		"-timeout ---ops--\n");
3326  }
3327  
rpc_show_task(const struct rpc_clnt * clnt,const struct rpc_task * task)3328  static void rpc_show_task(const struct rpc_clnt *clnt,
3329  			  const struct rpc_task *task)
3330  {
3331  	const char *rpc_waitq = "none";
3332  
3333  	if (RPC_IS_QUEUED(task))
3334  		rpc_waitq = rpc_qname(task->tk_waitqueue);
3335  
3336  	printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3337  		task->tk_pid, task->tk_flags, task->tk_status,
3338  		clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3339  		clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3340  		task->tk_action, rpc_waitq);
3341  }
3342  
rpc_show_tasks(struct net * net)3343  void rpc_show_tasks(struct net *net)
3344  {
3345  	struct rpc_clnt *clnt;
3346  	struct rpc_task *task;
3347  	int header = 0;
3348  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3349  
3350  	spin_lock(&sn->rpc_client_lock);
3351  	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3352  		spin_lock(&clnt->cl_lock);
3353  		list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3354  			if (!header) {
3355  				rpc_show_header();
3356  				header++;
3357  			}
3358  			rpc_show_task(clnt, task);
3359  		}
3360  		spin_unlock(&clnt->cl_lock);
3361  	}
3362  	spin_unlock(&sn->rpc_client_lock);
3363  }
3364  #endif
3365  
3366  #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/*
 * Per-transport callback used by rpc_clnt_swap_activate(): returns the
 * result of xprt_enable_swap() for @xprt.  @clnt and @dummy are unused.
 */
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	int rc = xprt_enable_swap(xprt);

	return rc;
}
3374  
3375  int
rpc_clnt_swap_activate(struct rpc_clnt * clnt)3376  rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3377  {
3378  	while (clnt != clnt->cl_parent)
3379  		clnt = clnt->cl_parent;
3380  	if (atomic_inc_return(&clnt->cl_swapper) == 1)
3381  		return rpc_clnt_iterate_for_each_xprt(clnt,
3382  				rpc_clnt_swap_activate_callback, NULL);
3383  	return 0;
3384  }
3385  EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3386  
/*
 * Per-transport callback used by rpc_clnt_swap_deactivate(): calls
 * xprt_disable_swap() on @xprt and always returns 0.  @clnt and @dummy
 * are unused.
 */
static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	xprt_disable_swap(xprt);

	return 0;
}
3395  
3396  void
rpc_clnt_swap_deactivate(struct rpc_clnt * clnt)3397  rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3398  {
3399  	while (clnt != clnt->cl_parent)
3400  		clnt = clnt->cl_parent;
3401  	if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3402  		rpc_clnt_iterate_for_each_xprt(clnt,
3403  				rpc_clnt_swap_deactivate_callback, NULL);
3404  }
3405  EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
3406  #endif /* CONFIG_SUNRPC_SWAP */
3407