Lines Matching full:con

100 void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)  in ceph_con_flag_clear()  argument
104 clear_bit(con_flag, &con->flags); in ceph_con_flag_clear()
107 void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag) in ceph_con_flag_set() argument
111 set_bit(con_flag, &con->flags); in ceph_con_flag_set()
114 bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag) in ceph_con_flag_test() argument
118 return test_bit(con_flag, &con->flags); in ceph_con_flag_test()
121 bool ceph_con_flag_test_and_clear(struct ceph_connection *con, in ceph_con_flag_test_and_clear() argument
126 return test_and_clear_bit(con_flag, &con->flags); in ceph_con_flag_test_and_clear()
129 bool ceph_con_flag_test_and_set(struct ceph_connection *con, in ceph_con_flag_test_and_set() argument
134 return test_and_set_bit(con_flag, &con->flags); in ceph_con_flag_test_and_set()
145 static void queue_con(struct ceph_connection *con);
146 static void cancel_con(struct ceph_connection *con);
148 static void con_fault(struct ceph_connection *con);
280 static void con_sock_state_init(struct ceph_connection *con) in con_sock_state_init() argument
284 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_init()
287 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_init()
291 static void con_sock_state_connecting(struct ceph_connection *con) in con_sock_state_connecting() argument
295 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); in con_sock_state_connecting()
298 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connecting()
302 static void con_sock_state_connected(struct ceph_connection *con) in con_sock_state_connected() argument
306 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); in con_sock_state_connected()
309 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connected()
313 static void con_sock_state_closing(struct ceph_connection *con) in con_sock_state_closing() argument
317 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING); in con_sock_state_closing()
322 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closing()
326 static void con_sock_state_closed(struct ceph_connection *con) in con_sock_state_closed() argument
330 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_closed()
336 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closed()
347 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_data_ready() local
351 if (atomic_read(&con->msgr->stopping)) { in ceph_sock_data_ready()
357 con, con->state); in ceph_sock_data_ready()
358 queue_con(con); in ceph_sock_data_ready()
365 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_write_space() local
374 if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) { in ceph_sock_write_space()
376 dout("%s %p queueing write work\n", __func__, con); in ceph_sock_write_space()
378 queue_con(con); in ceph_sock_write_space()
381 dout("%s %p nothing to write\n", __func__, con); in ceph_sock_write_space()
388 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_state_change() local
391 con, con->state, sk->sk_state); in ceph_sock_state_change()
399 con_sock_state_closing(con); in ceph_sock_state_change()
400 ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED); in ceph_sock_state_change()
401 queue_con(con); in ceph_sock_state_change()
405 con_sock_state_connected(con); in ceph_sock_state_change()
406 queue_con(con); in ceph_sock_state_change()
417 struct ceph_connection *con) in set_sock_callbacks() argument
420 sk->sk_user_data = con; in set_sock_callbacks()
434 int ceph_tcp_connect(struct ceph_connection *con) in ceph_tcp_connect() argument
436 struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */ in ceph_tcp_connect()
441 dout("%s con %p peer_addr %s\n", __func__, con, in ceph_tcp_connect()
442 ceph_pr_addr(&con->peer_addr)); in ceph_tcp_connect()
443 BUG_ON(con->sock); in ceph_tcp_connect()
447 ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family, in ceph_tcp_connect()
459 set_sock_callbacks(sock, con); in ceph_tcp_connect()
461 con_sock_state_connecting(con); in ceph_tcp_connect()
466 ceph_pr_addr(&con->peer_addr), in ceph_tcp_connect()
470 ceph_pr_addr(&con->peer_addr), ret); in ceph_tcp_connect()
475 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) in ceph_tcp_connect()
478 con->sock = sock; in ceph_tcp_connect()
485 int ceph_con_close_socket(struct ceph_connection *con) in ceph_con_close_socket() argument
489 dout("%s con %p sock %p\n", __func__, con, con->sock); in ceph_con_close_socket()
490 if (con->sock) { in ceph_con_close_socket()
491 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); in ceph_con_close_socket()
492 sock_release(con->sock); in ceph_con_close_socket()
493 con->sock = NULL; in ceph_con_close_socket()
502 ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); in ceph_con_close_socket()
504 con_sock_state_closed(con); in ceph_con_close_socket()
508 static void ceph_con_reset_protocol(struct ceph_connection *con) in ceph_con_reset_protocol() argument
510 dout("%s con %p\n", __func__, con); in ceph_con_reset_protocol()
512 ceph_con_close_socket(con); in ceph_con_reset_protocol()
513 if (con->in_msg) { in ceph_con_reset_protocol()
514 WARN_ON(con->in_msg->con != con); in ceph_con_reset_protocol()
515 ceph_msg_put(con->in_msg); in ceph_con_reset_protocol()
516 con->in_msg = NULL; in ceph_con_reset_protocol()
518 if (con->out_msg) { in ceph_con_reset_protocol()
519 WARN_ON(con->out_msg->con != con); in ceph_con_reset_protocol()
520 ceph_msg_put(con->out_msg); in ceph_con_reset_protocol()
521 con->out_msg = NULL; in ceph_con_reset_protocol()
523 if (con->bounce_page) { in ceph_con_reset_protocol()
524 __free_page(con->bounce_page); in ceph_con_reset_protocol()
525 con->bounce_page = NULL; in ceph_con_reset_protocol()
528 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_con_reset_protocol()
529 ceph_con_v2_reset_protocol(con); in ceph_con_reset_protocol()
531 ceph_con_v1_reset_protocol(con); in ceph_con_reset_protocol()
554 void ceph_con_reset_session(struct ceph_connection *con) in ceph_con_reset_session() argument
556 dout("%s con %p\n", __func__, con); in ceph_con_reset_session()
558 WARN_ON(con->in_msg); in ceph_con_reset_session()
559 WARN_ON(con->out_msg); in ceph_con_reset_session()
560 ceph_msg_remove_list(&con->out_queue); in ceph_con_reset_session()
561 ceph_msg_remove_list(&con->out_sent); in ceph_con_reset_session()
562 con->out_seq = 0; in ceph_con_reset_session()
563 con->in_seq = 0; in ceph_con_reset_session()
564 con->in_seq_acked = 0; in ceph_con_reset_session()
566 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_con_reset_session()
567 ceph_con_v2_reset_session(con); in ceph_con_reset_session()
569 ceph_con_v1_reset_session(con); in ceph_con_reset_session()
575 void ceph_con_close(struct ceph_connection *con) in ceph_con_close() argument
577 mutex_lock(&con->mutex); in ceph_con_close()
578 dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr)); in ceph_con_close()
579 con->state = CEPH_CON_S_CLOSED; in ceph_con_close()
581 ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next in ceph_con_close()
583 ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING); in ceph_con_close()
584 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); in ceph_con_close()
585 ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF); in ceph_con_close()
587 ceph_con_reset_protocol(con); in ceph_con_close()
588 ceph_con_reset_session(con); in ceph_con_close()
589 cancel_con(con); in ceph_con_close()
590 mutex_unlock(&con->mutex); in ceph_con_close()
597 void ceph_con_open(struct ceph_connection *con, in ceph_con_open() argument
601 mutex_lock(&con->mutex); in ceph_con_open()
602 dout("con_open %p %s\n", con, ceph_pr_addr(addr)); in ceph_con_open()
604 WARN_ON(con->state != CEPH_CON_S_CLOSED); in ceph_con_open()
605 con->state = CEPH_CON_S_PREOPEN; in ceph_con_open()
607 con->peer_name.type = (__u8) entity_type; in ceph_con_open()
608 con->peer_name.num = cpu_to_le64(entity_num); in ceph_con_open()
610 memcpy(&con->peer_addr, addr, sizeof(*addr)); in ceph_con_open()
611 con->delay = 0; /* reset backoff memory */ in ceph_con_open()
612 mutex_unlock(&con->mutex); in ceph_con_open()
613 queue_con(con); in ceph_con_open()
620 bool ceph_con_opened(struct ceph_connection *con) in ceph_con_opened() argument
622 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_con_opened()
623 return ceph_con_v2_opened(con); in ceph_con_opened()
625 return ceph_con_v1_opened(con); in ceph_con_opened()
631 void ceph_con_init(struct ceph_connection *con, void *private, in ceph_con_init() argument
635 dout("con_init %p\n", con); in ceph_con_init()
636 memset(con, 0, sizeof(*con)); in ceph_con_init()
637 con->private = private; in ceph_con_init()
638 con->ops = ops; in ceph_con_init()
639 con->msgr = msgr; in ceph_con_init()
641 con_sock_state_init(con); in ceph_con_init()
643 mutex_init(&con->mutex); in ceph_con_init()
644 INIT_LIST_HEAD(&con->out_queue); in ceph_con_init()
645 INIT_LIST_HEAD(&con->out_sent); in ceph_con_init()
646 INIT_DELAYED_WORK(&con->work, ceph_con_workfn); in ceph_con_init()
648 con->state = CEPH_CON_S_CLOSED; in ceph_con_init()
671 void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq) in ceph_con_discard_sent() argument
676 dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq); in ceph_con_discard_sent()
677 while (!list_empty(&con->out_sent)) { in ceph_con_discard_sent()
678 msg = list_first_entry(&con->out_sent, struct ceph_msg, in ceph_con_discard_sent()
685 dout("%s con %p discarding msg %p seq %llu\n", __func__, con, in ceph_con_discard_sent()
696 void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq) in ceph_con_discard_requeued() argument
701 dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq); in ceph_con_discard_requeued()
702 while (!list_empty(&con->out_queue)) { in ceph_con_discard_requeued()
703 msg = list_first_entry(&con->out_queue, struct ceph_msg, in ceph_con_discard_requeued()
711 dout("%s con %p discarding msg %p seq %llu\n", __func__, con, in ceph_con_discard_requeued()
1401 void ceph_con_process_message(struct ceph_connection *con) in ceph_con_process_message() argument
1403 struct ceph_msg *msg = con->in_msg; in ceph_con_process_message()
1405 BUG_ON(con->in_msg->con != con); in ceph_con_process_message()
1406 con->in_msg = NULL; in ceph_con_process_message()
1409 if (con->peer_name.type == 0) in ceph_con_process_message()
1410 con->peer_name = msg->hdr.src; in ceph_con_process_message()
1412 con->in_seq++; in ceph_con_process_message()
1413 mutex_unlock(&con->mutex); in ceph_con_process_message()
1423 con->in_front_crc, con->in_middle_crc, con->in_data_crc); in ceph_con_process_message()
1424 con->ops->dispatch(con, msg); in ceph_con_process_message()
1426 mutex_lock(&con->mutex); in ceph_con_process_message()
1431 * Bump @con reference to avoid races with connection teardown.
1434 static int queue_con_delay(struct ceph_connection *con, unsigned long delay) in queue_con_delay() argument
1436 if (!con->ops->get(con)) { in queue_con_delay()
1437 dout("%s %p ref count 0\n", __func__, con); in queue_con_delay()
1444 dout("%s %p %lu\n", __func__, con, delay); in queue_con_delay()
1445 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { in queue_con_delay()
1446 dout("%s %p - already queued\n", __func__, con); in queue_con_delay()
1447 con->ops->put(con); in queue_con_delay()
1454 static void queue_con(struct ceph_connection *con) in queue_con() argument
1456 (void) queue_con_delay(con, 0); in queue_con()
1459 static void cancel_con(struct ceph_connection *con) in cancel_con() argument
1461 if (cancel_delayed_work(&con->work)) { in cancel_con()
1462 dout("%s %p\n", __func__, con); in cancel_con()
1463 con->ops->put(con); in cancel_con()
1467 static bool con_sock_closed(struct ceph_connection *con) in con_sock_closed() argument
1469 if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED)) in con_sock_closed()
1474 con->error_msg = "socket closed (con state " #x ")"; \ in con_sock_closed()
1477 switch (con->state) { in con_sock_closed()
1499 static bool con_backoff(struct ceph_connection *con) in con_backoff() argument
1503 if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF)) in con_backoff()
1506 ret = queue_con_delay(con, con->delay); in con_backoff()
1508 dout("%s: con %p FAILED to back off %lu\n", __func__, in con_backoff()
1509 con, con->delay); in con_backoff()
1511 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); in con_backoff()
1517 /* Finish fault handling; con->mutex must *not* be held here */
1519 static void con_fault_finish(struct ceph_connection *con) in con_fault_finish() argument
1521 dout("%s %p\n", __func__, con); in con_fault_finish()
1527 if (con->v1.auth_retry) { in con_fault_finish()
1528 dout("auth_retry %d, invalidating\n", con->v1.auth_retry); in con_fault_finish()
1529 if (con->ops->invalidate_authorizer) in con_fault_finish()
1530 con->ops->invalidate_authorizer(con); in con_fault_finish()
1531 con->v1.auth_retry = 0; in con_fault_finish()
1534 if (con->ops->fault) in con_fault_finish()
1535 con->ops->fault(con); in con_fault_finish()
1543 struct ceph_connection *con = container_of(work, struct ceph_connection, in ceph_con_workfn() local
1547 mutex_lock(&con->mutex); in ceph_con_workfn()
1551 if ((fault = con_sock_closed(con))) { in ceph_con_workfn()
1552 dout("%s: con %p SOCK_CLOSED\n", __func__, con); in ceph_con_workfn()
1555 if (con_backoff(con)) { in ceph_con_workfn()
1556 dout("%s: con %p BACKOFF\n", __func__, con); in ceph_con_workfn()
1559 if (con->state == CEPH_CON_S_STANDBY) { in ceph_con_workfn()
1560 dout("%s: con %p STANDBY\n", __func__, con); in ceph_con_workfn()
1563 if (con->state == CEPH_CON_S_CLOSED) { in ceph_con_workfn()
1564 dout("%s: con %p CLOSED\n", __func__, con); in ceph_con_workfn()
1565 BUG_ON(con->sock); in ceph_con_workfn()
1568 if (con->state == CEPH_CON_S_PREOPEN) { in ceph_con_workfn()
1569 dout("%s: con %p PREOPEN\n", __func__, con); in ceph_con_workfn()
1570 BUG_ON(con->sock); in ceph_con_workfn()
1573 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_con_workfn()
1574 ret = ceph_con_v2_try_read(con); in ceph_con_workfn()
1576 ret = ceph_con_v1_try_read(con); in ceph_con_workfn()
1580 if (!con->error_msg) in ceph_con_workfn()
1581 con->error_msg = "socket error on read"; in ceph_con_workfn()
1586 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_con_workfn()
1587 ret = ceph_con_v2_try_write(con); in ceph_con_workfn()
1589 ret = ceph_con_v1_try_write(con); in ceph_con_workfn()
1593 if (!con->error_msg) in ceph_con_workfn()
1594 con->error_msg = "socket error on write"; in ceph_con_workfn()
1601 con_fault(con); in ceph_con_workfn()
1602 mutex_unlock(&con->mutex); in ceph_con_workfn()
1605 con_fault_finish(con); in ceph_con_workfn()
1607 con->ops->put(con); in ceph_con_workfn()
1614 static void con_fault(struct ceph_connection *con) in con_fault() argument
1617 con, con->state, ceph_pr_addr(&con->peer_addr)); in con_fault()
1619 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), in con_fault()
1620 ceph_pr_addr(&con->peer_addr), con->error_msg); in con_fault()
1621 con->error_msg = NULL; in con_fault()
1623 WARN_ON(con->state == CEPH_CON_S_STANDBY || in con_fault()
1624 con->state == CEPH_CON_S_CLOSED); in con_fault()
1626 ceph_con_reset_protocol(con); in con_fault()
1628 if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) { in con_fault()
1630 con->state = CEPH_CON_S_CLOSED; in con_fault()
1635 list_splice_init(&con->out_sent, &con->out_queue); in con_fault()
1639 if (list_empty(&con->out_queue) && in con_fault()
1640 !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) { in con_fault()
1641 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); in con_fault()
1642 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); in con_fault()
1643 con->state = CEPH_CON_S_STANDBY; in con_fault()
1646 con->state = CEPH_CON_S_PREOPEN; in con_fault()
1647 if (!con->delay) { in con_fault()
1648 con->delay = BASE_DELAY_INTERVAL; in con_fault()
1649 } else if (con->delay < MAX_DELAY_INTERVAL) { in con_fault()
1650 con->delay *= 2; in con_fault()
1651 if (con->delay > MAX_DELAY_INTERVAL) in con_fault()
1652 con->delay = MAX_DELAY_INTERVAL; in con_fault()
1654 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); in con_fault()
1655 queue_con(con); in con_fault()
1704 static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con) in msg_con_set() argument
1706 if (msg->con) in msg_con_set()
1707 msg->con->ops->put(msg->con); in msg_con_set()
1709 msg->con = con ? con->ops->get(con) : NULL; in msg_con_set()
1710 BUG_ON(msg->con != con); in msg_con_set()
1713 static void clear_standby(struct ceph_connection *con) in clear_standby() argument
1716 if (con->state == CEPH_CON_S_STANDBY) { in clear_standby()
1717 dout("clear_standby %p and ++connect_seq\n", con); in clear_standby()
1718 con->state = CEPH_CON_S_PREOPEN; in clear_standby()
1719 con->v1.connect_seq++; in clear_standby()
1720 WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); in clear_standby()
1721 WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); in clear_standby()
1730 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) in ceph_con_send() argument
1733 msg->hdr.src = con->msgr->inst.name; in ceph_con_send()
1737 mutex_lock(&con->mutex); in ceph_con_send()
1739 if (con->state == CEPH_CON_S_CLOSED) { in ceph_con_send()
1740 dout("con_send %p closed, dropping %p\n", con, msg); in ceph_con_send()
1742 mutex_unlock(&con->mutex); in ceph_con_send()
1746 msg_con_set(msg, con); in ceph_con_send()
1749 list_add_tail(&msg->list_head, &con->out_queue); in ceph_con_send()
1751 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), in ceph_con_send()
1757 clear_standby(con); in ceph_con_send()
1758 mutex_unlock(&con->mutex); in ceph_con_send()
1762 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) in ceph_con_send()
1763 queue_con(con); in ceph_con_send()
1772 struct ceph_connection *con = msg->con; in ceph_msg_revoke() local
1774 if (!con) { in ceph_msg_revoke()
1775 dout("%s msg %p null con\n", __func__, msg); in ceph_msg_revoke()
1779 mutex_lock(&con->mutex); in ceph_msg_revoke()
1781 WARN_ON(con->out_msg == msg); in ceph_msg_revoke()
1782 dout("%s con %p msg %p not linked\n", __func__, con, msg); in ceph_msg_revoke()
1783 mutex_unlock(&con->mutex); in ceph_msg_revoke()
1787 dout("%s con %p msg %p was linked\n", __func__, con, msg); in ceph_msg_revoke()
1791 if (con->out_msg == msg) { in ceph_msg_revoke()
1792 WARN_ON(con->state != CEPH_CON_S_OPEN); in ceph_msg_revoke()
1793 dout("%s con %p msg %p was sending\n", __func__, con, msg); in ceph_msg_revoke()
1794 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_msg_revoke()
1795 ceph_con_v2_revoke(con); in ceph_msg_revoke()
1797 ceph_con_v1_revoke(con); in ceph_msg_revoke()
1798 ceph_msg_put(con->out_msg); in ceph_msg_revoke()
1799 con->out_msg = NULL; in ceph_msg_revoke()
1801 dout("%s con %p msg %p not current, out_msg %p\n", __func__, in ceph_msg_revoke()
1802 con, msg, con->out_msg); in ceph_msg_revoke()
1804 mutex_unlock(&con->mutex); in ceph_msg_revoke()
1812 struct ceph_connection *con = msg->con; in ceph_msg_revoke_incoming() local
1814 if (!con) { in ceph_msg_revoke_incoming()
1815 dout("%s msg %p null con\n", __func__, msg); in ceph_msg_revoke_incoming()
1819 mutex_lock(&con->mutex); in ceph_msg_revoke_incoming()
1820 if (con->in_msg == msg) { in ceph_msg_revoke_incoming()
1821 WARN_ON(con->state != CEPH_CON_S_OPEN); in ceph_msg_revoke_incoming()
1822 dout("%s con %p msg %p was recving\n", __func__, con, msg); in ceph_msg_revoke_incoming()
1823 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_msg_revoke_incoming()
1824 ceph_con_v2_revoke_incoming(con); in ceph_msg_revoke_incoming()
1826 ceph_con_v1_revoke_incoming(con); in ceph_msg_revoke_incoming()
1827 ceph_msg_put(con->in_msg); in ceph_msg_revoke_incoming()
1828 con->in_msg = NULL; in ceph_msg_revoke_incoming()
1830 dout("%s con %p msg %p not current, in_msg %p\n", __func__, in ceph_msg_revoke_incoming()
1831 con, msg, con->in_msg); in ceph_msg_revoke_incoming()
1833 mutex_unlock(&con->mutex); in ceph_msg_revoke_incoming()
1839 void ceph_con_keepalive(struct ceph_connection *con) in ceph_con_keepalive() argument
1841 dout("con_keepalive %p\n", con); in ceph_con_keepalive()
1842 mutex_lock(&con->mutex); in ceph_con_keepalive()
1843 clear_standby(con); in ceph_con_keepalive()
1844 ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING); in ceph_con_keepalive()
1845 mutex_unlock(&con->mutex); in ceph_con_keepalive()
1847 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) in ceph_con_keepalive()
1848 queue_con(con); in ceph_con_keepalive()
1852 bool ceph_con_keepalive_expired(struct ceph_connection *con, in ceph_con_keepalive_expired() argument
1856 (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) { in ceph_con_keepalive_expired()
1861 ts = timespec64_add(con->last_keepalive_ack, ts); in ceph_con_keepalive_expired()
2034 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) in ceph_alloc_middle() argument
2052 * connection, and save the result in con->in_msg. Uses the
2059 * - con->in_msg == NULL
2061 * - con->in_msg is non-null.
2063 * - con->in_msg == NULL
2065 int ceph_con_in_msg_alloc(struct ceph_connection *con, in ceph_con_in_msg_alloc() argument
2072 BUG_ON(con->in_msg != NULL); in ceph_con_in_msg_alloc()
2073 BUG_ON(!con->ops->alloc_msg); in ceph_con_in_msg_alloc()
2075 mutex_unlock(&con->mutex); in ceph_con_in_msg_alloc()
2076 msg = con->ops->alloc_msg(con, hdr, skip); in ceph_con_in_msg_alloc()
2077 mutex_lock(&con->mutex); in ceph_con_in_msg_alloc()
2078 if (con->state != CEPH_CON_S_OPEN) { in ceph_con_in_msg_alloc()
2085 msg_con_set(msg, con); in ceph_con_in_msg_alloc()
2086 con->in_msg = msg; in ceph_con_in_msg_alloc()
2096 con->error_msg = "error allocating memory for incoming message"; in ceph_con_in_msg_alloc()
2099 memcpy(&con->in_msg->hdr, hdr, sizeof(*hdr)); in ceph_con_in_msg_alloc()
2101 if (middle_len && !con->in_msg->middle) { in ceph_con_in_msg_alloc()
2102 ret = ceph_alloc_middle(con, con->in_msg); in ceph_con_in_msg_alloc()
2104 ceph_msg_put(con->in_msg); in ceph_con_in_msg_alloc()
2105 con->in_msg = NULL; in ceph_con_in_msg_alloc()
2112 void ceph_con_get_out_msg(struct ceph_connection *con) in ceph_con_get_out_msg() argument
2116 BUG_ON(list_empty(&con->out_queue)); in ceph_con_get_out_msg()
2117 msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head); in ceph_con_get_out_msg()
2118 WARN_ON(msg->con != con); in ceph_con_get_out_msg()
2124 list_move_tail(&msg->list_head, &con->out_sent); in ceph_con_get_out_msg()
2131 msg->hdr.seq = cpu_to_le64(++con->out_seq); in ceph_con_get_out_msg()
2134 if (con->ops->reencode_message) in ceph_con_get_out_msg()
2135 con->ops->reencode_message(msg); in ceph_con_get_out_msg()
2142 WARN_ON(con->out_msg); in ceph_con_get_out_msg()
2143 con->out_msg = ceph_msg_get(msg); in ceph_con_get_out_msg()