// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * transport.c
 *
 * This file contains the top-level implementation of an RPC RDMA
 * transport.
 *
 * Naming convention: functions beginning with xprt_ are part of the
 * transport switch. All others are RPC RDMA internal.
 */

#include <linux/module.h>
#include <linux/slab.h>
#include <linux/seq_file.h>
#include <linux/smp.h>

#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

/*
 * tunables
 */

static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRWR;
int xprt_rdma_pad_optimize;
static struct xprt_class xprt_rdma;

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)

static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;
static unsigned int dummy;

static struct ctl_table_header *sunrpc_table_header;
static struct ctl_table xr_tunables_table[] = {
	{
		.procname	= "rdma_slot_table_entries",
		.data		= &xprt_rdma_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "rdma_max_inline_read",
		.data		= &xprt_rdma_max_inline_read,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_inline_size,
		.extra2		= &max_inline_size,
	},
	{
		.procname	= "rdma_max_inline_write",
		.data		= &xprt_rdma_max_inline_write,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_inline_size,
		.extra2		= &max_inline_size,
	},
	{
		.procname	= "rdma_inline_write_padding",
		.data		= &dummy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= SYSCTL_ZERO,
		.extra2		= &max_padding,
	},
	{
		.procname	= "rdma_memreg_strategy",
		.data		= &xprt_rdma_memreg_strategy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_memreg,
		.extra2		= &max_memreg,
	},
	{
		.procname	= "rdma_pad_optimize",
		.data		= &xprt_rdma_pad_optimize,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec,
	},
};

#endif
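
/* A usage sketch (assumes CONFIG_SUNRPC_DEBUG is enabled): once this
 * module loads, the tunables above appear under /proc/sys/sunrpc, so
 * an administrator might resize the slot table with:
 *
 *	# sysctl sunrpc.rdma_slot_table_entries=256
 *
 * proc_dointvec_minmax() rejects values outside the
 * [RPCRDMA_MIN_SLOT_TABLE, RPCRDMA_MAX_SLOT_TABLE] range.
 */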

static const struct rpc_xprt_ops xprt_rdma_procs;

static void
xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	struct sockaddr_in *sin = (struct sockaddr_in *)sap;
	char buf[20];

	snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;
}

static void
xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
	char buf[40];

	snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
}

void
xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	char buf[128];

	switch (sap->sa_family) {
	case AF_INET:
		xprt_rdma_format_addresses4(xprt, sap);
		break;
	case AF_INET6:
		xprt_rdma_format_addresses6(xprt, sap);
		break;
	default:
		pr_err("rpcrdma: Unrecognized address family\n");
		return;
	}

	(void)rpc_ntop(sap, buf, sizeof(buf));
	xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
}

void
xprt_rdma_free_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}

/**
 * xprt_rdma_connect_worker - establish connection in the background
 * @work: worker thread context
 *
 * Requester holds the xprt's send lock to prevent activity on this
 * transport while a fresh connection is being established. RPC tasks
 * sleep on the xprt's pending queue waiting for connect to complete.
 */
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
						   rx_connect_worker.work);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	unsigned int pflags = current->flags;
	int rc;

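	/* If this transport is carrying swap traffic, run the connect
	 * attempt under PF_MEMALLOC so it can dip into the memory
	 * reserves; the saved pflags are restored on the way out.
	 */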
	if (atomic_read(&xprt->swapper))
		current->flags |= PF_MEMALLOC;
	rc = rpcrdma_xprt_connect(r_xprt);
	xprt_clear_connecting(xprt);
	if (!rc) {
		xprt->connect_cookie++;
		xprt->stat.connect_count++;
		xprt->stat.connect_time += (long)jiffies -
					   xprt->stat.connect_start;
		xprt_set_connected(xprt);
		rc = -EAGAIN;
	} else
		rpcrdma_xprt_disconnect(r_xprt);
	xprt_unlock_connect(xprt, r_xprt);
	xprt_wake_pending_tasks(xprt, rc);
	current_restore_flags(pflags, PF_MEMALLOC);
}

/**
 * xprt_rdma_inject_disconnect - inject a connection fault
 * @xprt: transport context
 *
 * If @xprt is connected, disconnect it to simulate spurious
 * connection loss. Caller must hold @xprt's send lock to
 * ensure that data structures and hardware resources are
 * stable during the rdma_disconnect() call.
 */
static void
xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	trace_xprtrdma_op_inject_dsc(r_xprt);
	rdma_disconnect(r_xprt->rx_ep->re_id);
}

/**
 * xprt_rdma_destroy - Full tear down of transport
 * @xprt: doomed transport context
 *
 * Caller guarantees there will be no more calls to us with
 * this @xprt.
 */
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	cancel_delayed_work_sync(&r_xprt->rx_connect_worker);

	rpcrdma_xprt_disconnect(r_xprt);
	rpcrdma_buffer_destroy(&r_xprt->rx_buf);

	xprt_rdma_free_addresses(xprt);
	xprt_free(xprt);

	module_put(THIS_MODULE);
}

/* 60 second timeout, no retries */
static const struct rpc_timeout xprt_rdma_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};

/**
 * xprt_setup_rdma - Set up transport to use RDMA
 *
 * @args: rpc transport arguments
 */
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;
	struct sockaddr *sap;
	int rc;

	if (args->addrlen > sizeof(xprt->addr))
		return ERR_PTR(-EBADF);

	if (!try_module_get(THIS_MODULE))
		return ERR_PTR(-EIO);

	xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0,
			  xprt_rdma_slot_table_entries);
	if (!xprt) {
		module_put(THIS_MODULE);
		return ERR_PTR(-ENOMEM);
	}

	xprt->timeout = &xprt_rdma_default_timeout;
	xprt->connect_timeout = xprt->timeout->to_initval;
	xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
	xprt->bind_timeout = RPCRDMA_BIND_TO;
	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

	xprt->resvport = 0;		/* privileged port not needed */
	xprt->ops = &xprt_rdma_procs;

	/*
	 * Set up RDMA-specific connect data.
	 */
	sap = args->dstaddr;

	/* Ensure xprt->addr holds a valid server TCP (not RDMA)
	 * address, for any side protocols which peek at it.
	 */
	xprt->prot = IPPROTO_TCP;
	xprt->xprt_class = &xprt_rdma;
	xprt->addrlen = args->addrlen;
	memcpy(&xprt->addr, sap, xprt->addrlen);

	if (rpc_get_port(sap))
		xprt_set_bound(xprt);
	xprt_rdma_format_addresses(xprt, sap);

	new_xprt = rpcx_to_rdmax(xprt);
	rc = rpcrdma_buffer_create(new_xprt);
	if (rc) {
		xprt_rdma_free_addresses(xprt);
		xprt_free(xprt);
		module_put(THIS_MODULE);
		return ERR_PTR(rc);
	}

	INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
			  xprt_rdma_connect_worker);

	xprt->max_payload = RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;

	return xprt;
}
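
/* Illustrative only: xprt_setup_rdma() runs when an NFS mount selects
 * the RDMA transport, e.g. (assuming an RDMA-capable interface and a
 * server listening on the standard NFS/RDMA port):
 *
 *	# mount -t nfs -o proto=rdma,port=20049 server:/export /mnt
 */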

/**
 * xprt_rdma_close - close a transport connection
 * @xprt: transport context
 *
 * Called during autoclose or device removal.
 *
 * Caller holds @xprt's send lock to prevent activity on this
 * transport while the connection is torn down.
 */
void xprt_rdma_close(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	rpcrdma_xprt_disconnect(r_xprt);

	xprt->reestablish_timeout = 0;
	++xprt->connect_cookie;
	xprt_disconnect_done(xprt);
}

/**
 * xprt_rdma_set_port - update server port with rpcbind result
 * @xprt: controlling RPC transport
 * @port: new port value
 *
 * Transport connect status is unchanged.
 */
static void
xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
{
	struct sockaddr *sap = (struct sockaddr *)&xprt->addr;
	char buf[8];

	rpc_set_port(sap, port);

	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
	snprintf(buf, sizeof(buf), "%u", port);
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
	snprintf(buf, sizeof(buf), "%4hx", port);
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
}

/**
 * xprt_rdma_timer - invoked when an RPC times out
 * @xprt: controlling RPC transport
 * @task: RPC task that timed out
 *
 * Invoked when the transport is still connected, but an RPC
 * retransmit timeout occurs.
 *
 * Since RDMA connections don't have a keep-alive, forcibly
 * disconnect and reconnect. This drives full detection of the
 * network path, and retransmission of all pending RPCs.
 */
static void
xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
{
	xprt_force_disconnect(xprt);
}

/**
 * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection
 * @xprt: controlling transport instance
 * @connect_timeout: reconnect timeout after client disconnects
 * @reconnect_timeout: reconnect timeout after server disconnects
 */
static void xprt_rdma_set_connect_timeout(struct rpc_xprt *xprt,
					  unsigned long connect_timeout,
					  unsigned long reconnect_timeout)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout);

	spin_lock(&xprt->transport_lock);

	if (connect_timeout < xprt->connect_timeout) {
		struct rpc_timeout to;
		unsigned long initval;

		to = *xprt->timeout;
		initval = connect_timeout;
		if (initval < RPCRDMA_INIT_REEST_TO << 1)
			initval = RPCRDMA_INIT_REEST_TO << 1;
		to.to_initval = initval;
		to.to_maxval = initval;
		r_xprt->rx_timeout = to;
		xprt->timeout = &r_xprt->rx_timeout;
		xprt->connect_timeout = connect_timeout;
	}

	if (reconnect_timeout < xprt->max_reconnect_timeout)
		xprt->max_reconnect_timeout = reconnect_timeout;

	spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_rdma_connect - schedule an attempt to reconnect
 * @xprt: transport state
 * @task: RPC scheduler context (unused)
 */
static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	unsigned long delay;

	WARN_ON_ONCE(!xprt_lock_connect(xprt, task, r_xprt));

	delay = 0;
	if (ep && ep->re_connect_status != 0) {
		delay = xprt_reconnect_delay(xprt);
		xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
	}
	trace_xprtrdma_op_connect(r_xprt, delay);
	queue_delayed_work(system_long_wq, &r_xprt->rx_connect_worker, delay);
}

/**
 * xprt_rdma_alloc_slot - allocate an rpc_rqst
 * @xprt: controlling RPC transport
 * @task: RPC task requesting a fresh rpc_rqst
 *
 * tk_status values:
 *	%0 if task->tk_rqstp points to a fresh rpc_rqst
 *	%-ENOMEM if no rpc_rqst is available; the task is queued on
 *	the backlog
 */
static void
xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req;

	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
	if (!req)
		goto out_sleep;
	task->tk_rqstp = &req->rl_slot;
	task->tk_status = 0;
	return;

out_sleep:
	task->tk_status = -ENOMEM;
	xprt_add_backlog(xprt, task);
}

/**
 * xprt_rdma_free_slot - release an rpc_rqst
 * @xprt: controlling RPC transport
 * @rqst: rpc_rqst to release
 */
static void
xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt =
		container_of(xprt, struct rpcrdma_xprt, rx_xprt);

	rpcrdma_reply_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
	if (!xprt_wake_up_backlog(xprt, rqst)) {
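		/* No backlogged task was waiting for this slot: scrub
		 * the rqst and return its rpcrdma_req to the buffer pool.
		 */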
		memset(rqst, 0, sizeof(*rqst));
		rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
	}
}

static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
				 struct rpcrdma_regbuf *rb, size_t size,
				 gfp_t flags)
{
	if (unlikely(rdmab_length(rb) < size)) {
		if (!rpcrdma_regbuf_realloc(rb, size, flags))
			return false;
		r_xprt->rx_stats.hardway_register_count += size;
	}
	return true;
}

/**
 * xprt_rdma_allocate - allocate transport resources for an RPC
 * @task: RPC task
 *
 * Return values:
 *        0:	Success; rq_buffer points to RPC buffer to use
 *   ENOMEM:	Out of memory, call again later
 *      EIO:	A permanent error occurred, do not retry
 */
static int
xprt_rdma_allocate(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	gfp_t flags = rpc_task_gfp_mask();

	if (!rpcrdma_check_regbuf(r_xprt, req->rl_sendbuf, rqst->rq_callsize,
				  flags))
		goto out_fail;
	if (!rpcrdma_check_regbuf(r_xprt, req->rl_recvbuf, rqst->rq_rcvsize,
				  flags))
		goto out_fail;

	rqst->rq_buffer = rdmab_data(req->rl_sendbuf);
	rqst->rq_rbuffer = rdmab_data(req->rl_recvbuf);
	return 0;

out_fail:
	return -ENOMEM;
}

/**
 * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
 * @task: RPC task
 *
 * Caller guarantees rqst->rq_buffer is non-NULL.
 */
static void
xprt_rdma_free(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	if (unlikely(!list_empty(&req->rl_registered))) {
		trace_xprtrdma_mrs_zap(task);
		frwr_unmap_sync(rpcx_to_rdmax(rqst->rq_xprt), req);
	}

	/* XXX: If the RPC is completing because of a signal and
	 * not because a reply was received, we ought to ensure
	 * that the Send completion has fired, so that memory
	 * involved with the Send is not still visible to the NIC.
	 */
}

/**
 * xprt_rdma_send_request - marshal and send an RPC request
 * @rqst: RPC message in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * Returns:
 *	%0 if the RPC message has been sent
 *	%-ENOTCONN if the caller should reconnect and call again
 *	%-EBADSLT if congestion window space is not yet available
 *	%-EAGAIN if the caller should call again
 *	%-ENOBUFS if the caller should call again after a delay
 *	%-EMSGSIZE if encoding ran out of buffer space. The request
 *		was not sent. Do not try to send this message again.
 *	%-EIO if an I/O error occurred. The request was not sent.
 *		Do not try to send this message again.
 */
static int
xprt_rdma_send_request(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int rc = 0;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (unlikely(!rqst->rq_buffer))
		return xprt_rdma_bc_send_reply(rqst);
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

	if (!xprt_connected(xprt))
		return -ENOTCONN;

	if (!xprt_request_get_cong(xprt, rqst))
		return -EBADSLT;

	rc = rpcrdma_marshal_req(r_xprt, rqst);
	if (rc < 0)
		goto failed_marshal;

	/* Must suppress retransmit to maintain credits */
	if (rqst->rq_connect_cookie == xprt->connect_cookie)
		goto drop_connection;
	rqst->rq_xtime = ktime_get();

	if (frwr_send(r_xprt, req))
		goto drop_connection;

	rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;

	/* An RPC with no reply will throw off credit accounting,
	 * so drop the connection to reset the credit grant.
	 */
	if (!rpc_reply_expected(rqst->rq_task))
		goto drop_connection;
	return 0;

failed_marshal:
	if (rc != -ENOTCONN)
		return rc;
drop_connection:
	xprt_rdma_close(xprt);
	return -ENOTCONN;
}

void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_puts(seq, "\txprt:\trdma ");
	seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
		   0,	/* need a local port? */
		   xprt->stat.bind_count,
		   xprt->stat.connect_count,
		   xprt->stat.connect_time / HZ,
		   idle_time,
		   xprt->stat.sends,
		   xprt->stat.recvs,
		   xprt->stat.bad_xids,
		   xprt->stat.req_u,
		   xprt->stat.bklog_u);
	seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
		   r_xprt->rx_stats.read_chunk_count,
		   r_xprt->rx_stats.write_chunk_count,
		   r_xprt->rx_stats.reply_chunk_count,
		   r_xprt->rx_stats.total_rdma_request,
		   r_xprt->rx_stats.total_rdma_reply,
		   r_xprt->rx_stats.pullup_copy_count,
		   r_xprt->rx_stats.fixup_copy_count,
		   r_xprt->rx_stats.hardway_register_count,
		   r_xprt->rx_stats.failed_marshal_count,
		   r_xprt->rx_stats.bad_reply_count,
		   r_xprt->rx_stats.nomsg_call_count);
	seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
		   r_xprt->rx_stats.mrs_recycled,
		   r_xprt->rx_stats.mrs_orphaned,
		   r_xprt->rx_stats.mrs_allocated,
		   r_xprt->rx_stats.local_inv_needed,
		   r_xprt->rx_stats.empty_sendctx_q,
		   r_xprt->rx_stats.reply_waits_for_send);
}
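
/* These counters surface as the "xprt: rdma ..." line in
 * /proc/self/mountstats for each NFS/RDMA mount. The field order is
 * effectively ABI, since tools such as mountstats parse the line
 * positionally, so new fields should only be appended.
 */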

static int
xprt_rdma_enable_swap(struct rpc_xprt *xprt)
{
	return 0;
}

static void
xprt_rdma_disable_swap(struct rpc_xprt *xprt)
{
}

/*
 * Plumbing for rpc transport switch and kernel module
 */

static const struct rpc_xprt_ops xprt_rdma_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */
	.alloc_slot		= xprt_rdma_alloc_slot,
	.free_slot		= xprt_rdma_free_slot,
	.release_request	= xprt_release_rqst_cong,	/* ditto */
	.wait_for_reply_request	= xprt_wait_for_reply_request_def, /* ditto */
	.timer			= xprt_rdma_timer,
	.rpcbind		= rpcb_getport_async,	/* sunrpc/rpcb_clnt.c */
	.set_port		= xprt_rdma_set_port,
	.connect		= xprt_rdma_connect,
	.buf_alloc		= xprt_rdma_allocate,
	.buf_free		= xprt_rdma_free,
	.send_request		= xprt_rdma_send_request,
	.close			= xprt_rdma_close,
	.destroy		= xprt_rdma_destroy,
	.set_connect_timeout	= xprt_rdma_set_connect_timeout,
	.print_stats		= xprt_rdma_print_stats,
	.enable_swap		= xprt_rdma_enable_swap,
	.disable_swap		= xprt_rdma_disable_swap,
	.inject_disconnect	= xprt_rdma_inject_disconnect,
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	.bc_setup		= xprt_rdma_bc_setup,
	.bc_maxpayload		= xprt_rdma_bc_maxpayload,
	.bc_num_slots		= xprt_rdma_bc_max_slots,
	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
	.bc_destroy		= xprt_rdma_bc_destroy,
#endif
};

static struct xprt_class xprt_rdma = {
	.list			= LIST_HEAD_INIT(xprt_rdma.list),
	.name			= "rdma",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_RDMA,
	.setup			= xprt_setup_rdma,
	.netid			= { "rdma", "rdma6", "" },
};
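
/* The empty-string-terminated netid table above lets the generic
 * transport code map the rpcbind netids "rdma" and "rdma6" back to
 * this transport class.
 */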

void xprt_rdma_cleanup(void)
{
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif

	xprt_unregister_transport(&xprt_rdma);
	xprt_unregister_transport(&xprt_rdma_bc);
}

int xprt_rdma_init(void)
{
	int rc;

	rc = xprt_register_transport(&xprt_rdma);
	if (rc)
		return rc;

	rc = xprt_register_transport(&xprt_rdma_bc);
	if (rc) {
		xprt_unregister_transport(&xprt_rdma);
		return rc;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl("sunrpc", xr_tunables_table);
#endif
	return 0;
}