653
657
* Connection negotiation.
656
static int prepare_connect_authorizer(struct ceph_connection *con)
660
static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con,
660
int auth_protocol = 0;
663
struct ceph_auth_handshake *auth;
665
if (!con->ops->get_authorizer) {
666
con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
667
con->out_connect.authorizer_len = 0;
672
/* Can't hold the mutex while getting authorizer */
662
674
mutex_unlock(&con->mutex);
663
if (con->ops->get_authorizer)
664
con->ops->get_authorizer(con, &auth_buf, &auth_len,
665
&auth_protocol, &con->auth_reply_buf,
666
&con->auth_reply_buf_len,
676
auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
668
678
mutex_lock(&con->mutex);
670
if (test_bit(CLOSED, &con->state) ||
671
test_bit(OPENING, &con->state))
674
con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
675
con->out_connect.authorizer_len = cpu_to_le32(auth_len);
678
ceph_con_out_kvec_add(con, auth_len, auth_buf);
682
if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state))
683
return ERR_PTR(-EAGAIN);
685
con->auth_reply_buf = auth->authorizer_reply_buf;
686
con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
684
693
* We connected to a peer and are saying hello.
686
static void prepare_write_banner(struct ceph_messenger *msgr,
687
struct ceph_connection *con)
695
static void prepare_write_banner(struct ceph_connection *con)
689
ceph_con_out_kvec_reset(con);
690
697
ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
691
ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr),
698
ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
699
&con->msgr->my_enc_addr);
694
701
con->out_more = 0;
695
702
set_bit(WRITE_PENDING, &con->state);
698
static int prepare_write_connect(struct ceph_messenger *msgr,
699
struct ceph_connection *con,
705
static int prepare_write_connect(struct ceph_connection *con)
702
unsigned global_seq = get_global_seq(con->msgr, 0);
707
unsigned int global_seq = get_global_seq(con->msgr, 0);
710
struct ceph_auth_handshake *auth;
705
712
switch (con->peer_name.type) {
706
713
case CEPH_ENTITY_TYPE_MON:
719
726
dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
720
727
con->connect_seq, global_seq, proto);
722
con->out_connect.features = cpu_to_le64(msgr->supported_features);
729
con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
723
730
con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
724
731
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
725
732
con->out_connect.global_seq = cpu_to_le32(global_seq);
726
733
con->out_connect.protocol_version = cpu_to_le32(proto);
727
734
con->out_connect.flags = 0;
730
prepare_write_banner(msgr, con);
732
ceph_con_out_kvec_reset(con);
733
ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect);
736
auth_proto = CEPH_AUTH_UNKNOWN;
737
auth = get_connect_authorizer(con, &auth_proto);
739
return PTR_ERR(auth);
741
con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
742
con->out_connect.authorizer_len = auth ?
743
cpu_to_le32(auth->authorizer_buf_len) : 0;
745
ceph_con_out_kvec_add(con, sizeof (con->out_connect),
747
if (auth && auth->authorizer_buf_len)
748
ceph_con_out_kvec_add(con, auth->authorizer_buf_len,
749
auth->authorizer_buf);
735
751
con->out_more = 0;
736
752
set_bit(WRITE_PENDING, &con->state);
738
return prepare_connect_authorizer(con);
994
1010
static int read_partial(struct ceph_connection *con,
995
int *to, int size, void *object)
1011
int end, int size, void *object)
998
while (con->in_base_pos < *to) {
999
int left = *to - con->in_base_pos;
1013
while (con->in_base_pos < end) {
1014
int left = end - con->in_base_pos;
1000
1015
int have = size - left;
1001
1016
int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
1013
1028
static int read_partial_banner(struct ceph_connection *con)
1017
1034
dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
1019
1036
/* peer's banner */
1020
ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
1023
ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
1024
&con->actual_peer_addr);
1027
ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
1028
&con->peer_addr_for_me);
1037
size = strlen(CEPH_BANNER);
1039
ret = read_partial(con, end, size, con->in_banner);
1043
size = sizeof (con->actual_peer_addr);
1045
ret = read_partial(con, end, size, &con->actual_peer_addr);
1049
size = sizeof (con->peer_addr_for_me);
1051
ret = read_partial(con, end, size, &con->peer_addr_for_me);
1035
1059
static int read_partial_connect(struct ceph_connection *con)
1039
1065
dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
1041
ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
1067
size = sizeof (con->in_reply);
1069
ret = read_partial(con, end, size, &con->in_reply);
1044
ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len),
1045
con->auth_reply_buf);
1073
size = le32_to_cpu(con->in_reply.authorizer_len);
1075
ret = read_partial(con, end, size, con->auth_reply_buf);
1392
1423
* dropped messages.
1394
1425
dout("process_connect got RESET peer seq %u\n",
1395
le32_to_cpu(con->in_connect.connect_seq));
1426
le32_to_cpu(con->in_reply.connect_seq));
1396
1427
pr_err("%s%lld %s connection reset\n",
1397
1428
ENTITY_NAME(con->peer_name),
1398
1429
ceph_pr_addr(&con->peer_addr.in_addr));
1399
1430
reset_connection(con);
1400
prepare_write_connect(con->msgr, con, 0);
1431
ceph_con_out_kvec_reset(con);
1432
ret = prepare_write_connect(con);
1401
1435
prepare_read_connect(con);
1403
1437
/* Tell ceph about it. */
1416
1450
* If we sent a smaller connect_seq than the peer has, try
1417
1451
* again with a larger value.
1419
dout("process_connect got RETRY my seq = %u, peer_seq = %u\n",
1453
dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
1420
1454
le32_to_cpu(con->out_connect.connect_seq),
1421
le32_to_cpu(con->in_connect.connect_seq));
1422
con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
1423
prepare_write_connect(con->msgr, con, 0);
1455
le32_to_cpu(con->in_reply.connect_seq));
1456
con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
1457
ceph_con_out_kvec_reset(con);
1458
ret = prepare_write_connect(con);
1424
1461
prepare_read_connect(con);
1432
1469
dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
1433
1470
con->peer_global_seq,
1434
le32_to_cpu(con->in_connect.global_seq));
1471
le32_to_cpu(con->in_reply.global_seq));
1435
1472
get_global_seq(con->msgr,
1436
le32_to_cpu(con->in_connect.global_seq));
1437
prepare_write_connect(con->msgr, con, 0);
1473
le32_to_cpu(con->in_reply.global_seq));
1474
ceph_con_out_kvec_reset(con);
1475
ret = prepare_write_connect(con);
1438
1478
prepare_read_connect(con);
1587
1627
#ifdef CONFIG_BLOCK
1588
1628
static int read_partial_message_bio(struct ceph_connection *con,
1589
1629
struct bio **bio_iter, int *bio_seg,
1590
unsigned data_len, bool do_datacrc)
1630
unsigned int data_len, bool do_datacrc)
1592
1632
struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
1638
1679
dout("read_partial_message con %p msg %p\n", con, m);
1641
while (con->in_base_pos < sizeof(con->in_hdr)) {
1642
left = sizeof(con->in_hdr) - con->in_base_pos;
1643
ret = ceph_tcp_recvmsg(con->sock,
1644
(char *)&con->in_hdr + con->in_base_pos,
1648
con->in_base_pos += ret;
1682
size = sizeof (con->in_hdr);
1684
ret = read_partial(con, end, size, &con->in_hdr);
1651
1688
crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
1652
1689
if (cpu_to_le32(crc) != con->in_hdr.crc) {
1762
to = sizeof(m->hdr) + sizeof(m->footer);
1763
while (con->in_base_pos < to) {
1764
left = to - con->in_base_pos;
1765
ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
1766
(con->in_base_pos - sizeof(m->hdr)),
1770
con->in_base_pos += ret;
1799
size = sizeof (m->footer);
1801
ret = read_partial(con, end, size, &m->footer);
1772
1805
dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1773
1806
m, front_len, m->footer.front_crc, middle_len,
1774
1807
m->footer.middle_crc, data_len, m->footer.data_crc);
1847
1879
/* open the socket first? */
1848
1880
if (con->sock == NULL) {
1849
prepare_write_connect(msgr, con, 1);
1881
ceph_con_out_kvec_reset(con);
1882
prepare_write_banner(con);
1883
ret = prepare_write_connect(con);
1850
1886
prepare_read_banner(con);
1851
1887
set_bit(CONNECTING, &con->state);
1852
1888
clear_bit(NEGOTIATING, &con->state);
2346
2382
mutex_lock(&con->mutex);
2347
2383
if (con->in_msg && con->in_msg == msg) {
2348
unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
2349
unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
2350
unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
2384
unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
2385
unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
2386
unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
2352
2388
/* skip rest of message */
2353
2389
dout("con_revoke_pages %p msg %p revoked\n", con, msg);