// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever else it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include <trace/events/dlm.h>
#include <trace/events/sock.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "memory.h"
#include "config.h"

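/* How long shutdown_connection() waits for the peer to close the socket
 * after we sent SHUT_WR, the backlog of queued-but-unprocessed receive
 * buffers above which receivers back off (see the DLM_IO_FLUSH handling
 * in process_recv_sockets()), and the receive buffer size we request for
 * SCTP sockets in dlm_sctp_sockopts().
 */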
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
#define DLM_MAX_PROCESS_BUFFERS 24
#define NEEDED_RMEM (4*1024*1024)

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	/* this rw_semaphore allows parallel recv/send when taken in read
	 * mode. Releasing a sock requires holding it in write mode.
	 *
	 * However this locking is not nice. Once we remove the othercon
	 * handling we can look into other mechanisms to synchronize io
	 * handling and call sock_release() at the right time.
	 */
	struct rw_semaphore sock_lock;
	unsigned long flags;
#define CF_APP_LIMITED 0
#define CF_RECV_PENDING 1
#define CF_SEND_PENDING 2
#define CF_RECV_INTR 3
#define CF_IO_STOP 4
#define CF_IS_OTHERCON 5
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int retries;
	struct hlist_node list;
	/* Due to connect()/accept() races we can currently end up with a
	 * crossed-over connection attempt, i.e. a second connection for
	 * one node.
	 *
	 * The race could be avoided by introducing a connect rule, e.g.
	 * only the side with our_nodeid > nodeid_to_connect is allowed to
	 * connect; the other side may still connect, but that is only
	 * taken as a request to reconnect.
	 *
	 * However, changing to this behaviour would break backwards
	 * compatibility. We should remove this in a DLM protocol major
	 * version upgrade!
	 */
	struct connection *othercon;
	struct work_struct rwork; /* receive worker */
	struct work_struct swork; /* send worker */
	wait_queue_head_t shutdown_wait;
	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
	int rx_leftover;
	int mark;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
	spinlock_t addrs_lock;
	struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};

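/* Free space left at the end of a writequeue entry's page, and the number
 * of allocated bytes in the page that have not been transmitted yet.
 */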
#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	bool dirty;
	struct connection *con;
	struct list_head msgs;
	struct kref ref;
};

struct dlm_msg {
	struct writequeue_entry *entry;
	struct dlm_msg *orig_msg;
	bool retransmit;
	void *ppc;
	int len;
	int idx; /* new()/commit() idx exchange */

	struct list_head list;
	struct kref ref;
};

struct processqueue_entry {
	unsigned char *buf;
	int nodeid;
	int buflen;

	struct list_head list;
};

struct dlm_proto_ops {
	bool try_new_addr;
	const char *name;
	int proto;

	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
};

static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static struct listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static const struct dlm_proto_ops *dlm_proto_ops;

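/* Return codes of the send/recv io handlers: made progress and can be
 * called again, no more work to do (pending flag got cleared), peer
 * closed the connection (recv only), back off and requeue the work, or
 * wait until the processqueue drained below DLM_MAX_PROCESS_BUFFERS
 * before rescheduling (recv only).
 */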
#define DLM_IO_SUCCESS 0
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3
#define DLM_IO_FLUSH 4

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);

static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
static atomic_t processqueue_count;
static LIST_HEAD(processqueue);

bool dlm_lowcomms_is_running(void)
{
	return !!listen_con.sock;
}

static void lowcomms_queue_swork(struct connection *con)
{
	assert_spin_locked(&con->writequeue_lock);

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_bit(CF_APP_LIMITED, &con->flags) &&
	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
		queue_work(io_workqueue, &con->swork);
}

static void lowcomms_queue_rwork(struct connection *con)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
#endif

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
		queue_work(io_workqueue, &con->rwork);
}

static void writequeue_entry_ctor(void *data)
{
	struct writequeue_entry *entry = data;

	INIT_LIST_HEAD(&entry->msgs);
}

struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{
	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
				 0, 0, writequeue_entry_ctor);
}

struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{
	return KMEM_CACHE(dlm_msg, 0);
}

/* must be called while holding writequeue_lock */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
	struct writequeue_entry *e;

	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
				     list);
	/* if len is zero there is nothing to send; if there are users
	 * filling buffers we wait until the users are done so we can send
	 * more.
	 */
	if (!e || e->users || e->len == 0)
		return NULL;

	return e;
}

static struct connection *__find_con(int nodeid, int r)
{
	struct connection *con;

	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}

	return NULL;
}

static void dlm_con_init(struct connection *con, int nodeid)
{
	con->nodeid = nodeid;
	init_rwsem(&con->sock_lock);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	spin_lock_init(&con->addrs_lock);
	init_waitqueue_head(&con->shutdown_wait);
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r;

	r = nodeid_hash(nodeid);
	con = __find_con(nodeid, r);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	dlm_con_init(con, nodeid);

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads call this function it can
	 * race on multiple CPUs. Instead of locking the hot path
	 * __find_con() we just recheck, in the rare case of recently
	 * added nodes, under protection of connections_lock. If this is
	 * the case we abort our connection creation and return the
	 * existing connection.
	 */
	tmp = __find_con(nodeid, r);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}

static int addr_compare(const struct sockaddr_storage *x,
			const struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

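/* Look up the current comm address and skb mark for a node. With
 * try_new_addr (set for SCTP, see dlm_sctp_ops) the per-node address
 * list is rotated round-robin, so the next connect attempt uses another
 * address of a multi-homed peer.
 */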
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr,
			  unsigned int *mark)
{
	struct sockaddr_storage sas;
	struct connection *con;
	int idx;

	if (!dlm_local_count)
		return -1;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	memcpy(&sas, &con->addr[con->curr_addr_index],
	       sizeof(struct sockaddr_storage));

	if (try_new_addr) {
		con->curr_addr_index++;
		if (con->curr_addr_index == con->addr_count)
			con->curr_addr_index = 0;
	}

	*mark = con->mark;
	spin_unlock(&con->addrs_lock);

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out) {
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	if (dlm_local_addr[0].ss_family == AF_INET) {
		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
			  unsigned int *mark)
{
	struct connection *con;
	int i, idx, addr_i;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			WARN_ON_ONCE(!con->addr_count);

			spin_lock(&con->addrs_lock);
			for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
				if (addr_compare(&con->addr[addr_i], addr)) {
					*nodeid = con->nodeid;
					*mark = con->mark;
					spin_unlock(&con->addrs_lock);
					srcu_read_unlock(&connections_srcu, idx);
					return 0;
				}
			}
			spin_unlock(&con->addrs_lock);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);

	return -ENOENT;
}

static bool dlm_lowcomms_con_has_addr(const struct connection *con,
				      const struct sockaddr_storage *addr)
{
	int i;

	for (i = 0; i < con->addr_count; i++) {
		if (addr_compare(&con->addr[i], addr))
			return true;
	}

	return false;
}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr)
{
	struct connection *con;
	bool ret;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOMEM;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		memcpy(&con->addr[0], addr, sizeof(*addr));
		con->addr_count = 1;
		con->mark = dlm_config.ci_mark;
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	ret = dlm_lowcomms_con_has_addr(con, addr);
	if (ret) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -EEXIST;
	}

	if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOSPC;
	}

	memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	trace_sk_data_ready(sk);

	set_bit(CF_RECV_INTR, &con->flags);
	lowcomms_queue_rwork(con);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	spin_lock_bh(&con->writequeue_lock);
	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	lowcomms_queue_swork(con);
	spin_unlock_bh(&con->writequeue_lock);
}

static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not calling sk_data_ready when the connection
	 * is done, so we catch the signal through here.
	 */
	if (sk->sk_shutdown == RCV_SHUTDOWN)
		lowcomms_data_ready(sk);
}

static void lowcomms_listen_data_ready(struct sock *sk)
{
	trace_sk_data_ready(sk);

	queue_work(io_workqueue, &listen_con.rwork);
}

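/* Kick off a connection to a node: if no socket exists yet, queue send
 * work so that process_send_sockets() performs the actual dlm_connect().
 */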
int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	down_read(&con->sock_lock);
	if (!con->sock) {
		spin_lock_bh(&con->writequeue_lock);
		lowcomms_queue_swork(con);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_read(&con->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	cond_resched();
	return 0;
}

int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	con->mark = mark;
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
	}

	dlm_midcomms_unack_msg_resend(con->nodeid);

	listen_sock.sk_error_report(sk);
}

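/* Put back the callbacks saved in listen_sock; must be called with the
 * socket lock held (see the lockdep assertion below).
 */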
static void restore_callbacks(struct sock *sk)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
#endif

	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	con->sock = sock;

	sk->sk_user_data = con;
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
		sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_use_task_frag = false;
	sk->sk_error_report = lowcomms_error_report;
	release_sock(sk);
}

/* Add the port number to an IPv6 or IPv4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0].ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}

static void dlm_page_release(struct kref *kref)
{
	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);

	__free_page(e->page);
	dlm_free_writequeue(e);
}

static void dlm_msg_release(struct kref *kref)
{
	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);

	kref_put(&msg->entry->ref, dlm_page_release);
	dlm_free_msg(msg);
}

static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	kref_put(&e->ref, dlm_page_release);
}

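/* Detach our callbacks under the socket lock, then release the socket
 * and clear the pointer.
 */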
static void dlm_close_sock(struct socket **sock)
{
	lock_sock((*sock)->sk);
	restore_callbacks((*sock)->sk);
	release_sock((*sock)->sk);

	sock_release(*sock);
	*sock = NULL;
}

static void allow_connection_io(struct connection *con)
{
	if (con->othercon)
		clear_bit(CF_IO_STOP, &con->othercon->flags);
	clear_bit(CF_IO_STOP, &con->flags);
}

static void stop_connection_io(struct connection *con)
{
	if (con->othercon)
		stop_connection_io(con->othercon);

	spin_lock_bh(&con->writequeue_lock);
	set_bit(CF_IO_STOP, &con->flags);
	spin_unlock_bh(&con->writequeue_lock);

	down_write(&con->sock_lock);
	if (con->sock) {
		lock_sock(con->sock->sk);
		restore_callbacks(con->sock->sk);
		release_sock(con->sock->sk);
	}
	up_write(&con->sock_lock);

	cancel_work_sync(&con->swork);
	cancel_work_sync(&con->rwork);
}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	struct writequeue_entry *e;

	if (con->othercon && and_other)
		close_connection(con->othercon, false);

	down_write(&con->sock_lock);
	if (!con->sock) {
		up_write(&con->sock_lock);
		return;
	}

	dlm_close_sock(&con->sock);

	/* if we sent a writequeue entry only half way, we drop the whole
	 * entry on reconnection so that we never resume in the middle of
	 * a msg, which would confuse the other end.
	 *
	 * we can always drop messages because of retransmits, but what we
	 * cannot allow is to transmit half messages which may be processed
	 * at the other side.
	 *
	 * our policy is to start from a clean state on disconnect; we don't
	 * know what was sent/received on the transport layer in this case.
	 */
	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);

	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_RECV_PENDING, &con->flags);
	clear_bit(CF_SEND_PENDING, &con->flags);
	up_write(&con->sock_lock);
}

static void shutdown_connection(struct connection *con, bool and_other)
{
	int ret;

	if (con->othercon && and_other)
		shutdown_connection(con->othercon, false);

	flush_workqueue(io_workqueue);
	down_read(&con->sock_lock);
	/* nothing to shutdown */
	if (!con->sock) {
		up_read(&con->sock_lock);
		return;
	}

	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
	up_read(&con->sock_lock);
	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	} else {
		ret = wait_event_timeout(con->shutdown_wait, !con->sock,
					 DLM_SHUTDOWN_WAIT_TIMEOUT);
		if (ret == 0) {
			log_print("Connection %p shutdown timed out, will force close",
				  con);
			goto force_close;
		}
	}

	return;

force_close:
	close_connection(con, false);
}

static struct processqueue_entry *new_processqueue_entry(int nodeid,
							 int buflen)
{
	struct processqueue_entry *pentry;

	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
	if (!pentry)
		return NULL;

	pentry->buf = kmalloc(buflen, GFP_NOFS);
	if (!pentry->buf) {
		kfree(pentry);
		return NULL;
	}

	pentry->nodeid = nodeid;
	return pentry;
}

static void free_processqueue_entry(struct processqueue_entry *pentry)
{
	kfree(pentry->buf);
	kfree(pentry);
}

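/* Drain the processqueue: hand each buffer to the midcomms layer via
 * dlm_process_incoming_buffer() and wake processqueue_wq once the count
 * drops to zero, so that receivers backing off in DLM_IO_FLUSH can
 * continue.
 */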
static void process_dlm_messages(struct work_struct *work)
{
	struct processqueue_entry *pentry;

	spin_lock_bh(&processqueue_lock);
	pentry = list_first_entry_or_null(&processqueue,
					  struct processqueue_entry, list);
	if (WARN_ON_ONCE(!pentry)) {
		process_dlm_messages_pending = false;
		spin_unlock_bh(&processqueue_lock);
		return;
	}

	list_del(&pentry->list);
	if (atomic_dec_and_test(&processqueue_count))
		wake_up(&processqueue_wq);
	spin_unlock_bh(&processqueue_lock);

	for (;;) {
		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
					    pentry->buflen);
		free_processqueue_entry(pentry);

		spin_lock_bh(&processqueue_lock);
		pentry = list_first_entry_or_null(&processqueue,
						  struct processqueue_entry, list);
		if (!pentry) {
			process_dlm_messages_pending = false;
			spin_unlock_bh(&processqueue_lock);
			break;
		}

		list_del(&pentry->list);
		if (atomic_dec_and_test(&processqueue_count))
			wake_up(&processqueue_wq);
		spin_unlock_bh(&processqueue_lock);
	}
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con, int buflen)
{
	struct processqueue_entry *pentry;
	int ret, buflen_real;
	struct msghdr msg;
	struct kvec iov;

	pentry = new_processqueue_entry(con->nodeid, buflen);
	if (!pentry)
		return DLM_IO_RESCHED;

	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);

	/* calculate new buffer parameters regarding the last receive and
	 * possible leftover bytes
	 */
	iov.iov_base = pentry->buf + con->rx_leftover;
	iov.iov_len = buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	clear_bit(CF_RECV_INTR, &con->flags);
again:
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	trace_dlm_recv(con->nodeid, ret);
	if (ret == -EAGAIN) {
		lock_sock(con->sock->sk);
		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
			release_sock(con->sock->sk);
			goto again;
		}

		clear_bit(CF_RECV_PENDING, &con->flags);
		release_sock(con->sock->sk);
		free_processqueue_entry(pentry);
		return DLM_IO_END;
	} else if (ret == 0) {
		/* close will clear CF_RECV_PENDING */
		free_processqueue_entry(pentry);
		return DLM_IO_EOF;
	} else if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	/* new buflen according to the bytes read plus the leftover from the
	 * last receive
	 */
	buflen_real = ret + con->rx_leftover;
	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
					   buflen_real);
	if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	pentry->buflen = ret;

	/* calculate the leftover bytes after processing and put them at the
	 * beginning of the receive buffer, so that on the next receive we
	 * have the full message at the start address of the receive buffer.
	 */
	con->rx_leftover = buflen_real - ret;
	memmove(con->rx_leftover_buf, pentry->buf + ret,
		con->rx_leftover);

	spin_lock_bh(&processqueue_lock);
	ret = atomic_inc_return(&processqueue_count);
	list_add_tail(&pentry->list, &processqueue);
	if (!process_dlm_messages_pending) {
		process_dlm_messages_pending = true;
		queue_work(process_workqueue, &process_work);
	}
	spin_unlock_bh(&processqueue_lock);

	if (ret > DLM_MAX_PROCESS_BUFFERS)
		return DLM_IO_FLUSH;

	return DLM_IO_SUCCESS;
}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(void)
{
	struct sockaddr_storage peeraddr;
	int len, idx, result, nodeid;
	struct connection *newcon;
	struct socket *newsock;
	unsigned int mark;

	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
	if (result == -EAGAIN)
		return DLM_IO_END;
	else if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
		switch (peeraddr.ss_family) {
		case AF_INET: {
			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;

			log_print("connect from non cluster IPv4 node %pI4",
				  &sin->sin_addr);
			break;
		}
#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;

			log_print("connect from non cluster IPv6 node %pI6c",
				  &sin6->sin6_addr);
			break;
		}
#endif
		default:
			log_print("invalid family from non cluster node");
			break;
		}

		sock_release(newsock);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/*  Check to see if we already have a connection to this node. This
	 *  could happen if the two nodes initiate a connection at roughly
	 *  the same time and the connections cross on the wire.
	 *  In this case we store the incoming one in "othercon"
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!newcon)) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOENT;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	down_write(&newcon->sock_lock);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				up_write(&newcon->sock_lock);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			dlm_con_init(othercon, nodeid);
			lockdep_set_subclass(&othercon->sock_lock, 1);
			newcon->othercon = othercon;
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false);
		}

		down_write(&othercon->sock_lock);
		add_sock(newsock, othercon);

		/* check if we received something while adding */
		lock_sock(othercon->sock->sk);
		lowcomms_queue_rwork(othercon);
		release_sock(othercon->sock->sk);
		up_write(&othercon->sock_lock);
	} else {
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);

		/* check if we received something while adding */
		lock_sock(newcon->sock->sk);
		lowcomms_queue_rwork(newcon);
		release_sock(newcon->sock->sk);
	}
	up_write(&newcon->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	return DLM_IO_SUCCESS;

accept_err:
	if (newsock)
		sock_release(newsock);

	return result;
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;
	/* signal that page was half way transmitted */
	e->dirty = true;

	if (e->len == 0 && e->users == 0)
		free_entry(e);
}

/*
 * sctp_bind_addrs - bind an SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct socket *sock, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(sock, addr, addr_len);
		else
			result = sock_bind_add(sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
	}
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{
	struct writequeue_entry *entry;

	entry = dlm_allocate_writequeue();
	if (!entry)
		return NULL;

	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
	if (!entry->page) {
		dlm_free_writequeue(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->dirty = false;
	entry->con = con;
	entry->users = 1;
	kref_init(&entry->ref);
	return entry;
}

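/* Reserve len bytes in the connection's writequeue: append to the last
 * entry's page if it still has room, otherwise allocate a fresh entry.
 * On success *ppc points at the reserved space.
 */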
static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
					     char **ppc, void (*cb)(void *data),
					     void *data)
{
	struct writequeue_entry *e;

	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
			kref_get(&e->ref);

			*ppc = page_address(e->page) + e->end;
			if (cb)
				cb(data);

			e->end += len;
			e->users++;
			goto out;
		}
	}

	e = new_writequeue_entry(con);
	if (!e)
		goto out;

	kref_get(&e->ref);
	*ppc = page_address(e->page);
	e->end += len;
	if (cb)
		cb(data);

	list_add_tail(&e->list, &con->writequeue);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return e;
}

static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
						char **ppc, void (*cb)(void *data),
						void *data)
{
	struct writequeue_entry *e;
	struct dlm_msg *msg;

	msg = dlm_allocate_msg();
	if (!msg)
		return NULL;

	kref_init(&msg->ref);

	e = new_wq_entry(con, len, ppc, cb, data);
	if (!e) {
		dlm_free_msg(msg);
		return NULL;
	}

	msg->retransmit = false;
	msg->orig_msg = NULL;
	msg->ppc = *ppc;
	msg->len = len;
	msg->entry = e;

	return msg;
}

/* avoid a false positive for connections_srcu: the unlock happens in
 * dlm_lowcomms_commit_msg(), which must be called on success
 */
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
				     void (*cb)(void *data), void *data)
{
	struct connection *con;
	struct dlm_msg *msg;
	int idx;

	if (len > DLM_MAX_SOCKET_BUFSIZE ||
	    len < sizeof(struct dlm_header)) {
		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
		log_print("failed to allocate a buffer of size %d", len);
		WARN_ON_ONCE(1);
		return NULL;
	}

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data);
	if (!msg) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	/* for dlm_lowcomms_commit_msg() */
	kref_get(&msg->ref);
	/* we assume that on success commit must be called */
	msg->idx = idx;
	return msg;
}
#endif

static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	struct writequeue_entry *e = msg->entry;
	struct connection *con = e->con;
	int users;

	spin_lock_bh(&con->writequeue_lock);
	kref_get(&msg->ref);
	list_add(&msg->list, &e->msgs);

	users = --e->users;
	if (users)
		goto out;

	e->len = DLM_WQ_LENGTH_BYTES(e);

	lowcomms_queue_swork(con);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return;
}

/* avoid a false positive for connections_srcu: the lock was taken in
 * dlm_lowcomms_new_msg()
 */
#ifndef __CHECKER__
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	srcu_read_unlock(&connections_srcu, msg->idx);
	/* because dlm_lowcomms_new_msg() */
	kref_put(&msg->ref, dlm_msg_release);
}
#endif

void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}

/* does not hold connections_srcu, used only from the
 * lowcomms_error_report() path
 */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{
	struct dlm_msg *msg_resend;
	char *ppc;

	if (msg->retransmit)
		return 1;

	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc,
					      NULL, NULL);
	if (!msg_resend)
		return -ENOMEM;

	msg->retransmit = true;
	kref_get(&msg->ref);
	msg_resend->orig_msg = msg;

	memcpy(ppc, msg->ppc, msg->len);
	_dlm_lowcomms_commit_msg(msg_resend);
	dlm_lowcomms_put_msg(msg_resend);

	return 0;
}

/* Send a message */
static int send_to_sock(struct connection *con)
{
	struct writequeue_entry *e;
	struct bio_vec bvec;
	struct msghdr msg = {
		.msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
	};
	int len, offset, ret;

	spin_lock_bh(&con->writequeue_lock);
	e = con_next_wq(con);
	if (!e) {
		clear_bit(CF_SEND_PENDING, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		return DLM_IO_END;
	}

	len = e->len;
	offset = e->offset;
	WARN_ON_ONCE(len == 0 && e->users == 0);
	spin_unlock_bh(&con->writequeue_lock);

	bvec_set_page(&bvec, e->page, len, offset);
	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);
	ret = sock_sendmsg(con->sock, &msg);
	trace_dlm_send(con->nodeid, ret);
	if (ret == -EAGAIN || ret == 0) {
		lock_sock(con->sock->sk);
		spin_lock_bh(&con->writequeue_lock);
		if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
			/* Notify TCP that we're limited by the
			 * application window size.
			 */
			set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
			con->sock->sk->sk_write_pending++;

			clear_bit(CF_SEND_PENDING, &con->flags);
			spin_unlock_bh(&con->writequeue_lock);
			release_sock(con->sock->sk);

			/* wait for write_space() event */
			return DLM_IO_END;
		}
		spin_unlock_bh(&con->writequeue_lock);
		release_sock(con->sock->sk);

		return DLM_IO_RESCHED;
	} else if (ret < 0) {
		return ret;
	}

	spin_lock_bh(&con->writequeue_lock);
	writequeue_entry_complete(e, ret);
	spin_unlock_bh(&con->writequeue_lock);

	return DLM_IO_SUCCESS;
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock_bh(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);
}

static void connection_release(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	WARN_ON_ONCE(!list_empty(&con->writequeue));
	WARN_ON_ONCE(con->sock);
	kfree(con);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	int idx;

	log_print("closing connection to node %d", nodeid);

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	stop_connection_io(con);
	log_print("io handling for node: %d stopped", nodeid);
	close_connection(con, true);

	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);

	clean_one_writequeue(con);
	call_srcu(&connections_srcu, &con->rcu, connection_release);
	if (con->othercon) {
		clean_one_writequeue(con->othercon);
		call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
	}
	srcu_read_unlock(&connections_srcu, idx);

	/* for debugging we print when we are done, to compare with other
	 * messages in between. This function needs to be correctly
	 * synchronized with io handling.
	 */
	log_print("closing connection to node %d done", nodeid);

	return 0;
}

/* Receive worker function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int ret, buflen;

	down_read(&con->sock_lock);
	if (!con->sock) {
		up_read(&con->sock_lock);
		return;
	}

	buflen = READ_ONCE(dlm_config.ci_buffer_size);
	do {
		ret = receive_from_sock(con, buflen);
	} while (ret == DLM_IO_SUCCESS);
	up_read(&con->sock_lock);

	switch (ret) {
	case DLM_IO_END:
		/* CF_RECV_PENDING cleared */
		break;
	case DLM_IO_EOF:
		close_connection(con, false);
		wake_up(&con->shutdown_wait);
		/* CF_RECV_PENDING cleared */
		break;
	case DLM_IO_FLUSH:
		/* we can't flush process_workqueue here because flushing
		 * a non WQ_MEM_RECLAIM workqueue such as process_workqueue
		 * from a WQ_MEM_RECLAIM workqueue can deadlock. Instead we
		 * have a waitqueue to wait until all messages are
		 * processed.
		 *
		 * This handling is only necessary to back off the sender and
		 * not queue all messages from the socket layer into the DLM
		 * processqueue. When DLM is capable of parsing multiple
		 * messages on an e.g. per socket basis this handling might
		 * be removed. Especially in a message burst we are too slow
		 * to process messages and the queue would fill up memory.
		 */
		wait_event(processqueue_wq, !atomic_read(&processqueue_count));
		fallthrough;
	case DLM_IO_RESCHED:
		cond_resched();
		queue_work(io_workqueue, &con->rwork);
		/* CF_RECV_PENDING not cleared */
		break;
	default:
		if (ret < 0) {
			if (test_bit(CF_IS_OTHERCON, &con->flags)) {
				close_connection(con, false);
			} else {
				spin_lock_bh(&con->writequeue_lock);
				lowcomms_queue_swork(con);
				spin_unlock_bh(&con->writequeue_lock);
			}

			/* CF_RECV_PENDING cleared for othercon
			 * we trigger send queue if not already done
			 * and process_send_sockets will handle it
			 */
			break;
		}

		WARN_ON_ONCE(1);
		break;
	}
}

static void process_listen_recv_socket(struct work_struct *work)
{
	int ret;

	if (WARN_ON_ONCE(!listen_con.sock))
		return;

	do {
		ret = accept_from_sock();
	} while (ret == DLM_IO_SUCCESS);

	if (ret < 0)
		log_print("critical error accepting connection: %d", ret);
}

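/* Resolve the node's current comm address, create and bind a socket and
 * try to connect it; on errors other than -EINPROGRESS the socket is
 * closed again. Reconnect retries are driven by process_send_sockets().
 */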
static int dlm_connect(struct connection *con)
{
	struct sockaddr_storage addr;
	int result, addr_len;
	struct socket *sock;
	unsigned int mark;

	memset(&addr, 0, sizeof(addr));
	result = nodeid_to_addr(con->nodeid, &addr, NULL,
				dlm_proto_ops->try_new_addr, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		return result;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0)
		return result;

	sock_set_mark(sock->sk, mark);
	dlm_proto_ops->sockopts(sock);

	result = dlm_proto_ops->bind(sock);
	if (result < 0) {
		sock_release(sock);
		return result;
	}

	add_sock(sock, con);

	log_print_ratelimited("connecting to %d", con->nodeid);
	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
	result = kernel_connect(sock, (struct sockaddr *)&addr, addr_len, 0);
	switch (result) {
	case -EINPROGRESS:
		/* not an error */
		fallthrough;
	case 0:
		break;
	default:
		if (result < 0)
			dlm_close_sock(&con->sock);

		break;
	}

	return result;
}

/* Send worker function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);
	int ret;

	WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));

	down_read(&con->sock_lock);
	if (!con->sock) {
		up_read(&con->sock_lock);
		down_write(&con->sock_lock);
		if (!con->sock) {
			ret = dlm_connect(con);
			switch (ret) {
			case 0:
				break;
			default:
				/* CF_SEND_PENDING not cleared */
				up_write(&con->sock_lock);
				log_print("connect to node %d try %d error %d",
					  con->nodeid, con->retries++, ret);
				msleep(1000);
				/* For now we try forever to reconnect. In the
				 * future we should send an event to the
				 * cluster manager to fence itself after a
				 * certain number of retries.
				 */
				queue_work(io_workqueue, &con->swork);
				return;
			}
		}
		downgrade_write(&con->sock_lock);
	}

	do {
		ret = send_to_sock(con);
	} while (ret == DLM_IO_SUCCESS);
	up_read(&con->sock_lock);

	switch (ret) {
	case DLM_IO_END:
		/* CF_SEND_PENDING cleared */
		break;
	case DLM_IO_RESCHED:
		/* CF_SEND_PENDING not cleared */
		cond_resched();
		queue_work(io_workqueue, &con->swork);
		break;
	default:
		if (ret < 0) {
			close_connection(con, false);

			/* CF_SEND_PENDING cleared */
			spin_lock_bh(&con->writequeue_lock);
			lowcomms_queue_swork(con);
			spin_unlock_bh(&con->writequeue_lock);
			break;
		}

		WARN_ON_ONCE(1);
		break;
	}
}

static void work_stop(void)
{
	if (io_workqueue) {
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
	}

	if (process_workqueue) {
		destroy_workqueue(process_workqueue);
		process_workqueue = NULL;
	}
}

static int work_start(void)
{
	io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM |
				       WQ_UNBOUND, 0);
	if (!io_workqueue) {
		log_print("can't start dlm_io");
		return -ENOMEM;
	}

	process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0);
	if (!process_workqueue) {
		log_print("can't start dlm_process");
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
		return -ENOMEM;
	}

	return 0;
}

void dlm_lowcomms_shutdown(void)
{
	struct connection *con;
	int i, idx;

	/* stop lowcomms_listen_data_ready calls */
	lock_sock(listen_con.sock->sk);
	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
	release_sock(listen_con.sock->sk);

	cancel_work_sync(&listen_con.rwork);
	dlm_close_sock(&listen_con.sock);

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			shutdown_connection(con, true);
			stop_connection_io(con);
			flush_workqueue(process_workqueue);
			close_connection(con, true);

			clean_one_writequeue(con);
			if (con->othercon)
				clean_one_writequeue(con->othercon);
			allow_connection_io(con);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}

void dlm_lowcomms_stop(void)
{
	work_stop();
	dlm_proto_ops = NULL;
}

static int dlm_listen_for_all(void)
{
	struct socket *sock;
	int result;

	log_print("Using %s for communications",
		  dlm_proto_ops->name);

	result = dlm_proto_ops->listen_validate();
	if (result < 0)
		return result;

	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0) {
		log_print("Can't create comms socket: %d", result);
		return result;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);
	dlm_proto_ops->listen_sockopts(sock);

	result = dlm_proto_ops->listen_bind(sock);
	if (result < 0)
		goto out;

	lock_sock(sock->sk);
	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
	listen_sock.sk_write_space = sock->sk->sk_write_space;
	listen_sock.sk_error_report = sock->sk->sk_error_report;
	listen_sock.sk_state_change = sock->sk->sk_state_change;

	listen_con.sock = sock;

	sock->sk->sk_allocation = GFP_NOFS;
	sock->sk->sk_use_task_frag = false;
	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
	release_sock(sock->sk);

	result = sock->ops->listen(sock, 128);
	if (result < 0) {
		dlm_close_sock(&listen_con.sock);
		return result;
	}

	return 0;

out:
	sock_release(sock);
	return result;
}

static int dlm_tcp_bind(struct socket *sock)
{
	struct sockaddr_storage src_addr;
	int result, addr_len;

	/* Bind to our cluster-known address when connecting to avoid
	 * routing problems.
	 */
	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);

	result = kernel_bind(sock, (struct sockaddr *)&src_addr,
			     addr_len);
	if (result < 0) {
		/* This *may* not indicate a critical error */
		log_print("could not bind for connect: %d", result);
	}

	return 0;
}

static int dlm_tcp_listen_validate(void)
{
	/* We don't support multi-homed hosts */
	if (dlm_local_count > 1) {
		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
		return -EINVAL;
	}

	return 0;
}

static void dlm_tcp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);
}

static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);
	sock_set_reuseaddr(sock->sk);
}

static int dlm_tcp_listen_bind(struct socket *sock)
{
	int addr_len;

	/* Bind to our port */
	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
	return kernel_bind(sock, (struct sockaddr *)&dlm_local_addr[0],
			   addr_len);
}

static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
};

static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}

static int dlm_sctp_listen_validate(void)
{
	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
		log_print("SCTP is not enabled by this kernel");
		return -EOPNOTSUPP;
	}

	request_module("sctp");
	return 0;
}

static int dlm_sctp_bind_listen(struct socket *sock)
{
	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}

static void dlm_sctp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);
	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}

static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.try_new_addr = true,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
};

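/* Bring lowcomms up: resolve our local addresses, start the workqueues,
 * pick the transport according to dlm_config.ci_protocol and open the
 * listening socket.
 */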
int dlm_lowcomms_start(void)
{
	int error;

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = work_start();
	if (error)
		goto fail;

	/* Start listening */
	switch (dlm_config.ci_protocol) {
	case DLM_PROTO_TCP:
		dlm_proto_ops = &dlm_tcp_ops;
		break;
	case DLM_PROTO_SCTP:
		dlm_proto_ops = &dlm_sctp_ops;
		break;
	default:
		log_print("Invalid protocol identifier %d set",
			  dlm_config.ci_protocol);
		error = -EINVAL;
		goto fail_proto_ops;
	}

	error = dlm_listen_for_all();
	if (error)
		goto fail_listen;

	return 0;

fail_listen:
	dlm_proto_ops = NULL;
fail_proto_ops:
	work_stop();
fail:
	return error;
}

void dlm_lowcomms_init(void)
{
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
}

void dlm_lowcomms_exit(void)
{
	struct connection *con;
	int i, idx;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			spin_lock(&connections_lock);
			hlist_del_rcu(&con->list);
			spin_unlock(&connections_lock);

			if (con->othercon)
				call_srcu(&connections_srcu, &con->othercon->rcu,
					  connection_release);
			call_srcu(&connections_srcu, &con->rcu, connection_release);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}