Lines Matching full:con

110 static void con_out_kvec_reset(struct ceph_connection *con)  in con_out_kvec_reset()  argument
112 BUG_ON(con->v1.out_skip); in con_out_kvec_reset()
114 con->v1.out_kvec_left = 0; in con_out_kvec_reset()
115 con->v1.out_kvec_bytes = 0; in con_out_kvec_reset()
116 con->v1.out_kvec_cur = &con->v1.out_kvec[0]; in con_out_kvec_reset()
119 static void con_out_kvec_add(struct ceph_connection *con, in con_out_kvec_add() argument
122 int index = con->v1.out_kvec_left; in con_out_kvec_add()
124 BUG_ON(con->v1.out_skip); in con_out_kvec_add()
125 BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec)); in con_out_kvec_add()
127 con->v1.out_kvec[index].iov_len = size; in con_out_kvec_add()
128 con->v1.out_kvec[index].iov_base = data; in con_out_kvec_add()
129 con->v1.out_kvec_left++; in con_out_kvec_add()
130 con->v1.out_kvec_bytes += size; in con_out_kvec_add()
138 static int con_out_kvec_skip(struct ceph_connection *con) in con_out_kvec_skip() argument
142 if (con->v1.out_kvec_bytes > 0) { in con_out_kvec_skip()
143 skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len; in con_out_kvec_skip()
144 BUG_ON(con->v1.out_kvec_bytes < skip); in con_out_kvec_skip()
145 BUG_ON(!con->v1.out_kvec_left); in con_out_kvec_skip()
146 con->v1.out_kvec_bytes -= skip; in con_out_kvec_skip()
147 con->v1.out_kvec_left--; in con_out_kvec_skip()
153 static size_t sizeof_footer(struct ceph_connection *con) in sizeof_footer() argument
155 return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? in sizeof_footer()
172 static void prepare_write_message_footer(struct ceph_connection *con) in prepare_write_message_footer() argument
174 struct ceph_msg *m = con->out_msg; in prepare_write_message_footer()
178 dout("prepare_write_message_footer %p\n", con); in prepare_write_message_footer()
179 con_out_kvec_add(con, sizeof_footer(con), &m->footer); in prepare_write_message_footer()
180 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { in prepare_write_message_footer()
181 if (con->ops->sign_message) in prepare_write_message_footer()
182 con->ops->sign_message(m); in prepare_write_message_footer()
188 con->v1.out_more = m->more_to_follow; in prepare_write_message_footer()
189 con->v1.out_msg_done = true; in prepare_write_message_footer()
195 static void prepare_write_message(struct ceph_connection *con) in prepare_write_message() argument
200 con_out_kvec_reset(con); in prepare_write_message()
201 con->v1.out_msg_done = false; in prepare_write_message()
205 if (con->in_seq > con->in_seq_acked) { in prepare_write_message()
206 con->in_seq_acked = con->in_seq; in prepare_write_message()
207 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_message()
208 con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_message()
209 con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), in prepare_write_message()
210 &con->v1.out_temp_ack); in prepare_write_message()
213 ceph_con_get_out_msg(con); in prepare_write_message()
214 m = con->out_msg; in prepare_write_message()
217 m, con->out_seq, le16_to_cpu(m->hdr.type), in prepare_write_message()
224 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); in prepare_write_message()
225 con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr); in prepare_write_message()
226 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); in prepare_write_message()
229 con_out_kvec_add(con, m->middle->vec.iov_len, in prepare_write_message()
234 con->out_msg->hdr.crc = cpu_to_le32(crc); in prepare_write_message()
235 memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr)); in prepare_write_message()
239 con->out_msg->footer.front_crc = cpu_to_le32(crc); in prepare_write_message()
243 con->out_msg->footer.middle_crc = cpu_to_le32(crc); in prepare_write_message()
245 con->out_msg->footer.middle_crc = 0; in prepare_write_message()
247 le32_to_cpu(con->out_msg->footer.front_crc), in prepare_write_message()
248 le32_to_cpu(con->out_msg->footer.middle_crc)); in prepare_write_message()
249 con->out_msg->footer.flags = 0; in prepare_write_message()
252 con->out_msg->footer.data_crc = 0; in prepare_write_message()
254 prepare_message_data(con->out_msg, m->data_length); in prepare_write_message()
255 con->v1.out_more = 1; /* data + footer will follow */ in prepare_write_message()
258 prepare_write_message_footer(con); in prepare_write_message()
261 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in prepare_write_message()
267 static void prepare_write_ack(struct ceph_connection *con) in prepare_write_ack() argument
269 dout("prepare_write_ack %p %llu -> %llu\n", con, in prepare_write_ack()
270 con->in_seq_acked, con->in_seq); in prepare_write_ack()
271 con->in_seq_acked = con->in_seq; in prepare_write_ack()
273 con_out_kvec_reset(con); in prepare_write_ack()
275 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_ack()
277 con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_ack()
278 con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), in prepare_write_ack()
279 &con->v1.out_temp_ack); in prepare_write_ack()
281 con->v1.out_more = 1; /* more will follow.. eventually.. */ in prepare_write_ack()
282 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in prepare_write_ack()
288 static void prepare_write_seq(struct ceph_connection *con) in prepare_write_seq() argument
290 dout("prepare_write_seq %p %llu -> %llu\n", con, in prepare_write_seq()
291 con->in_seq_acked, con->in_seq); in prepare_write_seq()
292 con->in_seq_acked = con->in_seq; in prepare_write_seq()
294 con_out_kvec_reset(con); in prepare_write_seq()
296 con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_seq()
297 con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), in prepare_write_seq()
298 &con->v1.out_temp_ack); in prepare_write_seq()
300 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in prepare_write_seq()
306 static void prepare_write_keepalive(struct ceph_connection *con) in prepare_write_keepalive() argument
308 dout("prepare_write_keepalive %p\n", con); in prepare_write_keepalive()
309 con_out_kvec_reset(con); in prepare_write_keepalive()
310 if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { in prepare_write_keepalive()
314 con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); in prepare_write_keepalive()
315 ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now); in prepare_write_keepalive()
316 con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2), in prepare_write_keepalive()
317 &con->v1.out_temp_keepalive2); in prepare_write_keepalive()
319 con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); in prepare_write_keepalive()
321 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in prepare_write_keepalive()
328 static int get_connect_authorizer(struct ceph_connection *con) in get_connect_authorizer() argument
333 if (!con->ops->get_authorizer) { in get_connect_authorizer()
334 con->v1.auth = NULL; in get_connect_authorizer()
335 con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; in get_connect_authorizer()
336 con->v1.out_connect.authorizer_len = 0; in get_connect_authorizer()
340 auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry); in get_connect_authorizer()
344 con->v1.auth = auth; in get_connect_authorizer()
345 con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto); in get_connect_authorizer()
346 con->v1.out_connect.authorizer_len = in get_connect_authorizer()
354 static void prepare_write_banner(struct ceph_connection *con) in prepare_write_banner() argument
356 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); in prepare_write_banner()
357 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), in prepare_write_banner()
358 &con->msgr->my_enc_addr); in prepare_write_banner()
360 con->v1.out_more = 0; in prepare_write_banner()
361 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in prepare_write_banner()
364 static void __prepare_write_connect(struct ceph_connection *con) in __prepare_write_connect() argument
366 con_out_kvec_add(con, sizeof(con->v1.out_connect), in __prepare_write_connect()
367 &con->v1.out_connect); in __prepare_write_connect()
368 if (con->v1.auth) in __prepare_write_connect()
369 con_out_kvec_add(con, con->v1.auth->authorizer_buf_len, in __prepare_write_connect()
370 con->v1.auth->authorizer_buf); in __prepare_write_connect()
372 con->v1.out_more = 0; in __prepare_write_connect()
373 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in __prepare_write_connect()
376 static int prepare_write_connect(struct ceph_connection *con) in prepare_write_connect() argument
378 unsigned int global_seq = ceph_get_global_seq(con->msgr, 0); in prepare_write_connect()
382 switch (con->peer_name.type) { in prepare_write_connect()
396 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, in prepare_write_connect()
397 con->v1.connect_seq, global_seq, proto); in prepare_write_connect()
399 con->v1.out_connect.features = in prepare_write_connect()
400 cpu_to_le64(from_msgr(con->msgr)->supported_features); in prepare_write_connect()
401 con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); in prepare_write_connect()
402 con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq); in prepare_write_connect()
403 con->v1.out_connect.global_seq = cpu_to_le32(global_seq); in prepare_write_connect()
404 con->v1.out_connect.protocol_version = cpu_to_le32(proto); in prepare_write_connect()
405 con->v1.out_connect.flags = 0; in prepare_write_connect()
407 ret = get_connect_authorizer(con); in prepare_write_connect()
411 __prepare_write_connect(con); in prepare_write_connect()
421 static int write_partial_kvec(struct ceph_connection *con) in write_partial_kvec() argument
425 dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes); in write_partial_kvec()
426 while (con->v1.out_kvec_bytes > 0) { in write_partial_kvec()
427 ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur, in write_partial_kvec()
428 con->v1.out_kvec_left, in write_partial_kvec()
429 con->v1.out_kvec_bytes, in write_partial_kvec()
430 con->v1.out_more); in write_partial_kvec()
433 con->v1.out_kvec_bytes -= ret; in write_partial_kvec()
434 if (!con->v1.out_kvec_bytes) in write_partial_kvec()
438 while (ret >= con->v1.out_kvec_cur->iov_len) { in write_partial_kvec()
439 BUG_ON(!con->v1.out_kvec_left); in write_partial_kvec()
440 ret -= con->v1.out_kvec_cur->iov_len; in write_partial_kvec()
441 con->v1.out_kvec_cur++; in write_partial_kvec()
442 con->v1.out_kvec_left--; in write_partial_kvec()
446 con->v1.out_kvec_cur->iov_len -= ret; in write_partial_kvec()
447 con->v1.out_kvec_cur->iov_base += ret; in write_partial_kvec()
450 con->v1.out_kvec_left = 0; in write_partial_kvec()
453 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, in write_partial_kvec()
454 con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret); in write_partial_kvec()
465 static int write_partial_message_data(struct ceph_connection *con) in write_partial_message_data() argument
467 struct ceph_msg *msg = con->out_msg; in write_partial_message_data()
469 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in write_partial_message_data()
472 dout("%s %p msg %p\n", __func__, con, msg); in write_partial_message_data()
498 ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, in write_partial_message_data()
511 dout("%s %p msg %p done\n", __func__, con, msg); in write_partial_message_data()
518 con_out_kvec_reset(con); in write_partial_message_data()
519 prepare_write_message_footer(con); in write_partial_message_data()
527 static int write_partial_skip(struct ceph_connection *con) in write_partial_skip() argument
531 dout("%s %p %d left\n", __func__, con, con->v1.out_skip); in write_partial_skip()
532 while (con->v1.out_skip > 0) { in write_partial_skip()
533 size_t size = min(con->v1.out_skip, (int)PAGE_SIZE); in write_partial_skip()
535 ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size, in write_partial_skip()
539 con->v1.out_skip -= ret; in write_partial_skip()
549 static void prepare_read_banner(struct ceph_connection *con) in prepare_read_banner() argument
551 dout("prepare_read_banner %p\n", con); in prepare_read_banner()
552 con->v1.in_base_pos = 0; in prepare_read_banner()
555 static void prepare_read_connect(struct ceph_connection *con) in prepare_read_connect() argument
557 dout("prepare_read_connect %p\n", con); in prepare_read_connect()
558 con->v1.in_base_pos = 0; in prepare_read_connect()
561 static void prepare_read_ack(struct ceph_connection *con) in prepare_read_ack() argument
563 dout("prepare_read_ack %p\n", con); in prepare_read_ack()
564 con->v1.in_base_pos = 0; in prepare_read_ack()
567 static void prepare_read_seq(struct ceph_connection *con) in prepare_read_seq() argument
569 dout("prepare_read_seq %p\n", con); in prepare_read_seq()
570 con->v1.in_base_pos = 0; in prepare_read_seq()
571 con->v1.in_tag = CEPH_MSGR_TAG_SEQ; in prepare_read_seq()
574 static void prepare_read_tag(struct ceph_connection *con) in prepare_read_tag() argument
576 dout("prepare_read_tag %p\n", con); in prepare_read_tag()
577 con->v1.in_base_pos = 0; in prepare_read_tag()
578 con->v1.in_tag = CEPH_MSGR_TAG_READY; in prepare_read_tag()
581 static void prepare_read_keepalive_ack(struct ceph_connection *con) in prepare_read_keepalive_ack() argument
583 dout("prepare_read_keepalive_ack %p\n", con); in prepare_read_keepalive_ack()
584 con->v1.in_base_pos = 0; in prepare_read_keepalive_ack()
590 static int prepare_read_message(struct ceph_connection *con) in prepare_read_message() argument
592 dout("prepare_read_message %p\n", con); in prepare_read_message()
593 BUG_ON(con->in_msg != NULL); in prepare_read_message()
594 con->v1.in_base_pos = 0; in prepare_read_message()
595 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; in prepare_read_message()
599 static int read_partial(struct ceph_connection *con, in read_partial() argument
602 while (con->v1.in_base_pos < end) { in read_partial()
603 int left = end - con->v1.in_base_pos; in read_partial()
605 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); in read_partial()
608 con->v1.in_base_pos += ret; in read_partial()
616 static int read_partial_banner(struct ceph_connection *con) in read_partial_banner() argument
622 dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos); in read_partial_banner()
627 ret = read_partial(con, end, size, con->v1.in_banner); in read_partial_banner()
631 size = sizeof(con->v1.actual_peer_addr); in read_partial_banner()
633 ret = read_partial(con, end, size, &con->v1.actual_peer_addr); in read_partial_banner()
636 ceph_decode_banner_addr(&con->v1.actual_peer_addr); in read_partial_banner()
638 size = sizeof(con->v1.peer_addr_for_me); in read_partial_banner()
640 ret = read_partial(con, end, size, &con->v1.peer_addr_for_me); in read_partial_banner()
643 ceph_decode_banner_addr(&con->v1.peer_addr_for_me); in read_partial_banner()
649 static int read_partial_connect(struct ceph_connection *con) in read_partial_connect() argument
655 dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos); in read_partial_connect()
657 size = sizeof(con->v1.in_reply); in read_partial_connect()
659 ret = read_partial(con, end, size, &con->v1.in_reply); in read_partial_connect()
663 if (con->v1.auth) { in read_partial_connect()
664 size = le32_to_cpu(con->v1.in_reply.authorizer_len); in read_partial_connect()
665 if (size > con->v1.auth->authorizer_reply_buf_len) { in read_partial_connect()
667 con->v1.auth->authorizer_reply_buf_len); in read_partial_connect()
673 ret = read_partial(con, end, size, in read_partial_connect()
674 con->v1.auth->authorizer_reply_buf); in read_partial_connect()
680 con, con->v1.in_reply.tag, in read_partial_connect()
681 le32_to_cpu(con->v1.in_reply.connect_seq), in read_partial_connect()
682 le32_to_cpu(con->v1.in_reply.global_seq)); in read_partial_connect()
690 static int verify_hello(struct ceph_connection *con) in verify_hello() argument
692 if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { in verify_hello()
694 ceph_pr_addr(&con->peer_addr)); in verify_hello()
695 con->error_msg = "protocol error, bad banner"; in verify_hello()
701 static int process_banner(struct ceph_connection *con) in process_banner() argument
703 struct ceph_entity_addr *my_addr = &con->msgr->inst.addr; in process_banner()
705 dout("process_banner on %p\n", con); in process_banner()
707 if (verify_hello(con) < 0) in process_banner()
715 if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr, in process_banner()
716 sizeof(con->peer_addr)) != 0 && in process_banner()
717 !(ceph_addr_is_blank(&con->v1.actual_peer_addr) && in process_banner()
718 con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) { in process_banner()
720 ceph_pr_addr(&con->peer_addr), in process_banner()
721 le32_to_cpu(con->peer_addr.nonce), in process_banner()
722 ceph_pr_addr(&con->v1.actual_peer_addr), in process_banner()
723 le32_to_cpu(con->v1.actual_peer_addr.nonce)); in process_banner()
724 con->error_msg = "wrong peer at address"; in process_banner()
733 &con->v1.peer_addr_for_me.in_addr, in process_banner()
734 sizeof(con->v1.peer_addr_for_me.in_addr)); in process_banner()
736 ceph_encode_my_addr(con->msgr); in process_banner()
744 static int process_connect(struct ceph_connection *con) in process_connect() argument
746 u64 sup_feat = from_msgr(con->msgr)->supported_features; in process_connect()
747 u64 req_feat = from_msgr(con->msgr)->required_features; in process_connect()
748 u64 server_feat = le64_to_cpu(con->v1.in_reply.features); in process_connect()
751 dout("process_connect on %p tag %d\n", con, con->v1.in_tag); in process_connect()
753 if (con->v1.auth) { in process_connect()
754 int len = le32_to_cpu(con->v1.in_reply.authorizer_len); in process_connect()
763 if (con->v1.in_reply.tag == in process_connect()
765 ret = con->ops->add_authorizer_challenge( in process_connect()
766 con, con->v1.auth->authorizer_reply_buf, len); in process_connect()
770 con_out_kvec_reset(con); in process_connect()
771 __prepare_write_connect(con); in process_connect()
772 prepare_read_connect(con); in process_connect()
777 ret = con->ops->verify_authorizer_reply(con); in process_connect()
779 con->error_msg = "bad authorize reply"; in process_connect()
785 switch (con->v1.in_reply.tag) { in process_connect()
789 ENTITY_NAME(con->peer_name), in process_connect()
790 ceph_pr_addr(&con->peer_addr), in process_connect()
792 con->error_msg = "missing required protocol features"; in process_connect()
798 ENTITY_NAME(con->peer_name), in process_connect()
799 ceph_pr_addr(&con->peer_addr), in process_connect()
800 le32_to_cpu(con->v1.out_connect.protocol_version), in process_connect()
801 le32_to_cpu(con->v1.in_reply.protocol_version)); in process_connect()
802 con->error_msg = "protocol version mismatch"; in process_connect()
806 con->v1.auth_retry++; in process_connect()
807 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, in process_connect()
808 con->v1.auth_retry); in process_connect()
809 if (con->v1.auth_retry == 2) { in process_connect()
810 con->error_msg = "connect authorization failure"; in process_connect()
813 con_out_kvec_reset(con); in process_connect()
814 ret = prepare_write_connect(con); in process_connect()
817 prepare_read_connect(con); in process_connect()
829 le32_to_cpu(con->v1.in_reply.connect_seq)); in process_connect()
831 ENTITY_NAME(con->peer_name), in process_connect()
832 ceph_pr_addr(&con->peer_addr)); in process_connect()
833 ceph_con_reset_session(con); in process_connect()
834 con_out_kvec_reset(con); in process_connect()
835 ret = prepare_write_connect(con); in process_connect()
838 prepare_read_connect(con); in process_connect()
841 mutex_unlock(&con->mutex); in process_connect()
842 if (con->ops->peer_reset) in process_connect()
843 con->ops->peer_reset(con); in process_connect()
844 mutex_lock(&con->mutex); in process_connect()
845 if (con->state != CEPH_CON_S_V1_CONNECT_MSG) in process_connect()
855 le32_to_cpu(con->v1.out_connect.connect_seq), in process_connect()
856 le32_to_cpu(con->v1.in_reply.connect_seq)); in process_connect()
857 con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq); in process_connect()
858 con_out_kvec_reset(con); in process_connect()
859 ret = prepare_write_connect(con); in process_connect()
862 prepare_read_connect(con); in process_connect()
871 con->v1.peer_global_seq, in process_connect()
872 le32_to_cpu(con->v1.in_reply.global_seq)); in process_connect()
873 ceph_get_global_seq(con->msgr, in process_connect()
874 le32_to_cpu(con->v1.in_reply.global_seq)); in process_connect()
875 con_out_kvec_reset(con); in process_connect()
876 ret = prepare_write_connect(con); in process_connect()
879 prepare_read_connect(con); in process_connect()
887 ENTITY_NAME(con->peer_name), in process_connect()
888 ceph_pr_addr(&con->peer_addr), in process_connect()
890 con->error_msg = "missing required protocol features"; in process_connect()
894 WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG); in process_connect()
895 con->state = CEPH_CON_S_OPEN; in process_connect()
896 con->v1.auth_retry = 0; /* we authenticated; clear flag */ in process_connect()
897 con->v1.peer_global_seq = in process_connect()
898 le32_to_cpu(con->v1.in_reply.global_seq); in process_connect()
899 con->v1.connect_seq++; in process_connect()
900 con->peer_features = server_feat; in process_connect()
902 con->v1.peer_global_seq, in process_connect()
903 le32_to_cpu(con->v1.in_reply.connect_seq), in process_connect()
904 con->v1.connect_seq); in process_connect()
905 WARN_ON(con->v1.connect_seq != in process_connect()
906 le32_to_cpu(con->v1.in_reply.connect_seq)); in process_connect()
908 if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY) in process_connect()
909 ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); in process_connect()
911 con->delay = 0; /* reset backoff memory */ in process_connect()
913 if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) { in process_connect()
914 prepare_write_seq(con); in process_connect()
915 prepare_read_seq(con); in process_connect()
917 prepare_read_tag(con); in process_connect()
928 con->error_msg = "protocol error, got WAIT as client"; in process_connect()
932 con->error_msg = "protocol error, garbage tag during connect"; in process_connect()
941 static int read_partial_ack(struct ceph_connection *con) in read_partial_ack() argument
943 int size = sizeof(con->v1.in_temp_ack); in read_partial_ack()
946 return read_partial(con, end, size, &con->v1.in_temp_ack); in read_partial_ack()
952 static void process_ack(struct ceph_connection *con) in process_ack() argument
954 u64 ack = le64_to_cpu(con->v1.in_temp_ack); in process_ack()
956 if (con->v1.in_tag == CEPH_MSGR_TAG_ACK) in process_ack()
957 ceph_con_discard_sent(con, ack); in process_ack()
959 ceph_con_discard_requeued(con, ack); in process_ack()
961 prepare_read_tag(con); in process_ack()
964 static int read_partial_message_chunk(struct ceph_connection *con, in read_partial_message_chunk() argument
975 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + in read_partial_message_chunk()
987 static inline int read_partial_message_section(struct ceph_connection *con, in read_partial_message_section() argument
992 return read_partial_message_chunk(con, section, sec_len, crc); in read_partial_message_section()
995 static int read_partial_sparse_msg_extent(struct ceph_connection *con, u32 *crc) in read_partial_sparse_msg_extent() argument
997 struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; in read_partial_sparse_msg_extent()
998 bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE); in read_partial_sparse_msg_extent()
1000 if (do_bounce && unlikely(!con->bounce_page)) { in read_partial_sparse_msg_extent()
1001 con->bounce_page = alloc_page(GFP_NOIO); in read_partial_sparse_msg_extent()
1002 if (!con->bounce_page) { in read_partial_sparse_msg_extent()
1014 rpage = do_bounce ? con->bounce_page : page; in read_partial_sparse_msg_extent()
1018 ret = ceph_tcp_recvpage(con->sock, rpage, (int)off, len); in read_partial_sparse_msg_extent()
1030 static int read_partial_sparse_msg_data(struct ceph_connection *con) in read_partial_sparse_msg_data() argument
1032 struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; in read_partial_sparse_msg_data()
1033 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_sparse_msg_data()
1038 crc = con->in_data_crc; in read_partial_sparse_msg_data()
1041 if (con->v1.in_sr_kvec.iov_base) in read_partial_sparse_msg_data()
1042 ret = read_partial_message_chunk(con, in read_partial_sparse_msg_data()
1043 &con->v1.in_sr_kvec, in read_partial_sparse_msg_data()
1044 con->v1.in_sr_len, in read_partial_sparse_msg_data()
1047 ret = read_partial_sparse_msg_extent(con, &crc); in read_partial_sparse_msg_data()
1051 memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec)); in read_partial_sparse_msg_data()
1052 ret = con->ops->sparse_read(con, cursor, in read_partial_sparse_msg_data()
1053 (char **)&con->v1.in_sr_kvec.iov_base); in read_partial_sparse_msg_data()
1058 con->v1.in_sr_len = ret; in read_partial_sparse_msg_data()
1062 con->in_data_crc = crc; in read_partial_sparse_msg_data()
1067 static int read_partial_msg_data(struct ceph_connection *con) in read_partial_msg_data() argument
1069 struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; in read_partial_msg_data()
1070 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_msg_data()
1078 crc = con->in_data_crc; in read_partial_msg_data()
1086 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); in read_partial_msg_data()
1089 con->in_data_crc = crc; in read_partial_msg_data()
1099 con->in_data_crc = crc; in read_partial_msg_data()
1104 static int read_partial_msg_data_bounce(struct ceph_connection *con) in read_partial_msg_data_bounce() argument
1106 struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; in read_partial_msg_data_bounce()
1112 if (unlikely(!con->bounce_page)) { in read_partial_msg_data_bounce()
1113 con->bounce_page = alloc_page(GFP_NOIO); in read_partial_msg_data_bounce()
1114 if (!con->bounce_page) { in read_partial_msg_data_bounce()
1120 crc = con->in_data_crc; in read_partial_msg_data_bounce()
1128 ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len); in read_partial_msg_data_bounce()
1130 con->in_data_crc = crc; in read_partial_msg_data_bounce()
1134 crc = crc32c(crc, page_address(con->bounce_page), ret); in read_partial_msg_data_bounce()
1135 memcpy_to_page(page, off, page_address(con->bounce_page), ret); in read_partial_msg_data_bounce()
1139 con->in_data_crc = crc; in read_partial_msg_data_bounce()
1147 static int read_partial_message(struct ceph_connection *con) in read_partial_message() argument
1149 struct ceph_msg *m = con->in_msg; in read_partial_message()
1154 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_message()
1155 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); in read_partial_message()
1159 dout("read_partial_message con %p msg %p\n", con, m); in read_partial_message()
1162 size = sizeof(con->v1.in_hdr); in read_partial_message()
1164 ret = read_partial(con, end, size, &con->v1.in_hdr); in read_partial_message()
1168 crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc)); in read_partial_message()
1169 if (cpu_to_le32(crc) != con->v1.in_hdr.crc) { in read_partial_message()
1171 crc, con->v1.in_hdr.crc); in read_partial_message()
1175 front_len = le32_to_cpu(con->v1.in_hdr.front_len); in read_partial_message()
1178 middle_len = le32_to_cpu(con->v1.in_hdr.middle_len); in read_partial_message()
1181 data_len = le32_to_cpu(con->v1.in_hdr.data_len); in read_partial_message()
1186 seq = le64_to_cpu(con->v1.in_hdr.seq); in read_partial_message()
1187 if ((s64)seq - (s64)con->in_seq < 1) { in read_partial_message()
1189 ENTITY_NAME(con->peer_name), in read_partial_message()
1190 ceph_pr_addr(&con->peer_addr), in read_partial_message()
1191 seq, con->in_seq + 1); in read_partial_message()
1192 con->v1.in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
1193 sizeof_footer(con); in read_partial_message()
1194 con->v1.in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
1196 } else if ((s64)seq - (s64)con->in_seq > 1) { in read_partial_message()
1198 seq, con->in_seq + 1); in read_partial_message()
1199 con->error_msg = "bad message sequence # for incoming message"; in read_partial_message()
1204 if (!con->in_msg) { in read_partial_message()
1207 dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type, in read_partial_message()
1209 ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip); in read_partial_message()
1213 BUG_ON((!con->in_msg) ^ skip); in read_partial_message()
1217 con->v1.in_base_pos = -front_len - middle_len - in read_partial_message()
1218 data_len - sizeof_footer(con); in read_partial_message()
1219 con->v1.in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
1220 con->in_seq++; in read_partial_message()
1224 BUG_ON(!con->in_msg); in read_partial_message()
1225 BUG_ON(con->in_msg->con != con); in read_partial_message()
1226 m = con->in_msg; in read_partial_message()
1234 prepare_message_data(con->in_msg, data_len); in read_partial_message()
1238 ret = read_partial_message_section(con, &m->front, front_len, in read_partial_message()
1239 &con->in_front_crc); in read_partial_message()
1245 ret = read_partial_message_section(con, &m->middle->vec, in read_partial_message()
1247 &con->in_middle_crc); in read_partial_message()
1258 ret = read_partial_sparse_msg_data(con); in read_partial_message()
1259 else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) in read_partial_message()
1260 ret = read_partial_msg_data_bounce(con); in read_partial_message()
1262 ret = read_partial_msg_data(con); in read_partial_message()
1268 size = sizeof_footer(con); in read_partial_message()
1270 ret = read_partial(con, end, size, &m->footer); in read_partial_message()
1284 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { in read_partial_message()
1286 m, con->in_front_crc, m->footer.front_crc); in read_partial_message()
1289 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { in read_partial_message()
1291 m, con->in_middle_crc, m->footer.middle_crc); in read_partial_message()
1296 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { in read_partial_message()
1298 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); in read_partial_message()
1302 if (need_sign && con->ops->check_message_signature && in read_partial_message()
1303 con->ops->check_message_signature(m)) { in read_partial_message()
1311 static int read_keepalive_ack(struct ceph_connection *con) in read_keepalive_ack() argument
1315 int ret = read_partial(con, size, size, &ceph_ts); in read_keepalive_ack()
1318 ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); in read_keepalive_ack()
1319 prepare_read_tag(con); in read_keepalive_ack()
1326 int ceph_con_v1_try_read(struct ceph_connection *con) in ceph_con_v1_try_read() argument
1331 dout("try_read start %p state %d\n", con, con->state); in ceph_con_v1_try_read()
1332 if (con->state != CEPH_CON_S_V1_BANNER && in ceph_con_v1_try_read()
1333 con->state != CEPH_CON_S_V1_CONNECT_MSG && in ceph_con_v1_try_read()
1334 con->state != CEPH_CON_S_OPEN) in ceph_con_v1_try_read()
1337 BUG_ON(!con->sock); in ceph_con_v1_try_read()
1339 dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag, in ceph_con_v1_try_read()
1340 con->v1.in_base_pos); in ceph_con_v1_try_read()
1342 if (con->state == CEPH_CON_S_V1_BANNER) { in ceph_con_v1_try_read()
1343 ret = read_partial_banner(con); in ceph_con_v1_try_read()
1346 ret = process_banner(con); in ceph_con_v1_try_read()
1350 con->state = CEPH_CON_S_V1_CONNECT_MSG; in ceph_con_v1_try_read()
1357 ret = prepare_write_connect(con); in ceph_con_v1_try_read()
1360 prepare_read_connect(con); in ceph_con_v1_try_read()
1366 if (con->state == CEPH_CON_S_V1_CONNECT_MSG) { in ceph_con_v1_try_read()
1367 ret = read_partial_connect(con); in ceph_con_v1_try_read()
1370 ret = process_connect(con); in ceph_con_v1_try_read()
1376 WARN_ON(con->state != CEPH_CON_S_OPEN); in ceph_con_v1_try_read()
1378 if (con->v1.in_base_pos < 0) { in ceph_con_v1_try_read()
1382 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos); in ceph_con_v1_try_read()
1385 dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos); in ceph_con_v1_try_read()
1386 con->v1.in_base_pos += ret; in ceph_con_v1_try_read()
1387 if (con->v1.in_base_pos) in ceph_con_v1_try_read()
1390 if (con->v1.in_tag == CEPH_MSGR_TAG_READY) { in ceph_con_v1_try_read()
1394 ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1); in ceph_con_v1_try_read()
1397 dout("try_read got tag %d\n", con->v1.in_tag); in ceph_con_v1_try_read()
1398 switch (con->v1.in_tag) { in ceph_con_v1_try_read()
1400 prepare_read_message(con); in ceph_con_v1_try_read()
1403 prepare_read_ack(con); in ceph_con_v1_try_read()
1406 prepare_read_keepalive_ack(con); in ceph_con_v1_try_read()
1409 ceph_con_close_socket(con); in ceph_con_v1_try_read()
1410 con->state = CEPH_CON_S_CLOSED; in ceph_con_v1_try_read()
1416 if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) { in ceph_con_v1_try_read()
1417 ret = read_partial_message(con); in ceph_con_v1_try_read()
1421 con->error_msg = "bad crc/signature"; in ceph_con_v1_try_read()
1427 con->error_msg = "io error"; in ceph_con_v1_try_read()
1432 if (con->v1.in_tag == CEPH_MSGR_TAG_READY) in ceph_con_v1_try_read()
1434 ceph_con_process_message(con); in ceph_con_v1_try_read()
1435 if (con->state == CEPH_CON_S_OPEN) in ceph_con_v1_try_read()
1436 prepare_read_tag(con); in ceph_con_v1_try_read()
1439 if (con->v1.in_tag == CEPH_MSGR_TAG_ACK || in ceph_con_v1_try_read()
1440 con->v1.in_tag == CEPH_MSGR_TAG_SEQ) { in ceph_con_v1_try_read()
1445 ret = read_partial_ack(con); in ceph_con_v1_try_read()
1448 process_ack(con); in ceph_con_v1_try_read()
1451 if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { in ceph_con_v1_try_read()
1452 ret = read_keepalive_ack(con); in ceph_con_v1_try_read()
1459 dout("try_read done on %p ret %d\n", con, ret); in ceph_con_v1_try_read()
1463 pr_err("try_read bad tag %d\n", con->v1.in_tag); in ceph_con_v1_try_read()
1464 con->error_msg = "protocol error, garbage tag"; in ceph_con_v1_try_read()
1473 int ceph_con_v1_try_write(struct ceph_connection *con) in ceph_con_v1_try_write() argument
1477 dout("try_write start %p state %d\n", con, con->state); in ceph_con_v1_try_write()
1478 if (con->state != CEPH_CON_S_PREOPEN && in ceph_con_v1_try_write()
1479 con->state != CEPH_CON_S_V1_BANNER && in ceph_con_v1_try_write()
1480 con->state != CEPH_CON_S_V1_CONNECT_MSG && in ceph_con_v1_try_write()
1481 con->state != CEPH_CON_S_OPEN) in ceph_con_v1_try_write()
1485 if (con->state == CEPH_CON_S_PREOPEN) { in ceph_con_v1_try_write()
1486 BUG_ON(con->sock); in ceph_con_v1_try_write()
1487 con->state = CEPH_CON_S_V1_BANNER; in ceph_con_v1_try_write()
1489 con_out_kvec_reset(con); in ceph_con_v1_try_write()
1490 prepare_write_banner(con); in ceph_con_v1_try_write()
1491 prepare_read_banner(con); in ceph_con_v1_try_write()
1493 BUG_ON(con->in_msg); in ceph_con_v1_try_write()
1494 con->v1.in_tag = CEPH_MSGR_TAG_READY; in ceph_con_v1_try_write()
1496 con, con->state); in ceph_con_v1_try_write()
1497 ret = ceph_tcp_connect(con); in ceph_con_v1_try_write()
1499 con->error_msg = "connect error"; in ceph_con_v1_try_write()
1505 dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes); in ceph_con_v1_try_write()
1506 BUG_ON(!con->sock); in ceph_con_v1_try_write()
1509 if (con->v1.out_kvec_left) { in ceph_con_v1_try_write()
1510 ret = write_partial_kvec(con); in ceph_con_v1_try_write()
1514 if (con->v1.out_skip) { in ceph_con_v1_try_write()
1515 ret = write_partial_skip(con); in ceph_con_v1_try_write()
1521 if (con->out_msg) { in ceph_con_v1_try_write()
1522 if (con->v1.out_msg_done) { in ceph_con_v1_try_write()
1523 ceph_msg_put(con->out_msg); in ceph_con_v1_try_write()
1524 con->out_msg = NULL; /* we're done with this one */ in ceph_con_v1_try_write()
1528 ret = write_partial_message_data(con); in ceph_con_v1_try_write()
1541 if (con->state == CEPH_CON_S_OPEN) { in ceph_con_v1_try_write()
1542 if (ceph_con_flag_test_and_clear(con, in ceph_con_v1_try_write()
1544 prepare_write_keepalive(con); in ceph_con_v1_try_write()
1548 if (!list_empty(&con->out_queue)) { in ceph_con_v1_try_write()
1549 prepare_write_message(con); in ceph_con_v1_try_write()
1552 if (con->in_seq > con->in_seq_acked) { in ceph_con_v1_try_write()
1553 prepare_write_ack(con); in ceph_con_v1_try_write()
1559 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); in ceph_con_v1_try_write()
1563 dout("try_write done on %p ret %d\n", con, ret); in ceph_con_v1_try_write()
1567 void ceph_con_v1_revoke(struct ceph_connection *con) in ceph_con_v1_revoke() argument
1569 struct ceph_msg *msg = con->out_msg; in ceph_con_v1_revoke()
1571 WARN_ON(con->v1.out_skip); in ceph_con_v1_revoke()
1573 if (con->v1.out_msg_done) { in ceph_con_v1_revoke()
1574 con->v1.out_skip += con_out_kvec_skip(con); in ceph_con_v1_revoke()
1577 con->v1.out_skip += sizeof_footer(con); in ceph_con_v1_revoke()
1581 con->v1.out_skip += msg->cursor.total_resid; in ceph_con_v1_revoke()
1583 con->v1.out_skip += con_out_kvec_skip(con); in ceph_con_v1_revoke()
1584 con->v1.out_skip += con_out_kvec_skip(con); in ceph_con_v1_revoke()
1586 dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con, in ceph_con_v1_revoke()
1587 con->v1.out_kvec_bytes, con->v1.out_skip); in ceph_con_v1_revoke()
1590 void ceph_con_v1_revoke_incoming(struct ceph_connection *con) in ceph_con_v1_revoke_incoming() argument
1592 unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len); in ceph_con_v1_revoke_incoming()
1593 unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len); in ceph_con_v1_revoke_incoming()
1594 unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len); in ceph_con_v1_revoke_incoming()
1597 con->v1.in_base_pos = con->v1.in_base_pos - in ceph_con_v1_revoke_incoming()
1604 con->v1.in_tag = CEPH_MSGR_TAG_READY; in ceph_con_v1_revoke_incoming()
1605 con->in_seq++; in ceph_con_v1_revoke_incoming()
1607 dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos); in ceph_con_v1_revoke_incoming()
1610 bool ceph_con_v1_opened(struct ceph_connection *con) in ceph_con_v1_opened() argument
1612 return con->v1.connect_seq; in ceph_con_v1_opened()
1615 void ceph_con_v1_reset_session(struct ceph_connection *con) in ceph_con_v1_reset_session() argument
1617 con->v1.connect_seq = 0; in ceph_con_v1_reset_session()
1618 con->v1.peer_global_seq = 0; in ceph_con_v1_reset_session()
1621 void ceph_con_v1_reset_protocol(struct ceph_connection *con) in ceph_con_v1_reset_protocol() argument
1623 con->v1.out_skip = 0; in ceph_con_v1_reset_protocol()