// SPDX-License-Identifier: GPL-2.0-only
/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched/signal.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#include <trace/events/sunrpc.h>

#include "fail.h"
#include "sunrpc.h"

#define RPCDBG_FACILITY	RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv, struct net *net);

#define SVC_POOL_DEFAULT	SVC_POOL_GLOBAL

/*
 * Mode for mapping cpus to pools.
 */
enum {
	SVC_POOL_AUTO = -1,	/* choose one of the others */
	SVC_POOL_GLOBAL,	/* no mapping, just a single global pool
				 * (legacy & UP mode) */
	SVC_POOL_PERCPU,	/* one pool per cpu */
	SVC_POOL_PERNODE	/* one pool per numa node */
};

/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
 */
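/*
 * Worked example (illustrative, not taken from the code): in
 * SVC_POOL_PERNODE mode on a machine with two populated NUMA nodes,
 * npools == 2, to_pool[] maps node 0 to pool 0 and node 1 to pool 1,
 * and pool_to[] holds the inverse mapping from pool id back to node id.
 */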

struct svc_pool_map {
	int count;			/* How many svc_servs use us */
	int mode;			/* Note: int not enum to avoid
					 * warnings about "enumeration value
					 * not handled in switch" */
	unsigned int npools;
	unsigned int *pool_to;		/* maps pool id to cpu or node */
	unsigned int *to_pool;		/* maps cpu or node to pool id */
};

static struct svc_pool_map svc_pool_map = {
	.mode = SVC_POOL_DEFAULT
};

static DEFINE_MUTEX(svc_pool_map_mutex);	/* protects svc_pool_map.count only */

static int
__param_set_pool_mode(const char *val, struct svc_pool_map *m)
{
	int err, mode;

	mutex_lock(&svc_pool_map_mutex);

	err = 0;
	if (!strncmp(val, "auto", 4))
		mode = SVC_POOL_AUTO;
	else if (!strncmp(val, "global", 6))
		mode = SVC_POOL_GLOBAL;
	else if (!strncmp(val, "percpu", 6))
		mode = SVC_POOL_PERCPU;
	else if (!strncmp(val, "pernode", 7))
		mode = SVC_POOL_PERNODE;
	else
		err = -EINVAL;

	if (err)
		goto out;

	if (m->count == 0)
		m->mode = mode;
	else if (mode != m->mode)
		err = -EBUSY;
out:
	mutex_unlock(&svc_pool_map_mutex);
	return err;
}

static int
param_set_pool_mode(const char *val, const struct kernel_param *kp)
{
	struct svc_pool_map *m = kp->arg;

	return __param_set_pool_mode(val, m);
}

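/**
 * sunrpc_set_pool_mode - set the pool mode for the host
 * @val: one of "auto", "global", "percpu" or "pernode"
 *
 * Returns zero on success, or a negative errno. Requesting a mode
 * that differs from the current one fails with -EBUSY while any
 * pooled service is still using the map.
 */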
int sunrpc_set_pool_mode(const char *val)
{
	return __param_set_pool_mode(val, &svc_pool_map);
}
EXPORT_SYMBOL(sunrpc_set_pool_mode);

/**
 * sunrpc_get_pool_mode - get the current pool_mode for the host
 * @buf: where to write the current pool_mode
 * @size: size of @buf
 *
 * Grab the current pool_mode from the svc_pool_map and write
 * the resulting string to @buf. Returns the number of characters
 * written to @buf (a la snprintf()).
 */
int
sunrpc_get_pool_mode(char *buf, size_t size)
{
	struct svc_pool_map *m = &svc_pool_map;

	switch (m->mode) {
	case SVC_POOL_AUTO:
		return snprintf(buf, size, "auto");
	case SVC_POOL_GLOBAL:
		return snprintf(buf, size, "global");
	case SVC_POOL_PERCPU:
		return snprintf(buf, size, "percpu");
	case SVC_POOL_PERNODE:
		return snprintf(buf, size, "pernode");
	default:
		return snprintf(buf, size, "%d", m->mode);
	}
}
EXPORT_SYMBOL(sunrpc_get_pool_mode);

static int
param_get_pool_mode(char *buf, const struct kernel_param *kp)
{
	char str[16];
	int len;

	len = sunrpc_get_pool_mode(str, ARRAY_SIZE(str));

	/* Ensure we have room for newline and NUL */
	len = min_t(int, len, ARRAY_SIZE(str) - 2);

	/* tack on the newline */
	str[len] = '\n';
	str[len + 1] = '\0';

	return sysfs_emit(buf, "%s", str);
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
		  &svc_pool_map, 0644);
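/*
 * Usage example (illustrative): the parameter above is exposed as
 * "sunrpc.pool_mode" on the kernel command line and, given mode 0644,
 * as /sys/module/sunrpc/parameters/pool_mode at runtime, e.g.:
 *
 *	echo pernode > /sys/module/sunrpc/parameters/pool_mode
 *
 * A write that changes the mode fails with -EBUSY while a pooled
 * service is already running in a different mode.
 */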

/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
	unsigned int node;

	if (nr_online_nodes > 1) {
		/*
		 * Actually have multiple NUMA nodes,
		 * so split pools on NUMA node boundaries
		 */
		return SVC_POOL_PERNODE;
	}

	node = first_online_node;
	if (nr_cpus_node(node) > 2) {
		/*
		 * Non-trivial SMP, or CONFIG_NUMA on
		 * non-NUMA hardware, e.g. with a generic
		 * x86_64 kernel on Xeons.  In this case we
		 * want to divide the pools on cpu boundaries.
		 */
		return SVC_POOL_PERCPU;
	}

	/* default: one global pool */
	return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
	m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->to_pool)
		goto fail;
	m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->pool_to)
		goto fail_free;

	return 0;

fail_free:
	kfree(m->to_pool);
	m->to_pool = NULL;
fail:
	return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_cpu_ids;
	unsigned int pidx = 0;
	unsigned int cpu;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_online_cpu(cpu) {
		BUG_ON(pidx >= maxpools);
		m->to_pool[cpu] = pidx;
		m->pool_to[pidx] = cpu;
		pidx++;
	}
	/* cpus brought online later all get mapped to pool0, sorry */

	return pidx;
}

/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_node_ids;
	unsigned int pidx = 0;
	unsigned int node;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_node_with_cpus(node) {
		/* some architectures (e.g. SN2) have cpuless nodes */
		BUG_ON(pidx >= maxpools);
		m->to_pool[node] = pidx;
		m->pool_to[pidx] = node;
		pidx++;
	}
	/* nodes brought online later all get mapped to pool0, sorry */

	return pidx;
}

/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa) if pools are in use.
 * Initialise the map if we're the first user.
 * Returns the number of pools. If this is '1', no reference
 * was taken.
 */
static unsigned int
svc_pool_map_get(void)
{
	struct svc_pool_map *m = &svc_pool_map;
	int npools = -1;

	mutex_lock(&svc_pool_map_mutex);
	if (m->count++) {
		mutex_unlock(&svc_pool_map_mutex);
		return m->npools;
	}

	if (m->mode == SVC_POOL_AUTO)
		m->mode = svc_pool_map_choose_mode();

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		npools = svc_pool_map_init_percpu(m);
		break;
	case SVC_POOL_PERNODE:
		npools = svc_pool_map_init_pernode(m);
		break;
	}

	if (npools <= 0) {
		/* default, or memory allocation failure */
		npools = 1;
		m->mode = SVC_POOL_GLOBAL;
	}
	m->npools = npools;
	mutex_unlock(&svc_pool_map_mutex);
	return npools;
}

/*
 * Drop a reference to the global map of cpus to pools.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool.
 */
static void
svc_pool_map_put(void)
{
	struct svc_pool_map *m = &svc_pool_map;

	mutex_lock(&svc_pool_map_mutex);
	if (!--m->count) {
		kfree(m->to_pool);
		m->to_pool = NULL;
		kfree(m->pool_to);
		m->pool_to = NULL;
		m->npools = 0;
	}
	mutex_unlock(&svc_pool_map_mutex);
}

static int svc_pool_map_get_node(unsigned int pidx)
{
	const struct svc_pool_map *m = &svc_pool_map;

	if (m->count) {
		if (m->mode == SVC_POOL_PERCPU)
			return cpu_to_node(m->pool_to[pidx]);
		if (m->mode == SVC_POOL_PERNODE)
			return m->pool_to[pidx];
	}
	return NUMA_NO_NODE;
}

/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
	struct svc_pool_map *m = &svc_pool_map;
	unsigned int node = m->pool_to[pidx];

	/*
	 * The caller checks for sv_nrpools > 1, which
	 * implies that we've been initialized.
	 */
	WARN_ON_ONCE(m->count == 0);
	if (m->count == 0)
		return;

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		set_cpus_allowed_ptr(task, cpumask_of(node));
		break;
	case SVC_POOL_PERNODE:
		set_cpus_allowed_ptr(task, cpumask_of_node(node));
		break;
	}
}

/**
 * svc_pool_for_cpu - Select pool to run a thread on this cpu
 * @serv: An RPC service
 *
 * Use the active CPU and the svc_pool_map's mode setting to
 * select the svc thread pool to use. Once initialized, the
 * svc_pool_map does not change.
 *
 * Return value:
 *   A pointer to an svc_pool
 */
struct svc_pool *svc_pool_for_cpu(struct svc_serv *serv)
{
	struct svc_pool_map *m = &svc_pool_map;
	int cpu = raw_smp_processor_id();
	unsigned int pidx = 0;

	if (serv->sv_nrpools <= 1)
		return serv->sv_pools;

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		pidx = m->to_pool[cpu];
		break;
	case SVC_POOL_PERNODE:
		pidx = m->to_pool[cpu_to_node(cpu)];
		break;
	}

	return &serv->sv_pools[pidx % serv->sv_nrpools];
}

static int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
{
	int err;

	err = rpcb_create_local(net);
	if (err)
		return err;

	/* Remove any stale portmap registrations */
	svc_unregister(serv, net);
	return 0;
}

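/**
 * svc_rpcb_cleanup - remove an RPC service's rpcbind registrations
 * @serv: RPC service to unregister
 * @net: network namespace the service was registered in
 *
 * Removes @serv's registrations from the local rpcbind database and
 * drops @net's reference to its local rpcbind client.
 */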
void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
{
	svc_unregister(serv, net);
	rpcb_put_local(net);
}
EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);

static int svc_uses_rpcbind(struct svc_serv *serv)
{
	unsigned int		p, i;

	for (p = 0; p < serv->sv_nprogs; p++) {
		struct svc_program *progp = &serv->sv_programs[p];

		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (!progp->pg_vers[i]->vs_hidden)
				return 1;
		}
	}

	return 0;
}

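/**
 * svc_bind - prepare an RPC service for rpcbind registration
 * @serv: RPC service to prepare
 * @net: network namespace to prepare it in
 *
 * Creates @net's local rpcbind client, if necessary, and clears any
 * stale registrations left behind by a previous instance of @serv.
 * Services whose versions are all marked vs_hidden skip rpcbind
 * entirely. Returns zero on success or a negative errno.
 */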
int svc_bind(struct svc_serv *serv, struct net *net)
{
	if (!svc_uses_rpcbind(serv))
		return 0;
	return svc_rpcb_setup(serv, net);
}
EXPORT_SYMBOL_GPL(svc_bind);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void
__svc_init_bc(struct svc_serv *serv)
{
	lwq_init(&serv->sv_cb_list);
}
#else
static void
__svc_init_bc(struct svc_serv *serv)
{
}
#endif

/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, int nprogs, struct svc_stat *stats,
	     unsigned int bufsize, int npools, int (*threadfn)(void *data))
{
	struct svc_serv	*serv;
	unsigned int vers;
	unsigned int xdrsize;
	unsigned int i;

	serv = kzalloc(sizeof(*serv), GFP_KERNEL);
	if (!serv)
		return NULL;
	serv->sv_name      = prog->pg_name;
	serv->sv_programs  = prog;
	serv->sv_nprogs    = nprogs;
	serv->sv_stats     = stats;
	if (bufsize > RPCSVC_MAXPAYLOAD)
		bufsize = RPCSVC_MAXPAYLOAD;
	serv->sv_max_payload = bufsize ? bufsize : 4096;
	serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
	serv->sv_threadfn = threadfn;
	xdrsize = 0;
	for (i = 0; i < nprogs; i++) {
		struct svc_program *progp = &prog[i];

		progp->pg_lovers = progp->pg_nvers - 1;
		for (vers = 0; vers < progp->pg_nvers; vers++)
			if (progp->pg_vers[vers]) {
				progp->pg_hivers = vers;
				if (progp->pg_lovers > vers)
					progp->pg_lovers = vers;
				if (progp->pg_vers[vers]->vs_xdrsize > xdrsize)
					xdrsize = progp->pg_vers[vers]->vs_xdrsize;
			}
	}
	serv->sv_xdrsize   = xdrsize;
	INIT_LIST_HEAD(&serv->sv_tempsocks);
	INIT_LIST_HEAD(&serv->sv_permsocks);
	timer_setup(&serv->sv_temptimer, NULL, 0);
	spin_lock_init(&serv->sv_lock);

	__svc_init_bc(serv);

	serv->sv_nrpools = npools;
	serv->sv_pools =
		kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
			GFP_KERNEL);
	if (!serv->sv_pools) {
		kfree(serv);
		return NULL;
	}

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		dprintk("svc: initialising pool %u for %s\n",
				i, serv->sv_name);

		pool->sp_id = i;
		lwq_init(&pool->sp_xprts);
		INIT_LIST_HEAD(&pool->sp_all_threads);
		init_llist_head(&pool->sp_idle_threads);

		percpu_counter_init(&pool->sp_messages_arrived, 0, GFP_KERNEL);
		percpu_counter_init(&pool->sp_sockets_queued, 0, GFP_KERNEL);
		percpu_counter_init(&pool->sp_threads_woken, 0, GFP_KERNEL);
	}

	return serv;
}

/**
 * svc_create - Create an RPC service
 * @prog: the RPC program the new service will handle
 * @bufsize: maximum message size for @prog
 * @threadfn: a function to service RPC requests for @prog
 *
 * Returns an instantiated struct svc_serv object or NULL.
 */
struct svc_serv *svc_create(struct svc_program *prog, unsigned int bufsize,
			    int (*threadfn)(void *data))
{
	return __svc_create(prog, 1, NULL, bufsize, 1, threadfn);
}
EXPORT_SYMBOL_GPL(svc_create);

/**
 * svc_create_pooled - Create an RPC service with pooled threads
 * @prog: Array of RPC programs the new service will handle
 * @nprogs: Number of programs in the array
 * @stats: the stats struct if desired
 * @bufsize: maximum message size for @prog
 * @threadfn: a function to service RPC requests for @prog
 *
 * Returns an instantiated struct svc_serv object or NULL.
 */
struct svc_serv *svc_create_pooled(struct svc_program *prog,
				   unsigned int nprogs,
				   struct svc_stat *stats,
				   unsigned int bufsize,
				   int (*threadfn)(void *data))
{
	struct svc_serv *serv;
	unsigned int npools = svc_pool_map_get();

	serv = __svc_create(prog, nprogs, stats, bufsize, npools, threadfn);
	if (!serv)
		goto out_err;
	serv->sv_is_pooled = true;
	return serv;
out_err:
	svc_pool_map_put();
	return NULL;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);
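/*
 * Typical lifecycle of a pooled service (illustrative sketch, not a
 * verbatim caller):
 *
 *	serv = svc_create_pooled(progs, nprogs, &stats, bufsize, threadfn);
 *	err = svc_set_num_threads(serv, NULL, nthreads);
 *	...
 *	svc_set_num_threads(serv, NULL, 0);
 *	svc_destroy(&serv);
 */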

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct svc_serv **servp)
{
	struct svc_serv *serv = *servp;
	unsigned int i;

	*servp = NULL;

	dprintk("svc: svc_destroy(%s)\n", serv->sv_programs->pg_name);
	timer_shutdown_sync(&serv->sv_temptimer);

	/*
	 * Remaining transports at this point are not expected.
	 */
	WARN_ONCE(!list_empty(&serv->sv_permsocks),
		  "SVC: permsocks remain for %s\n", serv->sv_programs->pg_name);
	WARN_ONCE(!list_empty(&serv->sv_tempsocks),
		  "SVC: tempsocks remain for %s\n", serv->sv_programs->pg_name);

	cache_clean_deferred(serv);

	if (serv->sv_is_pooled)
		svc_pool_map_put();

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		percpu_counter_destroy(&pool->sp_messages_arrived);
		percpu_counter_destroy(&pool->sp_sockets_queued);
		percpu_counter_destroy(&pool->sp_threads_woken);
	}
	kfree(serv->sv_pools);
	kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

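/*
 * Allocate pages for the thread's rq_pages[] array, enough to hold one
 * maximum-sized request and its reply. Backchannel requests reuse the
 * fore channel's buffers, so nothing is allocated for them. Returns
 * true on success.
 */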
static bool
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
	unsigned long pages, ret;

	/* bc_xprt uses fore channel allocated buffers */
	if (svc_is_backchannel(rqstp))
		return true;

	pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
				       * We assume one is at most one page
				       */
	WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
	if (pages > RPCSVC_MAXPAGES)
		pages = RPCSVC_MAXPAGES;

	ret = alloc_pages_bulk_array_node(GFP_KERNEL, node, pages,
					  rqstp->rq_pages);
	return ret == pages;
}

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
	unsigned int i;

	for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
		if (rqstp->rq_pages[i])
			put_page(rqstp->rq_pages[i]);
}

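/*
 * Free all resources owned by an svc_rqst: its page buffer, scratch
 * page, and argument/result buffers. The struct itself is freed via
 * kfree_rcu() because it is also reachable through the pool's
 * RCU-protected sp_all_threads list.
 */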
static void
svc_rqst_free(struct svc_rqst *rqstp)
{
	folio_batch_release(&rqstp->rq_fbatch);
	svc_release_buffer(rqstp);
	if (rqstp->rq_scratch_page)
		put_page(rqstp->rq_scratch_page);
	kfree(rqstp->rq_resp);
	kfree(rqstp->rq_argp);
	kfree(rqstp->rq_auth_data);
	kfree_rcu(rqstp, rq_rcu_head);
}

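/*
 * Allocate and initialise an svc_rqst for a new thread in @pool,
 * allocating its buffers on NUMA node @node where possible. The thread
 * counts and sp_all_threads list are updated under the same
 * serialisation the service uses for svc_set_num_threads(). Returns
 * NULL on allocation failure.
 */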
static struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
	if (!rqstp)
		return rqstp;

	folio_batch_init(&rqstp->rq_fbatch);

	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;

	rqstp->rq_scratch_page = alloc_pages_node(node, GFP_KERNEL, 0);
	if (!rqstp->rq_scratch_page)
		goto out_enomem;

	rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_argp)
		goto out_enomem;

	rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_resp)
		goto out_enomem;

	if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
		goto out_enomem;

	rqstp->rq_err = -EAGAIN; /* No error yet */

	serv->sv_nrthreads += 1;
	pool->sp_nrthreads += 1;

	/* Protected by whatever lock the service uses when calling
	 * svc_set_num_threads()
	 */
	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);

	return rqstp;

out_enomem:
	svc_rqst_free(rqstp);
	return NULL;
}

/**
 * svc_pool_wake_idle_thread - Awaken an idle thread in @pool
 * @pool: service thread pool
 *
 * Can be called from soft IRQ or process context. Finding an idle
 * service thread and marking it BUSY is atomic with respect to
 * other calls to svc_pool_wake_idle_thread().
 */
void svc_pool_wake_idle_thread(struct svc_pool *pool)
{
	struct svc_rqst	*rqstp;
	struct llist_node *ln;

	rcu_read_lock();
	ln = READ_ONCE(pool->sp_idle_threads.first);
	if (ln) {
		rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
		WRITE_ONCE(rqstp->rq_qtime, ktime_get());
		if (!task_is_running(rqstp->rq_task)) {
			wake_up_process(rqstp->rq_task);
			trace_svc_wake_up(rqstp->rq_task->pid);
			percpu_counter_inc(&pool->sp_threads_woken);
		}
		rcu_read_unlock();
		return;
	}
	rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(svc_pool_wake_idle_thread);

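/*
 * Choose the pool in which to start the next thread: the specific
 * @pool the caller asked for, or round-robin across all pools using
 * *state as the cursor.
 */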
static struct svc_pool *
svc_pool_next(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	return pool ? pool : &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

static struct svc_pool *
svc_pool_victim(struct svc_serv *serv, struct svc_pool *target_pool,
		unsigned int *state)
{
	struct svc_pool *pool;
	unsigned int i;

	pool = target_pool;

	if (!pool) {
		for (i = 0; i < serv->sv_nrpools; i++) {
			pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
			if (pool->sp_nrthreads)
				break;
		}
	}

	if (pool && pool->sp_nrthreads) {
		set_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
		set_bit(SP_NEED_VICTIM, &pool->sp_flags);
		return pool;
	}
	return NULL;
}

static int
svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct svc_rqst	*rqstp;
	struct task_struct *task;
	struct svc_pool *chosen_pool;
	unsigned int state = serv->sv_nrthreads - 1;
	int node;
	int err;

	do {
		nrservs--;
		chosen_pool = svc_pool_next(serv, pool, &state);
		node = svc_pool_map_get_node(chosen_pool->sp_id);

		rqstp = svc_prepare_thread(serv, chosen_pool, node);
		if (!rqstp)
			return -ENOMEM;
		task = kthread_create_on_node(serv->sv_threadfn, rqstp,
					      node, "%s", serv->sv_name);
		if (IS_ERR(task)) {
			svc_exit_thread(rqstp);
			return PTR_ERR(task);
		}

		rqstp->rq_task = task;
		if (serv->sv_nrpools > 1)
			svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

		svc_sock_update_bufs(serv);
		wake_up_process(task);

		wait_var_event(&rqstp->rq_err, rqstp->rq_err != -EAGAIN);
		err = rqstp->rq_err;
		if (err) {
			svc_exit_thread(rqstp);
			return err;
		}
	} while (nrservs > 0);

	return 0;
}

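/*
 * Ask |nrservs| threads to exit, one at a time: pick a victim pool,
 * mark it with SP_NEED_VICTIM, wake an idle thread, and wait for
 * SP_VICTIM_REMAINS to clear before choosing the next victim.
 */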
static int
svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	unsigned int state = serv->sv_nrthreads - 1;
	struct svc_pool *victim;

	do {
		victim = svc_pool_victim(serv, pool, &state);
		if (!victim)
			break;
		svc_pool_wake_idle_thread(victim);
		wait_on_bit(&victim->sp_flags, SP_VICTIM_REMAINS,
			    TASK_IDLE);
		nrservs++;
	} while (nrservs < 0);
	return 0;
}

/**
 * svc_set_num_threads - adjust number of threads per RPC service
 * @serv: RPC service to adjust
 * @pool: Specific pool from which to choose threads, or NULL
 * @nrservs: New number of threads for @serv (0 or less means kill all threads)
 *
 * Create or destroy threads to make the number of threads for @serv the
 * given number. If @pool is non-NULL, change only threads in that pool;
 * otherwise, round-robin between all pools for @serv. @serv's
 * sv_nrthreads is adjusted for each thread created or destroyed.
 *
 * Caller must ensure mutual exclusion between this and server startup or
 * shutdown.
 *
 * Returns zero on success or a negative errno if an error occurred while
 * starting a thread.
 */
int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	if (!pool)
		nrservs -= serv->sv_nrthreads;
	else
		nrservs -= pool->sp_nrthreads;

	if (nrservs > 0)
		return svc_start_kthreads(serv, pool, nrservs);
	if (nrservs < 0)
		return svc_stop_kthreads(serv, pool, nrservs);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);
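/*
 * Worked example (illustrative): svc_set_num_threads(serv, NULL, 8) on
 * a service currently running 5 threads starts 3 more, spread
 * round-robin across its pools; a target of 0 asks every remaining
 * thread to exit.
 */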

/**
 * svc_rqst_replace_page - Replace one page in rq_pages[]
 * @rqstp: svc_rqst with pages to replace
 * @page: replacement page
 *
 * When replacing a page in rq_pages, batch the release of the
 * replaced pages to avoid hammering the page allocator.
 *
 * Return values:
 *   %true: page replaced
 *   %false: array bounds checking failed
 */
bool svc_rqst_replace_page(struct svc_rqst *rqstp, struct page *page)
{
	struct page **begin = rqstp->rq_pages;
	struct page **end = &rqstp->rq_pages[RPCSVC_MAXPAGES];

	if (unlikely(rqstp->rq_next_page < begin || rqstp->rq_next_page > end)) {
		trace_svc_replace_page_err(rqstp);
		return false;
	}

	if (*rqstp->rq_next_page) {
		if (!folio_batch_add(&rqstp->rq_fbatch,
				page_folio(*rqstp->rq_next_page)))
			__folio_batch_release(&rqstp->rq_fbatch);
	}

	get_page(page);
	*(rqstp->rq_next_page++) = page;
	return true;
}
EXPORT_SYMBOL_GPL(svc_rqst_replace_page);

/**
 * svc_rqst_release_pages - Release Reply buffer pages
 * @rqstp: RPC transaction context
 *
 * Release response pages that might still be in flight after
 * svc_send, and any spliced filesystem-owned pages.
 */
void svc_rqst_release_pages(struct svc_rqst *rqstp)
{
	int i, count = rqstp->rq_next_page - rqstp->rq_respages;

	if (count) {
		release_pages(rqstp->rq_respages, count);
		for (i = 0; i < count; i++)
			rqstp->rq_respages[i] = NULL;
	}
}

/**
 * svc_exit_thread - finalise the termination of a sunrpc server thread
 * @rqstp: the svc_rqst which represents the thread.
 *
 * When a thread started by svc_start_kthreads() exits it must call
 * svc_exit_thread() as its last act.  This must be done with the
 * service mutex held.  Normally this is held by a DIFFERENT thread, the
 * one that is calling svc_set_num_threads() and which will wait for
 * SP_VICTIM_REMAINS to be cleared before dropping the mutex.  If the
 * thread exits for any reason other than svc_thread_should_stop()
 * returning %true (which indicates that svc_set_num_threads() is
 * waiting for it to exit), then it must take the service mutex itself,
 * which can only safely be done using mutex_trylock().
 */
void
svc_exit_thread(struct svc_rqst *rqstp)
{
	struct svc_serv	*serv = rqstp->rq_server;
	struct svc_pool	*pool = rqstp->rq_pool;

	list_del_rcu(&rqstp->rq_all);

	pool->sp_nrthreads -= 1;
	serv->sv_nrthreads -= 1;
	svc_sock_update_bufs(serv);

	svc_rqst_free(rqstp);

	clear_and_wake_up_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin, netid);

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * registration request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, protocol, port);

	return error;
}

#if IS_ENABLED(CONFIG_IPV6)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP6;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP6;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin6, netid);

	/*
	 * User space didn't support rpcbind version 4, so we won't
	 * use a PF_INET6 listener.
	 */
	if (error == -EPROTONOSUPPORT)
		error = -EAFNOSUPPORT;

	return error;
}
#endif	/* IS_ENABLED(CONFIG_IPV6) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(struct net *net, const char *progname,
			  const u32 program, const u32 version,
			  const int family,
			  const unsigned short protocol,
			  const unsigned short port)
{
	int error = -EAFNOSUPPORT;

	switch (family) {
	case PF_INET:
		error = __svc_rpcb_register4(net, program, version,
						protocol, port);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case PF_INET6:
		error = __svc_rpcb_register6(net, program, version,
						protocol, port);
#endif
	}

	trace_svc_register(progname, version, family, protocol, port, error);
	return error;
}

static int svc_rpcbind_set_version(struct net *net,
				   const struct svc_program *progp,
				   u32 version, int family,
				   unsigned short proto,
				   unsigned short port)
{
	return __svc_register(net, progp->pg_name, progp->pg_prog,
				version, family, proto, port);
}


int svc_generic_rpcbind_set(struct net *net,
			    const struct svc_program *progp,
			    u32 version, int family,
			    unsigned short proto,
			    unsigned short port)
{
	const struct svc_version *vers = progp->pg_vers[version];
	int error;

	if (vers == NULL)
		return 0;

	if (vers->vs_hidden) {
		trace_svc_noregister(progp->pg_name, version, proto,
				     port, family, 0);
		return 0;
	}

	/*
	 * Don't register a UDP port if we need congestion
	 * control.
	 */
	if (vers->vs_need_cong_ctrl && proto == IPPROTO_UDP)
		return 0;

	error = svc_rpcbind_set_version(net, progp, version,
					family, proto, port);

	return (vers->vs_rpcb_optnl) ? 0 : error;
}
EXPORT_SYMBOL_GPL(svc_generic_rpcbind_set);

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, struct net *net,
		 const int family, const unsigned short proto,
		 const unsigned short port)
{
	unsigned int		p, i;
	int			error = 0;

	WARN_ON_ONCE(proto == 0 && port == 0);
	if (proto == 0 && port == 0)
		return -EINVAL;

	for (p = 0; p < serv->sv_nprogs; p++) {
		struct svc_program *progp = &serv->sv_programs[p];

		for (i = 0; i < progp->pg_nvers; i++) {
			error = progp->pg_rpcbind_set(net, progp, i,
					family, proto, port);
			if (error < 0) {
				printk(KERN_WARNING "svc: failed to register "
					"%sv%u RPC service (errno %d).\n",
					progp->pg_name, i, -error);
				break;
			}
		}
	}

	return error;
}


/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
			     const char *progname)
{
	int error;

	error = rpcb_v4_register(net, program, version, NULL, "");

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, 0, 0);

	trace_svc_unregister(progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
	struct sighand_struct *sighand;
	unsigned long flags;
	unsigned int p, i;

	clear_thread_flag(TIF_SIGPENDING);

	for (p = 0; p < serv->sv_nprogs; p++) {
		struct svc_program *progp = &serv->sv_programs[p];

		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (progp->pg_vers[i]->vs_hidden)
				continue;
			__svc_unregister(net, progp->pg_prog, i, progp->pg_name);
		}
	}

	rcu_read_lock();
	sighand = rcu_dereference(current->sighand);
	spin_lock_irqsave(&sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&sighand->siglock, flags);
	rcu_read_unlock();
}

/*
 * dprintk the given error with the address of the client that caused it.
 */
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;
	char	buf[RPC_MAX_ADDRBUFLEN];

	va_start(args, fmt);

	vaf.fmt = fmt;
	vaf.va = &args;

	dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

	va_end(args);
}
#else
static __printf(2, 3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif

__be32
svc_generic_init_request(struct svc_rqst *rqstp,
		const struct svc_program *progp,
		struct svc_process_info *ret)
{
	const struct svc_version *versp = NULL;	/* compiler food */
	const struct svc_procedure *procp = NULL;

	if (rqstp->rq_vers >= progp->pg_nvers)
		goto err_bad_vers;
	versp = progp->pg_vers[rqstp->rq_vers];
	if (!versp)
		goto err_bad_vers;

	/*
	 * Some protocol versions (namely NFSv4) require some form of
	 * congestion control.  (See RFC 7530 section 3.1 paragraph 2)
	 * In other words, UDP is not allowed. We mark those when setting
	 * up the svc_xprt, and verify that here.
	 *
	 * The spec is not very clear about what error should be returned
	 * when someone tries to access a server that is listening on UDP
	 * for lower versions. RPC_PROG_MISMATCH seems to be the closest
	 * fit.
	 */
	if (versp->vs_need_cong_ctrl && rqstp->rq_xprt &&
	    !test_bit(XPT_CONG_CTRL, &rqstp->rq_xprt->xpt_flags))
		goto err_bad_vers;

	if (rqstp->rq_proc >= versp->vs_nproc)
		goto err_bad_proc;
	rqstp->rq_procinfo = procp = &versp->vs_proc[rqstp->rq_proc];

	/* Initialize storage for argp and resp */
	memset(rqstp->rq_argp, 0, procp->pc_argzero);
	memset(rqstp->rq_resp, 0, procp->pc_ressize);

	/* Bump per-procedure stats counter */
	this_cpu_inc(versp->vs_count[rqstp->rq_proc]);

	ret->dispatch = versp->vs_dispatch;
	return rpc_success;
err_bad_vers:
	ret->mismatch.lovers = progp->pg_lovers;
	ret->mismatch.hivers = progp->pg_hivers;
	return rpc_prog_mismatch;
err_bad_proc:
	return rpc_proc_unavail;
}
EXPORT_SYMBOL_GPL(svc_generic_init_request);

/*
 * Common routine for processing the RPC request.
 */
static int
svc_process_common(struct svc_rqst *rqstp)
{
	struct xdr_stream	*xdr = &rqstp->rq_res_stream;
	struct svc_program	*progp = NULL;
	const struct svc_procedure *procp = NULL;
	struct svc_serv		*serv = rqstp->rq_server;
	struct svc_process_info process;
	enum svc_auth_status	auth_res;
	unsigned int		aoffset;
	int			pr, rc;
	__be32			*p;

	/* Will be turned off only when NFSv4 Sessions are used */
	set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
	clear_bit(RQ_DROPME, &rqstp->rq_flags);

	/* Construct the first words of the reply: */
	svcxdr_init_encode(rqstp);
	xdr_stream_encode_be32(xdr, rqstp->rq_xid);
	xdr_stream_encode_be32(xdr, rpc_reply);

	p = xdr_inline_decode(&rqstp->rq_arg_stream, XDR_UNIT * 4);
	if (unlikely(!p))
		goto err_short_len;
	if (*p++ != cpu_to_be32(RPC_VERSION))
		goto err_bad_rpc;

	xdr_stream_encode_be32(xdr, rpc_msg_accepted);

	rqstp->rq_prog = be32_to_cpup(p++);
	rqstp->rq_vers = be32_to_cpup(p++);
	rqstp->rq_proc = be32_to_cpup(p);

	for (pr = 0; pr < serv->sv_nprogs; pr++)
		if (rqstp->rq_prog == serv->sv_programs[pr].pg_prog)
			progp = &serv->sv_programs[pr];

	/*
	 * Decode auth data, and add verifier to reply buffer.
	 * We do this before anything else in order to get a decent
	 * auth verifier.
	 */
	auth_res = svc_authenticate(rqstp);
	/* Also give the program a chance to reject this call: */
	if (auth_res == SVC_OK && progp)
		auth_res = progp->pg_authenticate(rqstp);
	trace_svc_authenticate(rqstp, auth_res);
	switch (auth_res) {
	case SVC_OK:
		break;
	case SVC_GARBAGE:
		goto err_garbage_args;
	case SVC_SYSERR:
		goto err_system_err;
	case SVC_DENIED:
		goto err_bad_auth;
	case SVC_CLOSE:
		goto close;
	case SVC_DROP:
		goto dropit;
	case SVC_COMPLETE:
		goto sendit;
	default:
		pr_warn_once("Unexpected svc_auth_status (%d)\n", auth_res);
		goto err_system_err;
	}

	if (progp == NULL)
		goto err_bad_prog;

	switch (progp->pg_init_request(rqstp, progp, &process)) {
	case rpc_success:
		break;
	case rpc_prog_unavail:
		goto err_bad_prog;
	case rpc_prog_mismatch:
		goto err_bad_vers;
	case rpc_proc_unavail:
		goto err_bad_proc;
	}

	procp = rqstp->rq_procinfo;
	/* Should this check go into the dispatcher? */
	if (!procp || !procp->pc_func)
		goto err_bad_proc;

	/* Syntactic check complete */
	if (serv->sv_stats)
		serv->sv_stats->rpccnt++;
	trace_svc_process(rqstp, progp->pg_name);

	aoffset = xdr_stream_pos(xdr);

	/* un-reserve some of the out-queue now that we have a
	 * better idea of reply size
	 */
	if (procp->pc_xdrressize)
		svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

	/* Call the function that processes the request. */
	rc = process.dispatch(rqstp);
	if (procp->pc_release)
		procp->pc_release(rqstp);
	xdr_finish_decode(xdr);

	if (!rc)
		goto dropit;
	if (rqstp->rq_auth_stat != rpc_auth_ok)
		goto err_bad_auth;

	if (*rqstp->rq_accept_statp != rpc_success)
		xdr_truncate_encode(xdr, aoffset);

	if (procp->pc_encode == NULL)
		goto dropit;

 sendit:
	if (svc_authorise(rqstp))
		goto close_xprt;
	return 1;		/* Caller can now send it */

 dropit:
	svc_authorise(rqstp);	/* doesn't hurt to call this twice */
	dprintk("svc: svc_process dropit\n");
	return 0;

 close:
	svc_authorise(rqstp);
close_xprt:
	if (rqstp->rq_xprt && test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
		svc_xprt_close(rqstp->rq_xprt);
	dprintk("svc: svc_process close\n");
	return 0;

err_short_len:
	svc_printk(rqstp, "short len %u, dropping request\n",
		   rqstp->rq_arg.len);
	goto close_xprt;

err_bad_rpc:
	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	xdr_stream_encode_u32(xdr, RPC_MSG_DENIED);
	xdr_stream_encode_u32(xdr, RPC_MISMATCH);
	/* Only RPCv2 supported */
	xdr_stream_encode_u32(xdr, RPC_VERSION);
	xdr_stream_encode_u32(xdr, RPC_VERSION);
	return 1;	/* don't wrap */

err_bad_auth:
	dprintk("svc: authentication failed (%d)\n",
		be32_to_cpu(rqstp->rq_auth_stat));
	if (serv->sv_stats)
		serv->sv_stats->rpcbadauth++;
	/* Restore write pointer to location of reply status: */
	xdr_truncate_encode(xdr, XDR_UNIT * 2);
	xdr_stream_encode_u32(xdr, RPC_MSG_DENIED);
	xdr_stream_encode_u32(xdr, RPC_AUTH_ERROR);
	xdr_stream_encode_be32(xdr, rqstp->rq_auth_stat);
	goto sendit;

err_bad_prog:
	dprintk("svc: unknown program %d\n", rqstp->rq_prog);
	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_prog_unavail;
	goto sendit;

err_bad_vers:
	svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
		       rqstp->rq_vers, rqstp->rq_prog, progp->pg_name);

	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_prog_mismatch;

	/*
	 * svc_authenticate() has already added the verifier and
	 * advanced the stream just past rq_accept_statp.
	 */
	xdr_stream_encode_u32(xdr, process.mismatch.lovers);
	xdr_stream_encode_u32(xdr, process.mismatch.hivers);
	goto sendit;

err_bad_proc:
	svc_printk(rqstp, "unknown procedure (%d)\n", rqstp->rq_proc);

	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_proc_unavail;
	goto sendit;

err_garbage_args:
	svc_printk(rqstp, "failed to decode RPC header\n");

	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_garbage_args;
	goto sendit;

err_system_err:
	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_system_err;
	goto sendit;
}

/*
 * Drop request
 */
static void svc_drop(struct svc_rqst *rqstp)
{
	trace_svc_drop(rqstp);
}

/**
 * svc_process - Execute one RPC transaction
 * @rqstp: RPC transaction context
 *
 */
void svc_process(struct svc_rqst *rqstp)
{
	struct kvec		*resv = &rqstp->rq_res.head[0];
	__be32 *p;

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
	if (!fail_sunrpc.ignore_server_disconnect &&
	    should_fail(&fail_sunrpc.attr, 1))
		svc_xprt_deferred_close(rqstp->rq_xprt);
#endif

	/*
	 * Setup response xdr_buf.
	 * Initially it has just one page
	 */
	rqstp->rq_next_page = &rqstp->rq_respages[1];
	resv->iov_base = page_address(rqstp->rq_respages[0]);
	resv->iov_len = 0;
	rqstp->rq_res.pages = rqstp->rq_next_page;
	rqstp->rq_res.len = 0;
	rqstp->rq_res.page_base = 0;
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.buflen = PAGE_SIZE;
	rqstp->rq_res.tail[0].iov_base = NULL;
	rqstp->rq_res.tail[0].iov_len = 0;

	svcxdr_init_decode(rqstp);
	p = xdr_inline_decode(&rqstp->rq_arg_stream, XDR_UNIT * 2);
	if (unlikely(!p))
		goto out_drop;
	rqstp->rq_xid = *p++;
	if (unlikely(*p != rpc_call))
		goto out_baddir;

	if (!svc_process_common(rqstp))
		goto out_drop;
	svc_send(rqstp);
	return;

out_baddir:
	svc_printk(rqstp, "bad direction 0x%08x, dropping request\n",
		   be32_to_cpu(*p));
	if (rqstp->rq_server->sv_stats)
		rqstp->rq_server->sv_stats->rpcbadfmt++;
out_drop:
	svc_drop(rqstp);
}

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/**
 * svc_process_bc - process a reverse-direction RPC request
 * @req: RPC request to be used for client-side processing
 * @rqstp: server-side execution context
 *
 */
void svc_process_bc(struct rpc_rqst *req, struct svc_rqst *rqstp)
{
	struct rpc_timeout timeout = {
		.to_increment		= 0,
	};
	struct rpc_task *task;
	int proc_error;

	/* Build the svc_rqst used by the common processing routine */
	rqstp->rq_xid = req->rq_xid;
	rqstp->rq_prot = req->rq_xprt->prot;
	rqstp->rq_bc_net = req->rq_xprt->xprt_net;

	rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
	memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
	memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
	memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

	/* Adjust the argument buffer length */
	rqstp->rq_arg.len = req->rq_private_buf.len;
	if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
		rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
		rqstp->rq_arg.page_len = 0;
	} else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len)
		rqstp->rq_arg.page_len = rqstp->rq_arg.len -
			rqstp->rq_arg.head[0].iov_len;
	else
		rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len;

	/* Reset the response buffer */
	rqstp->rq_res.head[0].iov_len = 0;

	/*
	 * Skip the XID and calldir fields because they've already
	 * been processed by the caller.
	 */
	svcxdr_init_decode(rqstp);
	if (!xdr_inline_decode(&rqstp->rq_arg_stream, XDR_UNIT * 2))
		return;

	/* Parse and execute the bc call */
	proc_error = svc_process_common(rqstp);

	atomic_dec(&req->rq_xprt->bc_slot_count);
	if (!proc_error) {
		/* Processing error: drop the request */
		xprt_free_bc_request(req);
		return;
	}
	/* Finally, send the reply synchronously */
	if (rqstp->bc_to_initval > 0) {
		timeout.to_initval = rqstp->bc_to_initval;
		timeout.to_retries = rqstp->bc_to_retries;
	} else {
		timeout.to_initval = req->rq_xprt->timeout->to_initval;
		timeout.to_retries = req->rq_xprt->timeout->to_retries;
	}
	timeout.to_maxval = timeout.to_initval;
	memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
	task = rpc_run_bc_task(req, &timeout);

	if (IS_ERR(task))
		return;

	WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
	rpc_put_task(task);
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

/**
 * svc_max_payload - Return transport-specific limit on the RPC payload
 * @rqstp: RPC transaction context
 *
 * Returns the maximum number of payload bytes the current transport
 * allows.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
	u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

	if (rqstp->rq_server->sv_max_payload < max)
		max = rqstp->rq_server->sv_max_payload;
	return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);

/**
 * svc_proc_name - Return RPC procedure name in string form
 * @rqstp: svc_rqst to operate on
 *
 * Return value:
 *   Pointer to a NUL-terminated string
 */
const char *svc_proc_name(const struct svc_rqst *rqstp)
{
	if (rqstp && rqstp->rq_procinfo)
		return rqstp->rq_procinfo->pc_name;
	return "unknown";
}

/**
 * svc_encode_result_payload - mark a range of bytes as a result payload
 * @rqstp: svc_rqst to operate on
 * @offset: payload's byte offset in rqstp->rq_res
 * @length: size of payload, in bytes
 *
 * Returns zero on success, or a negative errno if a permanent
 * error occurred.
 */
int svc_encode_result_payload(struct svc_rqst *rqstp, unsigned int offset,
			      unsigned int length)
{
	return rqstp->rq_xprt->xpt_ops->xpo_result_payload(rqstp, offset,
							   length);
}
EXPORT_SYMBOL_GPL(svc_encode_result_payload);

/**
 * svc_fill_write_vector - Construct data argument for VFS write call
 * @rqstp: svc_rqst to operate on
 * @payload: xdr_buf containing only the write data payload
 *
 * Fills in rqstp::rq_vec, and returns the number of elements.
 */
unsigned int svc_fill_write_vector(struct svc_rqst *rqstp,
				   struct xdr_buf *payload)
{
	struct page **pages = payload->pages;
	struct kvec *first = payload->head;
	struct kvec *vec = rqstp->rq_vec;
	size_t total = payload->len;
	unsigned int i;

	/* Some types of transport can present the write payload
	 * entirely in rq_arg.pages. In this case, @first is empty.
	 */
	i = 0;
	if (first->iov_len) {
		vec[i].iov_base = first->iov_base;
		vec[i].iov_len = min_t(size_t, total, first->iov_len);
		total -= vec[i].iov_len;
		++i;
	}

	while (total) {
		vec[i].iov_base = page_address(*pages);
		vec[i].iov_len = min_t(size_t, total, PAGE_SIZE);
		total -= vec[i].iov_len;
		++i;
		++pages;
	}

	WARN_ON_ONCE(i > ARRAY_SIZE(rqstp->rq_vec));
	return i;
}
EXPORT_SYMBOL_GPL(svc_fill_write_vector);

/**
 * svc_fill_symlink_pathname - Construct pathname argument for VFS symlink call
 * @rqstp: svc_rqst to operate on
 * @first: buffer containing first section of pathname
 * @p: buffer containing remaining section of pathname
 * @total: total length of the pathname argument
 *
 * The VFS symlink API demands a NUL-terminated pathname in mapped memory.
 * Returns pointer to a NUL-terminated string, or an ERR_PTR. Caller must free
 * the returned string.
 */
char *svc_fill_symlink_pathname(struct svc_rqst *rqstp, struct kvec *first,
				void *p, size_t total)
{
	size_t len, remaining;
	char *result, *dst;

	result = kmalloc(total + 1, GFP_KERNEL);
	if (!result)
		return ERR_PTR(-ESERVERFAULT);

	dst = result;
	remaining = total;

	len = min_t(size_t, total, first->iov_len);
	if (len) {
		memcpy(dst, first->iov_base, len);
		dst += len;
		remaining -= len;
	}

	if (remaining) {
		len = min_t(size_t, remaining, PAGE_SIZE);
		memcpy(dst, p, len);
		dst += len;
	}

	*dst = '\0';

	/* Sanity check: Linux doesn't allow the pathname argument to
	 * contain a NUL byte.
	 */
	if (strlen(result) != total) {
		kfree(result);
		return ERR_PTR(-EINVAL);
	}
	return result;
}
EXPORT_SYMBOL_GPL(svc_fill_symlink_pathname);