// SPDX-License-Identifier: GPL-2.0
/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage send buffer.
 * Producer:
 * Copy user space data into send buffer, if send buffer space available.
 * Consumer:
 * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s):  Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/tcp.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_close.h"
#include "smc_ism.h"
#include "smc_tx.h"
#include "smc_stats.h"
#include "smc_tracepoint.h"

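/* delay, in jiffies, used when deferring tx work; 0 means schedule the work
 * at the next opportunity without additional delay
 */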
#define SMC_TX_WORK_DELAY	0

/***************************** sndbuf producer *******************************/

/* callback implementation for sk.sk_write_space()
 * to wake up sndbuf producers that blocked with smc_tx_wait().
 * called under sk_socket lock.
 */
static void smc_tx_write_space(struct sock *sk)
{
	struct socket *sock = sk->sk_socket;
	struct smc_sock *smc = smc_sk(sk);
	struct socket_wq *wq;

	/* similar to sk_stream_write_space */
	if (atomic_read(&smc->conn.sndbuf_space) && sock) {
		if (test_bit(SOCK_NOSPACE, &sock->flags))
			SMC_STAT_RMB_TX_FULL(smc, !smc->conn.lnk);
		clear_bit(SOCK_NOSPACE, &sock->flags);
		rcu_read_lock();
		wq = rcu_dereference(sk->sk_wq);
		if (skwq_has_sleeper(wq))
			wake_up_interruptible_poll(&wq->wait,
						   EPOLLOUT | EPOLLWRNORM |
						   EPOLLWRBAND);
		if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
			sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
		rcu_read_unlock();
	}
}

/* Wake up sndbuf producers that blocked with smc_tx_wait().
 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
 */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
{
	if (smc->sk.sk_socket &&
	    test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
		smc->sk.sk_write_space(&smc->sk);
}

/* blocks sndbuf producer until at least one byte of free space is available
 * or the urgent byte has been consumed
 */
static int smc_tx_wait(struct smc_sock *smc, int flags)
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	long timeo;
	int rc = 0;

	/* similar to sk_stream_wait_memory */
	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
	add_wait_queue(sk_sleep(sk), &wait);
	while (1) {
		sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (sk->sk_err ||
		    (sk->sk_shutdown & SEND_SHUTDOWN) ||
		    conn->killed ||
		    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
			rc = -EPIPE;
			break;
		}
		if (smc_cdc_rxed_any_close(conn)) {
			rc = -ECONNRESET;
			break;
		}
		if (!timeo) {
			/* ensure EPOLLOUT is subsequently generated */
			set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			rc = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			rc = sock_intr_errno(timeo);
			break;
		}
		sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (atomic_read(&conn->sndbuf_space) && !conn->urg_tx_pend)
			break; /* at least 1 byte of free & no urgent data */
		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		sk_wait_event(sk, &timeo,
			      READ_ONCE(sk->sk_err) ||
			      (READ_ONCE(sk->sk_shutdown) & SEND_SHUTDOWN) ||
			      smc_cdc_rxed_any_close(conn) ||
			      (atomic_read(&conn->sndbuf_space) &&
			       !conn->urg_tx_pend),
			      &wait);
	}
	remove_wait_queue(sk_sleep(sk), &wait);
	return rc;
}

static bool smc_tx_is_corked(struct smc_sock *smc)
{
	struct tcp_sock *tp = tcp_sk(smc->clcsock->sk);

	return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
}

/* If we have pending CDC messages, do not send:
 * Because the CQE of this CDC message will arrive shortly, it gives
 * a chance to coalesce future sendmsg() payload into one RDMA write,
 * without the need for a timer, and with no latency trade-off.
 * Algorithm here:
 *  1. First message should never cork
 *  2. If we have pending Tx CDC messages, wait for the first CDC
 *     message's completion
 *  3. Don't cork too much data in a single RDMA write to prevent burst
 *     traffic; the total corked data should not exceed sndbuf/2
 */
static bool smc_should_autocork(struct smc_sock *smc)
{
	struct smc_connection *conn = &smc->conn;
	int corking_size;

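	/* cork at most half of the send buffer, further capped by the
	 * autocorking_size sysctl
	 */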
	corking_size = min_t(unsigned int, conn->sndbuf_desc->len >> 1,
			     sock_net(&smc->sk)->smc.sysctl_autocorking_size);

	if (atomic_read(&conn->cdc_pend_tx_wr) == 0 ||
	    smc_tx_prepared_sends(conn) > corking_size)
		return false;
	return true;
}

static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg)
{
	struct smc_connection *conn = &smc->conn;

	if (smc_should_autocork(smc))
		return true;

	/* for a corked socket defer the RDMA writes if
	 * sndbuf_space is still available. The application
	 * should know how/when to uncork it.
	 */
	if ((msg->msg_flags & MSG_MORE ||
	     smc_tx_is_corked(smc)) &&
	    atomic_read(&conn->sndbuf_space))
		return true;

	return false;
}

/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
{
	size_t copylen, send_done = 0, send_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor prep;
	struct sock *sk = &smc->sk;
	char *sndbuf_base;
	int tx_cnt_prep;
	int writespace;
	int rc, chunk;

	/* This should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
		rc = -EPIPE;
		goto out_err;
	}

	if (sk->sk_state == SMC_INIT)
		return -ENOTCONN;

	if (len > conn->sndbuf_desc->len)
		SMC_STAT_RMB_TX_SIZE_SMALL(smc, !conn->lnk);

	if (len > conn->peer_rmbe_size)
		SMC_STAT_RMB_TX_PEER_SIZE_SMALL(smc, !conn->lnk);

	if (msg->msg_flags & MSG_OOB)
		SMC_STAT_INC(smc, urg_data_cnt);

	while (msg_data_left(msg)) {
		if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
		    (smc->sk.sk_err == ECONNABORTED) ||
		    conn->killed)
			return -EPIPE;
		if (smc_cdc_rxed_any_close(conn))
			return send_done ?: -ECONNRESET;

		if (msg->msg_flags & MSG_OOB)
			conn->local_tx_ctrl.prod_flags.urg_data_pending = 1;

		if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) {
			if (send_done)
				return send_done;
			rc = smc_tx_wait(smc, msg->msg_flags);
			if (rc)
				goto out_err;
			continue;
		}

		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after smc_tx_wait above */
		writespace = atomic_read(&conn->sndbuf_space);
		/* not more than what user space asked for */
		copylen = min_t(size_t, send_remaining, writespace);
		/* determine start of sndbuf */
		sndbuf_base = conn->sndbuf_desc->cpu_addr;
		smc_curs_copy(&prep, &conn->tx_curs_prep, conn);
		tx_cnt_prep = prep.count;
		/* determine chunks where to write into sndbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t, copylen, conn->sndbuf_desc->len -
				  tx_cnt_prep);
		chunk_len_sum = chunk_len;
		chunk_off = tx_cnt_prep;
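		/* copy in at most two chunks: first into the tail of the ring
		 * buffer, then (only if copylen wraps) into its start
		 */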
		for (chunk = 0; chunk < 2; chunk++) {
			rc = memcpy_from_msg(sndbuf_base + chunk_off,
					     msg, chunk_len);
			if (rc) {
				smc_sndbuf_sync_sg_for_device(conn);
				if (send_done)
					return send_done;
				goto out_err;
			}
			send_done += chunk_len;
			send_remaining -= chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in send ring buffer */
		}
		smc_sndbuf_sync_sg_for_device(conn);
		/* update cursors */
		smc_curs_add(conn->sndbuf_desc->len, &prep, copylen);
		smc_curs_copy(&conn->tx_curs_prep, &prep, conn);
		/* increased in send tasklet smc_cdc_tx_handler() */
		smp_mb__before_atomic();
		atomic_sub(copylen, &conn->sndbuf_space);
		/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
		smp_mb__after_atomic();
		/* since we just produced more new data into sndbuf,
		 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
		 */
		if ((msg->msg_flags & MSG_OOB) && !send_remaining)
			conn->urg_tx_pend = true;
		/* If we need to cork, do nothing and wait for the next
		 * sendmsg() call or push on tx completion
		 */
		if (!smc_tx_should_cork(smc, msg))
			smc_tx_sndbuf_nonempty(conn);

		trace_smc_tx_sendmsg(smc, copylen);
	} /* while (msg_data_left(msg)) */

	return send_done;

out_err:
	rc = sk_stream_error(sk, msg->msg_flags, rc);
	/* make sure we wake any epoll edge trigger waiter */
	if (unlikely(rc == -EAGAIN))
		sk->sk_write_space(sk);
	return rc;
}

/***************************** sndbuf consumer *******************************/

/* sndbuf consumer: actual data transfer of one target chunk with ISM write */
int smcd_tx_ism_write(struct smc_connection *conn, void *data, size_t len,
		      u32 offset, int signal)
{
	int rc;

	rc = smc_ism_write(conn->lgr->smcd, conn->peer_token,
			   conn->peer_rmbe_idx, signal, conn->tx_off + offset,
			   data, len);
	if (rc)
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
	return rc;
}

/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
			     int num_sges, struct ib_rdma_wr *rdma_wr)
{
	struct smc_link_group *lgr = conn->lgr;
	struct smc_link *link = conn->lnk;
	int rc;

	rdma_wr->wr.wr_id = smc_wr_tx_get_next_wr_id(link);
	rdma_wr->wr.num_sge = num_sges;
	rdma_wr->remote_addr =
		lgr->rtokens[conn->rtoken_idx][link->link_idx].dma_addr +
		/* RMBE within RMB */
		conn->tx_off +
		/* offset within RMBE */
		peer_rmbe_offset;
	rdma_wr->rkey = lgr->rtokens[conn->rtoken_idx][link->link_idx].rkey;
	rc = ib_post_send(link->roce_qp, &rdma_wr->wr, NULL);
	if (rc)
		smcr_link_down_cond_sched(link);
	return rc;
}

/* sndbuf consumer */
static inline void smc_tx_advance_cursors(struct smc_connection *conn,
					  union smc_host_cursor *prod,
					  union smc_host_cursor *sent,
					  size_t len)
{
	smc_curs_add(conn->peer_rmbe_size, prod, len);
	/* increased in recv tasklet smc_cdc_msg_rcv() */
	smp_mb__before_atomic();
	/* data in flight reduces usable snd_wnd */
	atomic_sub(len, &conn->peer_rmbe_space);
	/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
	smp_mb__after_atomic();
	smc_curs_add(conn->sndbuf_desc->len, sent, len);
}

/* SMC-R helper for smc_tx_rdma_writes() */
static int smcr_tx_rdma_writes(struct smc_connection *conn, size_t len,
			       size_t src_off, size_t src_len,
			       size_t dst_off, size_t dst_len,
			       struct smc_rdma_wr *wr_rdma_buf)
{
	struct smc_link *link = conn->lnk;

	dma_addr_t dma_addr =
		sg_dma_address(conn->sndbuf_desc->sgt[link->link_idx].sgl);
	u64 virt_addr = (uintptr_t)conn->sndbuf_desc->cpu_addr;
	int src_len_sum = src_len, dst_len_sum = dst_len;
	int sent_count = src_off;
	int srcchunk, dstchunk;
	int num_sges;
	int rc;

	for (dstchunk = 0; dstchunk < 2; dstchunk++) {
		struct ib_rdma_wr *wr = &wr_rdma_buf->wr_tx_rdma[dstchunk];
		struct ib_sge *sge = wr->wr.sg_list;
		u64 base_addr = dma_addr;

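		/* chunks smaller than the QP's max inline size are posted
		 * inline from the virtual address; larger chunks reference
		 * the DMA-mapped send buffer
		 */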
		if (dst_len < link->qp_attr.cap.max_inline_data) {
			base_addr = virt_addr;
			wr->wr.send_flags |= IB_SEND_INLINE;
		} else {
			wr->wr.send_flags &= ~IB_SEND_INLINE;
		}

		num_sges = 0;
		for (srcchunk = 0; srcchunk < 2; srcchunk++) {
			sge[srcchunk].addr = conn->sndbuf_desc->is_vm ?
				(virt_addr + src_off) : (base_addr + src_off);
			sge[srcchunk].length = src_len;
			if (conn->sndbuf_desc->is_vm)
				sge[srcchunk].lkey =
					conn->sndbuf_desc->mr[link->link_idx]->lkey;
			num_sges++;

			src_off += src_len;
			if (src_off >= conn->sndbuf_desc->len)
				src_off -= conn->sndbuf_desc->len;
						/* modulo in send ring */
			if (src_len_sum == dst_len)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			src_len = dst_len - src_len; /* remainder */
			src_len_sum += src_len;
		}
		rc = smc_tx_rdma_write(conn, dst_off, num_sges, wr);
		if (rc)
			return rc;
		if (dst_len_sum == len)
			break; /* either on 1st or 2nd iteration */
		/* prepare next (== 2nd) iteration */
		dst_off = 0; /* modulo offset in RMBE ring buffer */
		dst_len = len - dst_len; /* remainder */
		dst_len_sum += dst_len;
		src_len = min_t(int, dst_len, conn->sndbuf_desc->len -
				sent_count);
		src_len_sum = src_len;
	}
	return 0;
}

/* SMC-D helper for smc_tx_rdma_writes() */
static int smcd_tx_rdma_writes(struct smc_connection *conn, size_t len,
			       size_t src_off, size_t src_len,
			       size_t dst_off, size_t dst_len)
{
	int src_len_sum = src_len, dst_len_sum = dst_len;
	int srcchunk, dstchunk;
	int rc;

	for (dstchunk = 0; dstchunk < 2; dstchunk++) {
		for (srcchunk = 0; srcchunk < 2; srcchunk++) {
			void *data = conn->sndbuf_desc->cpu_addr + src_off;

			rc = smcd_tx_ism_write(conn, data, src_len, dst_off +
					       sizeof(struct smcd_cdc_msg), 0);
			if (rc)
				return rc;
			dst_off += src_len;
			src_off += src_len;
			if (src_off >= conn->sndbuf_desc->len)
				src_off -= conn->sndbuf_desc->len;
						/* modulo in send ring */
			if (src_len_sum == dst_len)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			src_len = dst_len - src_len; /* remainder */
			src_len_sum += src_len;
		}
		if (dst_len_sum == len)
			break; /* either on 1st or 2nd iteration */
		/* prepare next (== 2nd) iteration */
		dst_off = 0; /* modulo offset in RMBE ring buffer */
		dst_len = len - dst_len; /* remainder */
		dst_len_sum += dst_len;
		src_len = min_t(int, dst_len, conn->sndbuf_desc->len - src_off);
		src_len_sum = src_len;
	}
	return 0;
}

/* sndbuf consumer: prepare all necessary (src & dst) chunks of a data transmit;
 * the usable snd_wnd limits the amount transmitted
 */
static int smc_tx_rdma_writes(struct smc_connection *conn,
			      struct smc_rdma_wr *wr_rdma_buf)
{
	size_t len, src_len, dst_off, dst_len; /* current chunk values */
	union smc_host_cursor sent, prep, prod, cons;
	struct smc_cdc_producer_flags *pflags;
	int to_send, rmbespace;
	int rc;

	/* source: sndbuf */
	smc_curs_copy(&sent, &conn->tx_curs_sent, conn);
	smc_curs_copy(&prep, &conn->tx_curs_prep, conn);
	/* cf. wmem_alloc - (snd_max - snd_una) */
	to_send = smc_curs_diff(conn->sndbuf_desc->len, &sent, &prep);
	if (to_send <= 0)
		return 0;

	/* destination: RMBE */
	/* cf. snd_wnd */
	rmbespace = atomic_read(&conn->peer_rmbe_space);
	if (rmbespace <= 0) {
		struct smc_sock *smc = container_of(conn, struct smc_sock,
						    conn);
		SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
		return 0;
	}
	smc_curs_copy(&prod, &conn->local_tx_ctrl.prod, conn);
	smc_curs_copy(&cons, &conn->local_rx_ctrl.cons, conn);

	/* if the usable snd_wnd closes, ask the peer to advertise once it
	 * opens again
	 */
	pflags = &conn->local_tx_ctrl.prod_flags;
	pflags->write_blocked = (to_send >= rmbespace);
	/* cf. usable snd_wnd */
	len = min(to_send, rmbespace);

	/* initialize variables for first iteration of subsequent nested loop */
	dst_off = prod.count;
	if (prod.wrap == cons.wrap) {
		/* the filled destination area is unwrapped,
		 * hence the available free destination space is wrapped
		 * and we need 2 destination chunks of sum len; start with 1st
		 * which is limited by what's available in sndbuf
		 */
		dst_len = min_t(size_t,
				conn->peer_rmbe_size - prod.count, len);
	} else {
		/* the filled destination area is wrapped,
		 * hence the available free destination space is unwrapped
		 * and we need a single destination chunk of entire len
		 */
		dst_len = len;
	}
	/* dst_len determines the maximum src_len */
	if (sent.count + dst_len <= conn->sndbuf_desc->len) {
		/* unwrapped src case: single chunk of entire dst_len */
		src_len = dst_len;
	} else {
		/* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
		src_len = conn->sndbuf_desc->len - sent.count;
	}

	if (conn->lgr->is_smcd)
		rc = smcd_tx_rdma_writes(conn, len, sent.count, src_len,
					 dst_off, dst_len);
	else
		rc = smcr_tx_rdma_writes(conn, len, sent.count, src_len,
					 dst_off, dst_len, wr_rdma_buf);
	if (rc)
		return rc;

	if (conn->urg_tx_pend && len == to_send)
		pflags->urg_data_present = 1;
	smc_tx_advance_cursors(conn, &prod, &sent, len);
	/* update connection's cursors with advanced local cursors */
	smc_curs_copy(&conn->local_tx_ctrl.prod, &prod, conn);
							/* dst: peer RMBE */
	smc_curs_copy(&conn->tx_curs_sent, &sent, conn);/* src: local sndbuf */

	return 0;
}

/* Wake up sndbuf consumers from any context (IRQ or process)
 * since there is more data to transmit; the usable snd_wnd limits
 * the amount transmitted
 */
static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
{
	struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
	struct smc_link *link = conn->lnk;
	struct smc_rdma_wr *wr_rdma_buf;
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int rc;

	if (!link || !smc_wr_tx_link_hold(link))
		return -ENOLINK;
	rc = smc_cdc_get_free_slot(conn, link, &wr_buf, &wr_rdma_buf, &pend);
	if (rc < 0) {
		smc_wr_tx_link_put(link);
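		/* -EBUSY: no free CDC send slot right now; unless the
		 * connection is failing, retry later from the tx worker
		 */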
		if (rc == -EBUSY) {
			struct smc_sock *smc =
				container_of(conn, struct smc_sock, conn);

			if (smc->sk.sk_err == ECONNABORTED)
				return sock_error(&smc->sk);
			if (conn->killed)
				return -EPIPE;
			rc = 0;
			mod_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
					 SMC_TX_WORK_DELAY);
		}
		return rc;
	}

	spin_lock_bh(&conn->send_lock);
	if (link != conn->lnk) {
		/* link of connection changed, tx_work will restart */
		smc_wr_tx_put_slot(link,
				   (struct smc_wr_tx_pend_priv *)pend);
		rc = -ENOLINK;
		goto out_unlock;
	}
	if (!pflags->urg_data_present) {
		rc = smc_tx_rdma_writes(conn, wr_rdma_buf);
		if (rc) {
			smc_wr_tx_put_slot(link,
					   (struct smc_wr_tx_pend_priv *)pend);
			goto out_unlock;
		}
	}

	rc = smc_cdc_msg_send(conn, wr_buf, pend);
	if (!rc && pflags->urg_data_present) {
		pflags->urg_data_pending = 0;
		pflags->urg_data_present = 0;
	}

out_unlock:
	spin_unlock_bh(&conn->send_lock);
	smc_wr_tx_link_put(link);
	return rc;
}

static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
{
	struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
	int rc = 0;

	spin_lock_bh(&conn->send_lock);
	if (!pflags->urg_data_present)
		rc = smc_tx_rdma_writes(conn, NULL);
	if (!rc)
		rc = smcd_cdc_msg_send(conn);

	if (!rc && pflags->urg_data_present) {
		pflags->urg_data_pending = 0;
		pflags->urg_data_present = 0;
	}
	spin_unlock_bh(&conn->send_lock);
	return rc;
}

int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
	int rc = 0;

	/* No data in the send queue */
	if (unlikely(smc_tx_prepared_sends(conn) <= 0))
		goto out;

	/* Peer doesn't have RMBE space */
	if (unlikely(atomic_read(&conn->peer_rmbe_space) <= 0)) {
		SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
		goto out;
	}

	if (conn->killed ||
	    conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
		rc = -EPIPE;    /* connection being aborted */
		goto out;
	}
	if (conn->lgr->is_smcd)
		rc = smcd_tx_sndbuf_nonempty(conn);
	else
		rc = smcr_tx_sndbuf_nonempty(conn);

	if (!rc) {
		/* trigger socket release if connection is closing */
		smc_close_wake_tx_prepared(smc);
	}

out:
	return rc;
}

/* Wake up sndbuf consumers from process context
 * since there is more data to transmit. The caller
 * must hold the sock lock.
 */
void smc_tx_pending(struct smc_connection *conn)
{
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
	int rc;

	if (smc->sk.sk_err)
		return;

	rc = smc_tx_sndbuf_nonempty(conn);
	if (!rc && conn->local_rx_ctrl.prod_flags.write_blocked &&
	    !atomic_read(&conn->bytes_to_rcv))
		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}

/* Wake up sndbuf consumers from process context
 * since there is more data to transmit in the locked
 * sock.
 */
void smc_tx_work(struct work_struct *work)
{
	struct smc_connection *conn = container_of(to_delayed_work(work),
						   struct smc_connection,
						   tx_work);
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);

	lock_sock(&smc->sk);
	smc_tx_pending(conn);
	release_sock(&smc->sk);
}

void smc_tx_consumer_update(struct smc_connection *conn, bool force)
{
	union smc_host_cursor cfed, cons, prod;
	int sender_free = conn->rmb_desc->len;
	int to_confirm;

	smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
	smc_curs_copy(&cfed, &conn->rx_curs_confirmed, conn);
	to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons);
	if (to_confirm > conn->rmbe_update_limit) {
		smc_curs_copy(&prod, &conn->local_rx_ctrl.prod, conn);
		sender_free = conn->rmb_desc->len -
			      smc_curs_diff_large(conn->rmb_desc->len,
						  &cfed, &prod);
	}

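	/* send a consumer cursor update if the peer requested one, if the
	 * caller forces it, or if enough bytes were consumed while the peer
	 * is low on send space or already blocked on it
	 */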
	if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
	    force ||
	    ((to_confirm > conn->rmbe_update_limit) &&
	     ((sender_free <= (conn->rmb_desc->len / 2)) ||
	      conn->local_rx_ctrl.prod_flags.write_blocked))) {
		if (conn->killed ||
		    conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
			return;
		if ((smc_cdc_get_slot_and_msg_send(conn) < 0) &&
		    !conn->killed) {
			queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
					   SMC_TX_WORK_DELAY);
			return;
		}
	}
	if (conn->local_rx_ctrl.prod_flags.write_blocked &&
	    !atomic_read(&conn->bytes_to_rcv))
		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}

/***************************** send initialize *******************************/

/* Initialize send properties on connection establishment. NB: not __init! */
void smc_tx_init(struct smc_sock *smc)
{
	smc->sk.sk_write_space = smc_tx_write_space;
}