Lines Matching full:con
134 struct connection *con; member
214 static void lowcomms_queue_swork(struct connection *con) in lowcomms_queue_swork() argument
216 assert_spin_locked(&con->writequeue_lock); in lowcomms_queue_swork()
218 if (!test_bit(CF_IO_STOP, &con->flags) && in lowcomms_queue_swork()
219 !test_bit(CF_APP_LIMITED, &con->flags) && in lowcomms_queue_swork()
220 !test_and_set_bit(CF_SEND_PENDING, &con->flags)) in lowcomms_queue_swork()
221 queue_work(io_workqueue, &con->swork); in lowcomms_queue_swork()
224 static void lowcomms_queue_rwork(struct connection *con) in lowcomms_queue_rwork() argument
227 WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk)); in lowcomms_queue_rwork()
230 if (!test_bit(CF_IO_STOP, &con->flags) && in lowcomms_queue_rwork()
231 !test_and_set_bit(CF_RECV_PENDING, &con->flags)) in lowcomms_queue_rwork()
232 queue_work(io_workqueue, &con->rwork); in lowcomms_queue_rwork()
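The lowcomms_queue_swork()/lowcomms_queue_rwork() hits above show the gating pattern: work is handed to io_workqueue only while I/O is not stopped, and only by the caller that flips the PENDING bit from 0 to 1, so a work item is never queued twice. Below is a minimal user-space sketch of that pattern, assuming C11 atomics; the struct, fake_queue_work() and the flag layout are invented for illustration (the real swork path also skips while CF_APP_LIMITED is set), none of this is the kernel API.

/* Illustrative analogue of the flag-gated queueing in lowcomms_queue_swork():
 * only the caller that flips the PENDING bit from 0 to 1 queues the work. */
#include <stdatomic.h>
#include <stdio.h>

enum { CF_IO_STOP = 1u << 0, CF_SEND_PENDING = 1u << 1 };

struct fake_con {
	_Atomic unsigned int flags;
};

/* Stand-in for queue_work(); here it only reports the action. */
static void fake_queue_work(struct fake_con *con)
{
	(void)con;
	printf("work queued\n");
}

static void queue_swork(struct fake_con *con)
{
	/* Bail out while I/O is stopped, mirroring the CF_IO_STOP test. */
	if (atomic_load(&con->flags) & CF_IO_STOP)
		return;

	/* test_and_set_bit(): queue only if the bit was previously clear. */
	if (!(atomic_fetch_or(&con->flags, CF_SEND_PENDING) & CF_SEND_PENDING))
		fake_queue_work(con);
}

int main(void)
{
	struct fake_con con = { .flags = 0 };

	queue_swork(&con);	/* queues the work */
	queue_swork(&con);	/* PENDING already set: not queued again */
	return 0;
}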
254 static struct writequeue_entry *con_next_wq(struct connection *con) in con_next_wq() argument
258 e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, in con_next_wq()
271 struct connection *con; in __find_con() local
273 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { in __find_con()
274 if (con->nodeid == nodeid) in __find_con()
275 return con; in __find_con()
281 static void dlm_con_init(struct connection *con, int nodeid) in dlm_con_init() argument
283 con->nodeid = nodeid; in dlm_con_init()
284 init_rwsem(&con->sock_lock); in dlm_con_init()
285 INIT_LIST_HEAD(&con->writequeue); in dlm_con_init()
286 spin_lock_init(&con->writequeue_lock); in dlm_con_init()
287 INIT_WORK(&con->swork, process_send_sockets); in dlm_con_init()
288 INIT_WORK(&con->rwork, process_recv_sockets); in dlm_con_init()
289 spin_lock_init(&con->addrs_lock); in dlm_con_init()
290 init_waitqueue_head(&con->shutdown_wait); in dlm_con_init()
299 struct connection *con, *tmp; in nodeid2con() local
303 con = __find_con(nodeid, r); in nodeid2con()
304 if (con || !alloc) in nodeid2con()
305 return con; in nodeid2con()
307 con = kzalloc(sizeof(*con), alloc); in nodeid2con()
308 if (!con) in nodeid2con()
311 dlm_con_init(con, nodeid); in nodeid2con()
323 kfree(con); in nodeid2con()
327 hlist_add_head_rcu(&con->list, &connection_hash[r]); in nodeid2con()
330 return con; in nodeid2con()
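nodeid2con(), per the hits above, is a find-or-create: look the node ID up in the connection hash, return early when it exists or allocation was not requested, otherwise allocate, run dlm_con_init() and insert the entry (the kfree(con) hit is the kernel discarding its allocation when a racing insert is found). A simplified, single-threaded sketch of the same flow follows; it uses a plain list instead of an RCU-protected hash, omits the race re-check, and every name in it is made up.

/* Simplified find-or-create keyed by nodeid, modelled on the nodeid2con()
 * hits above; plain list, no hashing, no locking, no RCU. */
#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>

struct node_con {
	int nodeid;
	struct node_con *next;
};

static struct node_con *con_list;

static struct node_con *find_con(int nodeid)
{
	for (struct node_con *c = con_list; c; c = c->next)
		if (c->nodeid == nodeid)
			return c;
	return NULL;
}

/* Look up nodeid; optionally allocate, initialize and insert a new entry,
 * mirroring the "if (con || !alloc) return con;" early exit. */
static struct node_con *get_con(int nodeid, bool alloc)
{
	struct node_con *c = find_con(nodeid);

	if (c || !alloc)
		return c;

	c = calloc(1, sizeof(*c));
	if (!c)
		return NULL;

	c->nodeid = nodeid;
	c->next = con_list;		/* insert at the list head */
	con_list = c;
	return c;
}

int main(void)
{
	printf("%p\n", (void *)get_con(3, false));	/* NULL: lookup only */
	printf("%p\n", (void *)get_con(3, true));	/* allocates and inserts */
	printf("%d\n", get_con(3, false) != NULL);	/* now found: prints 1 */
	return 0;
}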
366 struct connection *con; in nodeid_to_addr() local
373 con = nodeid2con(nodeid, 0); in nodeid_to_addr()
374 if (!con) { in nodeid_to_addr()
379 spin_lock(&con->addrs_lock); in nodeid_to_addr()
380 if (!con->addr_count) { in nodeid_to_addr()
381 spin_unlock(&con->addrs_lock); in nodeid_to_addr()
386 memcpy(&sas, &con->addr[con->curr_addr_index], in nodeid_to_addr()
390 con->curr_addr_index++; in nodeid_to_addr()
391 if (con->curr_addr_index == con->addr_count) in nodeid_to_addr()
392 con->curr_addr_index = 0; in nodeid_to_addr()
395 *mark = con->mark; in nodeid_to_addr()
396 spin_unlock(&con->addrs_lock); in nodeid_to_addr()
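nodeid_to_addr() picks the next address round-robin: copy addr[curr_addr_index], advance the index and wrap it back to 0 once it reaches addr_count, so repeated connect attempts rotate through the configured addresses. A tiny sketch of that rotation; the string table below stands in for the sockaddr_storage array and is purely illustrative.

/* Round-robin pick over a small address table, as in nodeid_to_addr(). */
#include <stdio.h>

struct addr_table {
	const char *addr[4];	/* stand-in for struct sockaddr_storage[] */
	int addr_count;
	int curr_addr_index;
};

static const char *pick_next(struct addr_table *t)
{
	const char *a = t->addr[t->curr_addr_index];

	/* Advance and wrap, so successive connect attempts rotate addresses. */
	t->curr_addr_index++;
	if (t->curr_addr_index == t->addr_count)
		t->curr_addr_index = 0;
	return a;
}

int main(void)
{
	struct addr_table t = { { "10.0.0.1", "10.0.0.2" }, 2, 0 };

	for (int i = 0; i < 3; i++)
		printf("%s\n", pick_next(&t));	/* .1, .2, .1 */
	return 0;
}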
423 struct connection *con; in addr_to_nodeid() local
428 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { in addr_to_nodeid()
429 WARN_ON_ONCE(!con->addr_count); in addr_to_nodeid()
431 spin_lock(&con->addrs_lock); in addr_to_nodeid()
432 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { in addr_to_nodeid()
433 if (addr_compare(&con->addr[addr_i], addr)) { in addr_to_nodeid()
434 *nodeid = con->nodeid; in addr_to_nodeid()
435 *mark = con->mark; in addr_to_nodeid()
436 spin_unlock(&con->addrs_lock); in addr_to_nodeid()
441 spin_unlock(&con->addrs_lock); in addr_to_nodeid()
449 static bool dlm_lowcomms_con_has_addr(const struct connection *con, in dlm_lowcomms_con_has_addr() argument
454 for (i = 0; i < con->addr_count; i++) { in dlm_lowcomms_con_has_addr()
455 if (addr_compare(&con->addr[i], addr)) in dlm_lowcomms_con_has_addr()
464 struct connection *con; in dlm_lowcomms_addr() local
468 con = nodeid2con(nodeid, GFP_NOFS); in dlm_lowcomms_addr()
469 if (!con) { in dlm_lowcomms_addr()
474 spin_lock(&con->addrs_lock); in dlm_lowcomms_addr()
475 if (!con->addr_count) { in dlm_lowcomms_addr()
476 memcpy(&con->addr[0], addr, sizeof(*addr)); in dlm_lowcomms_addr()
477 con->addr_count = 1; in dlm_lowcomms_addr()
478 con->mark = dlm_config.ci_mark; in dlm_lowcomms_addr()
479 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
484 ret = dlm_lowcomms_con_has_addr(con, addr); in dlm_lowcomms_addr()
486 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
491 if (con->addr_count >= DLM_MAX_ADDR_COUNT) { in dlm_lowcomms_addr()
492 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
497 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); in dlm_lowcomms_addr()
499 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
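dlm_lowcomms_addr(), as the hits show, takes the first address unconditionally, rejects duplicates via dlm_lowcomms_con_has_addr(), and refuses to grow past DLM_MAX_ADDR_COUNT. A user-space sketch of those three checks, assuming string addresses and an invented MAX_ADDRS limit; the error codes are placeholders, not the kernel's return values.

/* Sketch of the duplicate/limit checks in dlm_lowcomms_addr(); MAX_ADDRS and
 * the string addresses are stand-ins for the real sockaddr handling. */
#include <string.h>
#include <stdio.h>

#define MAX_ADDRS 4

struct addr_set {
	char addr[MAX_ADDRS][64];
	int addr_count;
};

/* Returns 0 on success, -1 for a duplicate, -2 when the table is full. */
static int add_addr(struct addr_set *s, const char *a)
{
	for (int i = 0; i < s->addr_count; i++)
		if (!strcmp(s->addr[i], a))
			return -1;	/* already known: reject duplicates */

	if (s->addr_count >= MAX_ADDRS)
		return -2;		/* table full, like DLM_MAX_ADDR_COUNT */

	strncpy(s->addr[s->addr_count], a, sizeof(s->addr[0]) - 1);
	s->addr[s->addr_count][sizeof(s->addr[0]) - 1] = '\0';
	s->addr_count++;
	return 0;
}

int main(void)
{
	struct addr_set s = { .addr_count = 0 };

	/* First add succeeds (0), the repeat is rejected as a duplicate (-1). */
	printf("%d %d\n", add_addr(&s, "10.0.0.1"), add_addr(&s, "10.0.0.1"));
	return 0;
}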
506 struct connection *con = sock2con(sk); in lowcomms_data_ready() local
510 set_bit(CF_RECV_INTR, &con->flags); in lowcomms_data_ready()
511 lowcomms_queue_rwork(con); in lowcomms_data_ready()
516 struct connection *con = sock2con(sk); in lowcomms_write_space() local
518 clear_bit(SOCK_NOSPACE, &con->sock->flags); in lowcomms_write_space()
520 spin_lock_bh(&con->writequeue_lock); in lowcomms_write_space()
521 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { in lowcomms_write_space()
522 con->sock->sk->sk_write_pending--; in lowcomms_write_space()
523 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); in lowcomms_write_space()
526 lowcomms_queue_swork(con); in lowcomms_write_space()
527 spin_unlock_bh(&con->writequeue_lock); in lowcomms_write_space()
548 struct connection *con; in dlm_lowcomms_connect_node() local
552 con = nodeid2con(nodeid, 0); in dlm_lowcomms_connect_node()
553 if (WARN_ON_ONCE(!con)) { in dlm_lowcomms_connect_node()
558 down_read(&con->sock_lock); in dlm_lowcomms_connect_node()
559 if (!con->sock) { in dlm_lowcomms_connect_node()
560 spin_lock_bh(&con->writequeue_lock); in dlm_lowcomms_connect_node()
561 lowcomms_queue_swork(con); in dlm_lowcomms_connect_node()
562 spin_unlock_bh(&con->writequeue_lock); in dlm_lowcomms_connect_node()
564 up_read(&con->sock_lock); in dlm_lowcomms_connect_node()
573 struct connection *con; in dlm_lowcomms_nodes_set_mark() local
577 con = nodeid2con(nodeid, 0); in dlm_lowcomms_nodes_set_mark()
578 if (!con) { in dlm_lowcomms_nodes_set_mark()
583 spin_lock(&con->addrs_lock); in dlm_lowcomms_nodes_set_mark()
584 con->mark = mark; in dlm_lowcomms_nodes_set_mark()
585 spin_unlock(&con->addrs_lock); in dlm_lowcomms_nodes_set_mark()
592 struct connection *con = sock2con(sk); in lowcomms_error_report() local
601 con->nodeid, &inet->inet_daddr, in lowcomms_error_report()
610 con->nodeid, &sk->sk_v6_daddr, in lowcomms_error_report()
624 dlm_midcomms_unack_msg_resend(con->nodeid); in lowcomms_error_report()
643 static void add_sock(struct socket *sock, struct connection *con) in add_sock() argument
648 con->sock = sock; in add_sock()
650 sk->sk_user_data = con; in add_sock()
725 static void allow_connection_io(struct connection *con) in allow_connection_io() argument
727 if (con->othercon) in allow_connection_io()
728 clear_bit(CF_IO_STOP, &con->othercon->flags); in allow_connection_io()
729 clear_bit(CF_IO_STOP, &con->flags); in allow_connection_io()
732 static void stop_connection_io(struct connection *con) in stop_connection_io() argument
734 if (con->othercon) in stop_connection_io()
735 stop_connection_io(con->othercon); in stop_connection_io()
737 spin_lock_bh(&con->writequeue_lock); in stop_connection_io()
738 set_bit(CF_IO_STOP, &con->flags); in stop_connection_io()
739 spin_unlock_bh(&con->writequeue_lock); in stop_connection_io()
741 down_write(&con->sock_lock); in stop_connection_io()
742 if (con->sock) { in stop_connection_io()
743 lock_sock(con->sock->sk); in stop_connection_io()
744 restore_callbacks(con->sock->sk); in stop_connection_io()
745 release_sock(con->sock->sk); in stop_connection_io()
747 up_write(&con->sock_lock); in stop_connection_io()
749 cancel_work_sync(&con->swork); in stop_connection_io()
750 cancel_work_sync(&con->rwork); in stop_connection_io()
754 static void close_connection(struct connection *con, bool and_other) in close_connection() argument
758 if (con->othercon && and_other) in close_connection()
759 close_connection(con->othercon, false); in close_connection()
761 down_write(&con->sock_lock); in close_connection()
762 if (!con->sock) { in close_connection()
763 up_write(&con->sock_lock); in close_connection()
767 dlm_close_sock(&con->sock); in close_connection()
780 spin_lock_bh(&con->writequeue_lock); in close_connection()
781 if (!list_empty(&con->writequeue)) { in close_connection()
782 e = list_first_entry(&con->writequeue, struct writequeue_entry, in close_connection()
787 spin_unlock_bh(&con->writequeue_lock); in close_connection()
789 con->rx_leftover = 0; in close_connection()
790 con->retries = 0; in close_connection()
791 clear_bit(CF_APP_LIMITED, &con->flags); in close_connection()
792 clear_bit(CF_RECV_PENDING, &con->flags); in close_connection()
793 clear_bit(CF_SEND_PENDING, &con->flags); in close_connection()
794 up_write(&con->sock_lock); in close_connection()
797 static void shutdown_connection(struct connection *con, bool and_other) in shutdown_connection() argument
801 if (con->othercon && and_other) in shutdown_connection()
802 shutdown_connection(con->othercon, false); in shutdown_connection()
805 down_read(&con->sock_lock); in shutdown_connection()
807 if (!con->sock) { in shutdown_connection()
808 up_read(&con->sock_lock); in shutdown_connection()
812 ret = kernel_sock_shutdown(con->sock, SHUT_WR); in shutdown_connection()
813 up_read(&con->sock_lock); in shutdown_connection()
816 con, ret); in shutdown_connection()
819 ret = wait_event_timeout(con->shutdown_wait, !con->sock, in shutdown_connection()
823 con); in shutdown_connection()
831 close_connection(con, false); in shutdown_connection()
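shutdown_connection() half-closes the socket with kernel_sock_shutdown(..., SHUT_WR), then waits on shutdown_wait (with a timeout) for the receive path to see the peer finish and drop con->sock, and only falls back to close_connection() if that does not happen. The same half-close-then-drain idea in plain POSIX sockets; graceful_close() and the socketpair() demo below are illustrative, not the kernel code.

/* Half-close then drain: the user-space equivalent of the SHUT_WR +
 * wait-for-EOF sequence used by shutdown_connection(). */
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

static void graceful_close(int fd)
{
	char buf[256];

	/* Tell the peer we are done sending; it will see EOF on its read. */
	if (shutdown(fd, SHUT_WR) < 0) {
		close(fd);		/* fall back to a hard close */
		return;
	}

	/* Drain until the peer closes its side (recv() returns 0) or errors. */
	while (recv(fd, buf, sizeof(buf), 0) > 0)
		;

	close(fd);
}

int main(void)
{
	int sv[2];

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return 1;
	close(sv[1]);			/* peer goes away immediately */
	graceful_close(sv[0]);		/* SHUT_WR, drain to EOF, close */
	return 0;
}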
899 static int receive_from_sock(struct connection *con, int buflen) in receive_from_sock() argument
906 pentry = new_processqueue_entry(con->nodeid, buflen); in receive_from_sock()
910 memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); in receive_from_sock()
915 iov.iov_base = pentry->buf + con->rx_leftover; in receive_from_sock()
916 iov.iov_len = buflen - con->rx_leftover; in receive_from_sock()
920 clear_bit(CF_RECV_INTR, &con->flags); in receive_from_sock()
922 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, in receive_from_sock()
924 trace_dlm_recv(con->nodeid, ret); in receive_from_sock()
926 lock_sock(con->sock->sk); in receive_from_sock()
927 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) { in receive_from_sock()
928 release_sock(con->sock->sk); in receive_from_sock()
932 clear_bit(CF_RECV_PENDING, &con->flags); in receive_from_sock()
933 release_sock(con->sock->sk); in receive_from_sock()
946 buflen_real = ret + con->rx_leftover; in receive_from_sock()
947 ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf, in receive_from_sock()
960 con->rx_leftover = buflen_real - ret; in receive_from_sock()
961 memmove(con->rx_leftover_buf, pentry->buf + ret, in receive_from_sock()
962 con->rx_leftover); in receive_from_sock()
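receive_from_sock() keeps a carry-over buffer: whatever the previous pass left as an incomplete DLM message (rx_leftover bytes) is copied to the front of the new receive buffer, the validator consumes only whole messages, and the unconsumed tail is moved back into rx_leftover_buf for the next pass. A compact sketch of that carry logic with a made-up length-prefixed record format; consume_whole_records() merely stands in for dlm_validate_incoming_buffer(), and the demo assumes a little-endian host.

/* Carry-over of partial messages between reads, in the spirit of
 * receive_from_sock(); the framing here is a fake 4-byte length prefix. */
#include <string.h>
#include <stdint.h>
#include <stdio.h>

#define BUFLEN 64

struct rx_state {
	uint8_t leftover_buf[BUFLEN];
	size_t leftover;
};

/* Pretend validator: consumes only whole length-prefixed records. */
static size_t consume_whole_records(const uint8_t *buf, size_t len)
{
	size_t off = 0;

	while (len - off >= 4) {
		uint32_t rec;

		memcpy(&rec, buf + off, 4);	/* record payload length */
		if (len - off < 4 + rec)
			break;			/* partial record: stop here */
		off += 4 + rec;
	}
	return off;				/* bytes actually consumed */
}

static void rx_pass(struct rx_state *st, const uint8_t *data, size_t n)
{
	uint8_t buf[2 * BUFLEN];
	size_t total, used;

	/* Prepend what the last pass could not consume. */
	memcpy(buf, st->leftover_buf, st->leftover);
	memcpy(buf + st->leftover, data, n);
	total = st->leftover + n;

	used = consume_whole_records(buf, total);

	/* Save the unconsumed tail for the next pass. */
	st->leftover = total - used;
	memmove(st->leftover_buf, buf + used, st->leftover);
	printf("consumed %zu, leftover %zu\n", used, st->leftover);
}

int main(void)
{
	struct rx_state st = { .leftover = 0 };
	/* One record with 3 payload bytes, split across two reads
	 * (length prefix written little-endian for the demo). */
	uint8_t part1[] = { 3, 0, 0, 0, 'a' };
	uint8_t part2[] = { 'b', 'c' };

	rx_pass(&st, part1, sizeof(part1));	/* consumed 0, leftover 5 */
	rx_pass(&st, part2, sizeof(part2));	/* consumed 7, leftover 0 */
	return 0;
}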
1067 /* close other sock con if we have something new */ in accept_from_sock()
1163 static struct writequeue_entry *new_writequeue_entry(struct connection *con) in new_writequeue_entry() argument
1181 entry->con = con; in new_writequeue_entry()
1187 static struct writequeue_entry *new_wq_entry(struct connection *con, int len, in new_wq_entry() argument
1193 spin_lock_bh(&con->writequeue_lock); in new_wq_entry()
1194 if (!list_empty(&con->writequeue)) { in new_wq_entry()
1195 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); in new_wq_entry()
1209 e = new_writequeue_entry(con); in new_wq_entry()
1219 list_add_tail(&e->list, &con->writequeue); in new_wq_entry()
1222 spin_unlock_bh(&con->writequeue_lock); in new_wq_entry()
1226 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, in dlm_lowcomms_new_msg_con() argument
1239 e = new_wq_entry(con, len, ppc, cb, data); in dlm_lowcomms_new_msg_con()
1261 struct connection *con; in dlm_lowcomms_new_msg() local
1274 con = nodeid2con(nodeid, 0); in dlm_lowcomms_new_msg()
1275 if (WARN_ON_ONCE(!con)) { in dlm_lowcomms_new_msg()
1280 msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data); in dlm_lowcomms_new_msg()
1297 struct connection *con = e->con; in _dlm_lowcomms_commit_msg() local
1300 spin_lock_bh(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1310 lowcomms_queue_swork(con); in _dlm_lowcomms_commit_msg()
1313 spin_unlock_bh(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1344 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc, in dlm_lowcomms_resend_msg()
1361 static int send_to_sock(struct connection *con) in send_to_sock() argument
1370 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1371 e = con_next_wq(con); in send_to_sock()
1373 clear_bit(CF_SEND_PENDING, &con->flags); in send_to_sock()
1374 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1381 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1385 ret = sock_sendmsg(con->sock, &msg); in send_to_sock()
1386 trace_dlm_send(con->nodeid, ret); in send_to_sock()
1388 lock_sock(con->sock->sk); in send_to_sock()
1389 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1390 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && in send_to_sock()
1391 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { in send_to_sock()
1395 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); in send_to_sock()
1396 con->sock->sk->sk_write_pending++; in send_to_sock()
1398 clear_bit(CF_SEND_PENDING, &con->flags); in send_to_sock()
1399 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1400 release_sock(con->sock->sk); in send_to_sock()
1405 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1406 release_sock(con->sock->sk); in send_to_sock()
1413 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1415 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
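send_to_sock() pushes out the next writequeue entry without blocking; when the socket would block it sets CF_APP_LIMITED, bumps sk_write_pending and leaves the rest for lowcomms_write_space() to re-queue once the socket drains. A rough user-space sketch of a non-blocking send with an explicit "try again later" result; send_some() and the SEND_* codes are invented, and MSG_NOSIGNAL is Linux-specific.

/* Non-blocking send with partial-progress tracking, roughly the shape of the
 * would-block handling in send_to_sock(); all names here are illustrative. */
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <stddef.h>

enum send_status { SEND_DONE, SEND_AGAIN, SEND_ERROR };

/* Try to push out len bytes; *off records how much has already gone out. */
static enum send_status send_some(int fd, const char *buf, size_t len,
				  size_t *off)
{
	while (*off < len) {
		ssize_t n = send(fd, buf + *off, len - *off,
				 MSG_DONTWAIT | MSG_NOSIGNAL);

		if (n > 0) {
			*off += (size_t)n;	/* partial progress is fine */
			continue;
		}
		if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
			return SEND_AGAIN;	/* socket full: wait for
						 * writability, akin to
						 * CF_APP_LIMITED */
		return SEND_ERROR;
	}
	return SEND_DONE;
}

int main(void)
{
	int sv[2];
	size_t off = 0;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return 1;
	return send_some(sv[0], "hello", 5, &off) == SEND_DONE ? 0 : 1;
}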
1420 static void clean_one_writequeue(struct connection *con) in clean_one_writequeue() argument
1424 spin_lock_bh(&con->writequeue_lock); in clean_one_writequeue()
1425 list_for_each_entry_safe(e, safe, &con->writequeue, list) { in clean_one_writequeue()
1428 spin_unlock_bh(&con->writequeue_lock); in clean_one_writequeue()
1433 struct connection *con = container_of(rcu, struct connection, rcu); in connection_release() local
1435 WARN_ON_ONCE(!list_empty(&con->writequeue)); in connection_release()
1436 WARN_ON_ONCE(con->sock); in connection_release()
1437 kfree(con); in connection_release()
1444 struct connection *con; in dlm_lowcomms_close() local
1450 con = nodeid2con(nodeid, 0); in dlm_lowcomms_close()
1451 if (WARN_ON_ONCE(!con)) { in dlm_lowcomms_close()
1456 stop_connection_io(con); in dlm_lowcomms_close()
1458 close_connection(con, true); in dlm_lowcomms_close()
1461 hlist_del_rcu(&con->list); in dlm_lowcomms_close()
1464 clean_one_writequeue(con); in dlm_lowcomms_close()
1465 call_srcu(&connections_srcu, &con->rcu, connection_release); in dlm_lowcomms_close()
1466 if (con->othercon) { in dlm_lowcomms_close()
1467 clean_one_writequeue(con->othercon); in dlm_lowcomms_close()
1468 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); in dlm_lowcomms_close()
1484 struct connection *con = container_of(work, struct connection, rwork); in process_recv_sockets() local
1487 down_read(&con->sock_lock); in process_recv_sockets()
1488 if (!con->sock) { in process_recv_sockets()
1489 up_read(&con->sock_lock); in process_recv_sockets()
1495 ret = receive_from_sock(con, buflen); in process_recv_sockets()
1497 up_read(&con->sock_lock); in process_recv_sockets()
1504 close_connection(con, false); in process_recv_sockets()
1505 wake_up(&con->shutdown_wait); in process_recv_sockets()
1526 queue_work(io_workqueue, &con->rwork); in process_recv_sockets()
1531 if (test_bit(CF_IS_OTHERCON, &con->flags)) { in process_recv_sockets()
1532 close_connection(con, false); in process_recv_sockets()
1534 spin_lock_bh(&con->writequeue_lock); in process_recv_sockets()
1535 lowcomms_queue_swork(con); in process_recv_sockets()
1536 spin_unlock_bh(&con->writequeue_lock); in process_recv_sockets()
1566 static int dlm_connect(struct connection *con) in dlm_connect() argument
1574 result = nodeid_to_addr(con->nodeid, &addr, NULL, in dlm_connect()
1577 log_print("no address for nodeid %d", con->nodeid); in dlm_connect()
1596 add_sock(sock, con); in dlm_connect()
1598 log_print_ratelimited("connecting to %d", con->nodeid); in dlm_connect()
1609 dlm_close_sock(&con->sock); in dlm_connect()
1620 struct connection *con = container_of(work, struct connection, swork); in process_send_sockets() local
1623 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); in process_send_sockets()
1625 down_read(&con->sock_lock); in process_send_sockets()
1626 if (!con->sock) { in process_send_sockets()
1627 up_read(&con->sock_lock); in process_send_sockets()
1628 down_write(&con->sock_lock); in process_send_sockets()
1629 if (!con->sock) { in process_send_sockets()
1630 ret = dlm_connect(con); in process_send_sockets()
1636 up_write(&con->sock_lock); in process_send_sockets()
1638 con->nodeid, con->retries++, ret); in process_send_sockets()
1645 queue_work(io_workqueue, &con->swork); in process_send_sockets()
1649 downgrade_write(&con->sock_lock); in process_send_sockets()
1653 ret = send_to_sock(con); in process_send_sockets()
1655 up_read(&con->sock_lock); in process_send_sockets()
1664 queue_work(io_workqueue, &con->swork); in process_send_sockets()
1668 close_connection(con, false); in process_send_sockets()
1671 spin_lock_bh(&con->writequeue_lock); in process_send_sockets()
1672 lowcomms_queue_swork(con); in process_send_sockets()
1673 spin_unlock_bh(&con->writequeue_lock); in process_send_sockets()
1717 struct connection *con; in dlm_lowcomms_shutdown() local
1730 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { in dlm_lowcomms_shutdown()
1731 shutdown_connection(con, true); in dlm_lowcomms_shutdown()
1732 stop_connection_io(con); in dlm_lowcomms_shutdown()
1734 close_connection(con, true); in dlm_lowcomms_shutdown()
1736 clean_one_writequeue(con); in dlm_lowcomms_shutdown()
1737 if (con->othercon) in dlm_lowcomms_shutdown()
1738 clean_one_writequeue(con->othercon); in dlm_lowcomms_shutdown()
1739 allow_connection_io(con); in dlm_lowcomms_shutdown()
1962 struct connection *con; in dlm_lowcomms_exit() local
1967 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { in dlm_lowcomms_exit()
1969 hlist_del_rcu(&con->list); in dlm_lowcomms_exit()
1972 if (con->othercon) in dlm_lowcomms_exit()
1973 call_srcu(&connections_srcu, &con->othercon->rcu, in dlm_lowcomms_exit()
1975 call_srcu(&connections_srcu, &con->rcu, connection_release); in dlm_lowcomms_exit()