// SPDX-License-Identifier: GPL-2.0

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/striper.h>

#define OSD_OPREPLY_FRONT_LEN	512

static struct kmem_cache	*ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void link_linger(struct ceph_osd *osd,
			struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
			  struct ceph_osd_linger_request *lreq);
static void clear_backoffs(struct ceph_osd *osd);

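/*
 * Lock-state assertions.  The #if 1 below enables them; flipping to the
 * #else branch compiles them out entirely.
 */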
#if 1
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
{
	bool wrlocked = true;

	if (unlikely(down_read_trylock(sem))) {
		wrlocked = false;
		up_read(sem);
	}

	return wrlocked;
}
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_locked(&osdc->lock));
}
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_osd_locked(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	WARN_ON(!(mutex_is_locked(&osd->lock) &&
		  rwsem_is_locked(&osdc->lock)) &&
		!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
{
	WARN_ON(!mutex_is_locked(&lreq->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
#endif

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
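/*
 * For example (a sketch, assuming a simple layout with 4 MiB objects and
 * stripe_count 1): a 1 MiB extent at file offset 7 MiB maps to objnum 1,
 * objoff 3 MiB; a 2 MiB extent at the same offset crosses the object
 * boundary, so *plen is shortened to 1 MiB.
 */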
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
			u64 *objnum, u64 *objoff, u64 *objlen)
{
	u64 orig_len = *plen;
	u32 xlen;

	/* object extent? */
	ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
					  objoff, &xlen);
	*objlen = xlen;
	if (*objlen < orig_len) {
		*plen = *objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
	return 0;
}

static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
	memset(osd_data, 0, sizeof (*osd_data));
	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

/*
 * Consumes @pages if @own_pages is true.
 */
static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
			struct page **pages, u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
	osd_data->pages = pages;
	osd_data->length = length;
	osd_data->alignment = alignment;
	osd_data->pages_from_pool = pages_from_pool;
	osd_data->own_pages = own_pages;
}

/*
 * Consumes a ref on @pagelist.
 */
static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
			struct ceph_pagelist *pagelist)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
	osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
				   struct ceph_bio_iter *bio_pos,
				   u32 bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio_pos = *bio_pos;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
				     struct ceph_bvec_iter *bvec_pos,
				     u32 num_bvecs)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
	osd_data->bvec_pos = *bvec_pos;
	osd_data->num_bvecs = num_bvecs;
}

static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
			       struct iov_iter *iter)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_ITER;
	osd_data->iter = *iter;
}

static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
	BUG_ON(which >= osd_req->r_num_ops);

	return &osd_req->r_ops[which].raw_data_in;
}

struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_raw_data_in(osd_req, which);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
				    unsigned int which,
				    struct ceph_bio_iter *bio_pos,
				    u32 bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
				      unsigned int which,
				      struct bio_vec *bvecs, u32 num_bvecs,
				      u32 bytes)
{
	struct ceph_osd_data *osd_data;
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);

void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
					 unsigned int which,
					 struct ceph_bvec_iter *bvec_pos)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);

/**
 * osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer
 * @osd_req: The request to set up
 * @which: Index of the operation in which to set the iter
 * @iter: The buffer iterator
 */
void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req,
				unsigned int which, struct iov_iter *iter)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_iter_init(osd_data, iter);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_iter);

static void osd_req_op_cls_request_info_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}

void osd_req_op_cls_request_data_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
	osd_req->r_ops[which].cls.indata_len += pagelist->length;
	osd_req->r_ops[which].indata_len += pagelist->length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
	osd_req->r_ops[which].cls.indata_len += length;
	osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
				       unsigned int which,
				       struct bio_vec *bvecs, u32 num_bvecs,
				       u32 bytes)
{
	struct ceph_osd_data *osd_data;
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
	osd_req->r_ops[which].cls.indata_len += bytes;
	osd_req->r_ops[which].indata_len += bytes;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);

void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);

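/*
 * Return the number of data bytes carried by @osd_data, whatever its type.
 */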
static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
	switch (osd_data->type) {
	case CEPH_OSD_DATA_TYPE_NONE:
		return 0;
	case CEPH_OSD_DATA_TYPE_PAGES:
		return osd_data->length;
	case CEPH_OSD_DATA_TYPE_PAGELIST:
		return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
	case CEPH_OSD_DATA_TYPE_BIO:
		return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
	case CEPH_OSD_DATA_TYPE_BVECS:
		return osd_data->bvec_pos.iter.bi_size;
	case CEPH_OSD_DATA_TYPE_ITER:
		return iov_iter_count(&osd_data->iter);
	default:
		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
		return 0;
	}
}

static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
		int num_pages;

		num_pages = calc_pages_for((u64)osd_data->alignment,
						(u64)osd_data->length);
		ceph_release_page_vector(osd_data->pages, num_pages);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		ceph_pagelist_release(osd_data->pagelist);
	}
	ceph_osd_data_init(osd_data);
}

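/*
 * Release the data buffers attached to the op at index @which, based on
 * its opcode.
 */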
static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];

	switch (op->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_SPARSE_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		kfree(op->extent.sparse_ext);
		ceph_osd_data_release(&op->extent.osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		ceph_osd_data_release(&op->cls.request_info);
		ceph_osd_data_release(&op->cls.request_data);
		ceph_osd_data_release(&op->cls.response_data);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		ceph_osd_data_release(&op->xattr.osd_data);
		break;
	case CEPH_OSD_OP_STAT:
		ceph_osd_data_release(&op->raw_data_in);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		ceph_osd_data_release(&op->notify_ack.request_data);
		break;
	case CEPH_OSD_OP_NOTIFY:
		ceph_osd_data_release(&op->notify.request_data);
		ceph_osd_data_release(&op->notify.response_data);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		ceph_osd_data_release(&op->list_watchers.response_data);
		break;
	case CEPH_OSD_OP_COPY_FROM2:
		ceph_osd_data_release(&op->copy_from.osd_data);
		break;
	default:
		break;
	}
}

/*
 * Assumes @t is zero-initialized.
 */
static void target_init(struct ceph_osd_request_target *t)
{
	ceph_oid_init(&t->base_oid);
	ceph_oloc_init(&t->base_oloc);
	ceph_oid_init(&t->target_oid);
	ceph_oloc_init(&t->target_oloc);

	ceph_osds_init(&t->acting);
	ceph_osds_init(&t->up);
	t->size = -1;
	t->min_size = -1;

	t->osd = CEPH_HOMELESS_OSD;
}

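/*
 * Copy @src into @dest, duplicating the oid/oloc allocations and the
 * up/acting OSD sets.
 */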
static void target_copy(struct ceph_osd_request_target *dest,
			const struct ceph_osd_request_target *src)
{
	ceph_oid_copy(&dest->base_oid, &src->base_oid);
	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
	ceph_oid_copy(&dest->target_oid, &src->target_oid);
	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);

	dest->pgid = src->pgid; /* struct */
	dest->spgid = src->spgid; /* struct */
	dest->pg_num = src->pg_num;
	dest->pg_num_mask = src->pg_num_mask;
	ceph_osds_copy(&dest->acting, &src->acting);
	ceph_osds_copy(&dest->up, &src->up);
	dest->size = src->size;
	dest->min_size = src->min_size;
	dest->sort_bitwise = src->sort_bitwise;
	dest->recovery_deletes = src->recovery_deletes;

	dest->flags = src->flags;
	dest->used_replica = src->used_replica;
	dest->paused = src->paused;

	dest->epoch = src->epoch;
	dest->last_force_resend = src->last_force_resend;

	dest->osd = src->osd;
}

static void target_destroy(struct ceph_osd_request_target *t)
{
	ceph_oid_destroy(&t->base_oid);
	ceph_oloc_destroy(&t->base_oloc);
	ceph_oid_destroy(&t->target_oid);
	ceph_oloc_destroy(&t->target_oloc);
}

/*
 * requests
 */
static void request_release_checks(struct ceph_osd_request *req)
{
	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
	WARN_ON(!list_empty(&req->r_private_item));
	WARN_ON(req->r_osd);
}

static void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
					    struct ceph_osd_request, r_kref);
	unsigned int which;

	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
	     req->r_request, req->r_reply);
	request_release_checks(req);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);

	for (which = 0; which < req->r_num_ops; which++)
		osd_req_op_data_release(req, which);

	target_destroy(&req->r_t);
	ceph_put_snap_context(req->r_snapc);

	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
		kmem_cache_free(ceph_osd_request_cache, req);
	else
		kfree(req);
}

void ceph_osdc_get_request(struct ceph_osd_request *req)
{
	dout("%s %p (was %d)\n", __func__, req,
	     kref_read(&req->r_kref));
	kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

void ceph_osdc_put_request(struct ceph_osd_request *req)
{
	if (req) {
		dout("%s %p (was %d)\n", __func__, req,
		     kref_read(&req->r_kref));
		kref_put(&req->r_kref, ceph_osdc_release_request);
	}
}
EXPORT_SYMBOL(ceph_osdc_put_request);

static void request_init(struct ceph_osd_request *req)
{
	/* req only, each op is zeroed in osd_req_op_init() */
	memset(req, 0, sizeof(*req));

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	RB_CLEAR_NODE(&req->r_node);
	RB_CLEAR_NODE(&req->r_mc_node);
	INIT_LIST_HEAD(&req->r_private_item);

	target_init(&req->r_t);
}

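/*
 * Allocate a request: from the mempool if @use_mempool (limited to
 * CEPH_OSD_SLAB_OPS ops), from the slab cache if @num_ops fits, or with
 * kmalloc() otherwise.  Takes a ref on @snapc.
 */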
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       struct ceph_snap_context *snapc,
					       unsigned int num_ops,
					       bool use_mempool,
					       gfp_t gfp_flags)
{
	struct ceph_osd_request *req;

	if (use_mempool) {
		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
	} else {
		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
		req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
	}
	if (unlikely(!req))
		return NULL;

	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = CEPH_NOSNAP;
	req->r_snapc = ceph_get_snap_context(snapc);

	dout("%s req %p\n", __func__, req);
	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
{
	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}

static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
				      int num_request_data_items,
				      int num_reply_data_items)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_msg *msg;
	int msg_size;

	WARN_ON(req->r_request || req->r_reply);
	WARN_ON(ceph_oid_empty(&req->r_base_oid));
	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));

	/* create request message */
	msg_size = CEPH_ENCODING_START_BLK_LEN +
			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
	msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
	msg_size += CEPH_ENCODING_START_BLK_LEN +
			sizeof(struct ceph_osd_reqid); /* reqid */
	msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
	msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
	msg_size += CEPH_ENCODING_START_BLK_LEN +
			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
	msg_size += 4 + req->r_base_oid.name_len; /* oid */
	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8; /* snapid */
	msg_size += 8; /* snap_seq */
	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
	msg_size += 4 + 8; /* retry_attempt, features */

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
				       num_request_data_items);
	else
		msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
				    num_request_data_items, gfp, true);
	if (!msg)
		return -ENOMEM;

	memset(msg->front.iov_base, 0, msg->front.iov_len);
	req->r_request = msg;

	/* create reply message */
	msg_size = OSD_OPREPLY_FRONT_LEN;
	msg_size += req->r_base_oid.name_len;
	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
				       num_reply_data_items);
	else
		msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
				    num_reply_data_items, gfp, true);
	if (!msg)
		return -ENOMEM;

	req->r_reply = msg;

	return 0;
}

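/*
 * The case list below is generated from __CEPH_FORALL_OSD_OPS, so any
 * opcode known to the shared op table is considered valid.
 */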
static bool osd_req_opcode_valid(u16 opcode)
{
	switch (opcode) {
#define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
	default:
		return false;
	}
}

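/*
 * Count how many data items the request and reply messages will carry,
 * so that the ceph_msg data arrays can be sized up front.
 */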
static void get_num_data_items(struct ceph_osd_request *req,
			       int *num_request_data_items,
			       int *num_reply_data_items)
{
	struct ceph_osd_req_op *op;

	*num_request_data_items = 0;
	*num_reply_data_items = 0;

	for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
		switch (op->op) {
		/* request */
		case CEPH_OSD_OP_WRITE:
		case CEPH_OSD_OP_WRITEFULL:
		case CEPH_OSD_OP_SETXATTR:
		case CEPH_OSD_OP_CMPXATTR:
		case CEPH_OSD_OP_NOTIFY_ACK:
		case CEPH_OSD_OP_COPY_FROM2:
			*num_request_data_items += 1;
			break;

		/* reply */
		case CEPH_OSD_OP_STAT:
		case CEPH_OSD_OP_READ:
		case CEPH_OSD_OP_SPARSE_READ:
		case CEPH_OSD_OP_LIST_WATCHERS:
			*num_reply_data_items += 1;
			break;

		/* both */
		case CEPH_OSD_OP_NOTIFY:
			*num_request_data_items += 1;
			*num_reply_data_items += 1;
			break;
		case CEPH_OSD_OP_CALL:
			*num_request_data_items += 2;
			*num_reply_data_items += 1;
			break;

		default:
			WARN_ON(!osd_req_opcode_valid(op->op));
			break;
		}
	}
}

/*
 * oid, oloc and OSD op opcode(s) must be filled in before this function
 * is called.
 */
int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
	int num_request_data_items, num_reply_data_items;

	get_num_data_items(req, &num_request_data_items,
			   &num_reply_data_items);
	return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
					  num_reply_data_items);
}
EXPORT_SYMBOL(ceph_osdc_alloc_messages);

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 */
struct ceph_osd_req_op *
osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
		 u16 opcode, u32 flags)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	BUG_ON(!osd_req_opcode_valid(opcode));

	op = &osd_req->r_ops[which];
	memset(op, 0, sizeof (*op));
	op->op = opcode;
	op->flags = flags;

	return op;
}
EXPORT_SYMBOL(osd_req_op_init);

void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
				unsigned int which, u16 opcode,
				u64 offset, u64 length,
				u64 truncate_size, u32 truncate_seq)
{
	struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
						     opcode, 0);
	size_t payload_len = 0;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);

	op->extent.offset = offset;
	op->extent.length = length;
	op->extent.truncate_size = truncate_size;
	op->extent.truncate_seq = truncate_seq;
	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
		payload_len += length;

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);

void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
				unsigned int which, u64 length)
{
	struct ceph_osd_req_op *op;
	u64 previous;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];
	previous = op->extent.length;

	if (length == previous)
		return;		/* Nothing to do */
	BUG_ON(length > previous);

	op->extent.length = length;
	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update);

void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
				unsigned int which, u64 offset_inc)
{
	struct ceph_osd_req_op *op, *prev_op;

	BUG_ON(which + 1 >= osd_req->r_num_ops);

	prev_op = &osd_req->r_ops[which];
	op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
	/* dup previous one */
	op->indata_len = prev_op->indata_len;
	op->outdata_len = prev_op->outdata_len;
	op->extent = prev_op->extent;
	/* adjust offset */
	op->extent.offset += offset_inc;
	op->extent.length -= offset_inc;

	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= offset_inc;
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);

int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
			const char *class, const char *method)
{
	struct ceph_osd_req_op *op;
	struct ceph_pagelist *pagelist;
	size_t payload_len = 0;
	size_t size;
	int ret;

	op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);

	pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!pagelist)
		return -ENOMEM;

	op->cls.class_name = class;
	size = strlen(class);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.class_len = size;
	ret = ceph_pagelist_append(pagelist, class, size);
	if (ret)
		goto err_pagelist_free;
	payload_len += size;

	op->cls.method_name = method;
	size = strlen(method);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.method_len = size;
	ret = ceph_pagelist_append(pagelist, method, size);
	if (ret)
		goto err_pagelist_free;
	payload_len += size;

	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
	op->indata_len = payload_len;
	return 0;

err_pagelist_free:
	ceph_pagelist_release(pagelist);
	return ret;
}
EXPORT_SYMBOL(osd_req_op_cls_init);

int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
			  u16 opcode, const char *name, const void *value,
			  size_t size, u8 cmp_op, u8 cmp_mode)
{
	struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
						     opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len;
	int ret;

	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

	pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!pagelist)
		return -ENOMEM;

	payload_len = strlen(name);
	op->xattr.name_len = payload_len;
	ret = ceph_pagelist_append(pagelist, name, payload_len);
	if (ret)
		goto err_pagelist_free;

	op->xattr.value_len = size;
	ret = ceph_pagelist_append(pagelist, value, size);
	if (ret)
		goto err_pagelist_free;
	payload_len += size;

	op->xattr.cmp_op = cmp_op;
	op->xattr.cmp_mode = cmp_mode;

	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
	op->indata_len = payload_len;
	return 0;

err_pagelist_free:
	ceph_pagelist_release(pagelist);
	return ret;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);

/*
 * @watch_opcode: CEPH_OSD_WATCH_OP_*
 */
static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
				  u8 watch_opcode, u64 cookie, u32 gen)
{
	struct ceph_osd_req_op *op;

	op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
	op->watch.cookie = cookie;
	op->watch.op = watch_opcode;
	op->watch.gen = gen;
}

/*
 * prot_ver, timeout and notify payload (may be empty) should already be
 * encoded in @request_pl
 */
static void osd_req_op_notify_init(struct ceph_osd_request *req, int which,
				   u64 cookie, struct ceph_pagelist *request_pl)
{
	struct ceph_osd_req_op *op;

	op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
	op->notify.cookie = cookie;

	ceph_osd_data_pagelist_init(&op->notify.request_data, request_pl);
	op->indata_len = request_pl->length;
}

/*
 * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
 */
void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
				unsigned int which,
				u64 expected_object_size,
				u64 expected_write_size,
				u32 flags)
{
	struct ceph_osd_req_op *op;

	op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
	op->alloc_hint.expected_object_size = expected_object_size;
	op->alloc_hint.expected_write_size = expected_write_size;
	op->alloc_hint.flags = flags;

	/*
	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
	 * not worth a feature bit.  Set FAILOK per-op flag to make
	 * sure older osds don't trip over an unsupported opcode.
	 */
	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
}
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);

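/*
 * Attach an op's data buffer to the outgoing message, dispatching on the
 * buffer type.
 */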
static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
				struct ceph_osd_data *osd_data)
{
	u64 length = ceph_osd_data_length(osd_data);

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		BUG_ON(length > (u64) SIZE_MAX);
		if (length)
			ceph_msg_data_add_pages(msg, osd_data->pages,
					length, osd_data->alignment, false);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		BUG_ON(!length);
		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
		ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
#endif
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
		ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
		ceph_msg_data_add_iter(msg, &osd_data->iter);
	} else {
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
	}
}

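/*
 * Encode @src into the wire format (little-endian) at @dst.  Returns the
 * number of request data bytes (indata_len) for the op, or 0 for an
 * unsupported opcode.
 */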
static u32 osd_req_encode_op(struct ceph_osd_op *dst,
			     const struct ceph_osd_req_op *src)
{
	switch (src->op) {
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_SPARSE_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
	case CEPH_OSD_OP_ZERO:
	case CEPH_OSD_OP_TRUNCATE:
		dst->extent.offset = cpu_to_le64(src->extent.offset);
		dst->extent.length = cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;
	case CEPH_OSD_OP_CALL:
		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
		break;
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(0);
		dst->watch.op = src->watch.op;
		dst->watch.gen = cpu_to_le32(src->watch.gen);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		break;
	case CEPH_OSD_OP_NOTIFY:
		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		dst->alloc_hint.expected_object_size =
		    cpu_to_le64(src->alloc_hint.expected_object_size);
		dst->alloc_hint.expected_write_size =
		    cpu_to_le64(src->alloc_hint.expected_write_size);
		dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		break;
	case CEPH_OSD_OP_CREATE:
	case CEPH_OSD_OP_DELETE:
		break;
	case CEPH_OSD_OP_COPY_FROM2:
		dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
		dst->copy_from.src_version =
			cpu_to_le64(src->copy_from.src_version);
		dst->copy_from.flags = src->copy_from.flags;
		dst->copy_from.src_fadvise_flags =
			cpu_to_le32(src->copy_from.src_fadvise_flags);
		break;
	case CEPH_OSD_OP_ASSERT_VER:
		dst->assert_ver.unused = cpu_to_le64(0);
		dst->assert_ver.ver = cpu_to_le64(src->assert_ver.ver);
		break;
	default:
		pr_err("unsupported osd opcode %s\n",
			ceph_osd_op_name(src->op));
		WARN_ON(1);

		return 0;
	}

	dst->op = cpu_to_le16(src->op);
	dst->flags = cpu_to_le32(src->flags);
	dst->payload_len = cpu_to_le32(src->indata_len);

	return src->indata_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 */
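/*
 * Typical usage (a sketch, not taken verbatim from any caller; the ci_*
 * names are hypothetical): map a file extent to a single-op read request
 * and let the helper shorten *len at the object boundary:
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, ci_truncate_seq, ci_truncate_size,
 *				    false);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 */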
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       unsigned int which, int num_ops,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       u32 truncate_seq,
					       u64 truncate_size,
					       bool use_mempool)
{
	struct ceph_osd_request *req;
	u64 objnum = 0;
	u64 objoff = 0;
	u64 objlen = 0;
	int r;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
	       opcode != CEPH_OSD_OP_SPARSE_READ);

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
					GFP_NOFS);
	if (!req) {
		r = -ENOMEM;
		goto fail;
	}

	/* calculate max write size */
	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
	if (r)
		goto fail;

	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
		osd_req_op_init(req, which, opcode, 0);
	} else {
		u32 object_size = layout->object_size;
		u32 object_base = off - objoff;
		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
			if (truncate_size <= object_base) {
				truncate_size = 0;
			} else {
				truncate_size -= object_base;
				if (truncate_size > object_size)
					truncate_size = object_size;
			}
		}
		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
				       truncate_size, truncate_seq);
	}

	req->r_base_oloc.pool = layout->pool_id;
	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
	req->r_flags = flags | osdc->client->options->read_from_replica;

	req->r_snapid = vino.snap;
	if (flags & CEPH_OSD_FLAG_WRITE)
		req->r_data_offset = off;

	if (num_ops > 1) {
		int num_req_ops, num_rep_ops;

		/*
		 * If this is a multi-op write request, assume that we'll need
		 * request ops. If it's a multi-op read then assume we'll need
		 * reply ops. Anything else and call it -EINVAL.
		 */
		if (flags & CEPH_OSD_FLAG_WRITE) {
			num_req_ops = num_ops;
			num_rep_ops = 0;
		} else if (flags & CEPH_OSD_FLAG_READ) {
			num_req_ops = 0;
			num_rep_ops = num_ops;
		} else {
			r = -EINVAL;
			goto fail;
		}

		r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_req_ops,
					       num_rep_ops);
	} else {
		r = ceph_osdc_alloc_messages(req, GFP_NOFS);
	}
	if (r)
		goto fail;

	return req;

fail:
	ceph_osdc_put_request(req);
	return ERR_PTR(r);
}
EXPORT_SYMBOL(ceph_osdc_new_request);

int __ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op, int cnt)
{
	op->extent.sparse_ext_cnt = cnt;
	op->extent.sparse_ext = kmalloc_array(cnt,
					      sizeof(*op->extent.sparse_ext),
					      GFP_NOFS);
	if (!op->extent.sparse_ext)
		return -ENOMEM;
	return 0;
}
EXPORT_SYMBOL(__ceph_alloc_sparse_ext_map);

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)

/*
 * Call @fn on each OSD request as long as @fn returns 0.
 */
static void for_each_request(struct ceph_osd_client *osdc,
			int (*fn)(struct ceph_osd_request *req, void *arg),
			void *arg)
{
	struct rb_node *n, *p;

	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);

		for (p = rb_first(&osd->o_requests); p; ) {
			struct ceph_osd_request *req =
			    rb_entry(p, struct ceph_osd_request, r_node);

			p = rb_next(p);
			if (fn(req, arg))
				return;
		}
	}

	for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
		struct ceph_osd_request *req =
		    rb_entry(p, struct ceph_osd_request, r_node);

		p = rb_next(p);
		if (fn(req, arg))
			return;
	}
}

static bool osd_homeless(struct ceph_osd *osd)
{
	return osd->o_osd == CEPH_HOMELESS_OSD;
}

static bool osd_registered(struct ceph_osd *osd)
{
	verify_osdc_locked(osd->o_osdc);

	return !RB_EMPTY_NODE(&osd->o_node);
}

/*
 * Assumes @osd is zero-initialized.
 */
static void osd_init(struct ceph_osd *osd)
{
	refcount_set(&osd->o_ref, 1);
	RB_CLEAR_NODE(&osd->o_node);
	spin_lock_init(&osd->o_requests_lock);
	osd->o_requests = RB_ROOT;
	osd->o_linger_requests = RB_ROOT;
	osd->o_backoff_mappings = RB_ROOT;
	osd->o_backoffs_by_id = RB_ROOT;
	INIT_LIST_HEAD(&osd->o_osd_lru);
	INIT_LIST_HEAD(&osd->o_keepalive_item);
	osd->o_incarnation = 1;
	mutex_init(&osd->lock);
}

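/*
 * Reset sparse read state, freeing any extent map left over from a
 * previous read.
 */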
static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
{
	kfree(sr->sr_extent);
	memset(sr, '\0', sizeof(*sr));
	sr->sr_state = CEPH_SPARSE_READ_HDR;
}

static void osd_cleanup(struct ceph_osd *osd)
{
	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
	WARN_ON(!list_empty(&osd->o_osd_lru));
	WARN_ON(!list_empty(&osd->o_keepalive_item));

	ceph_init_sparse_read(&osd->o_sparse_read);

	if (osd->o_auth.authorizer) {
		WARN_ON(osd_homeless(osd));
		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
	}
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	WARN_ON(onum == CEPH_HOMELESS_OSD);

	osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
	osd_init(osd);
	osd->o_osdc = osdc;
	osd->o_osd = onum;
	osd->o_sparse_op_idx = -1;

	ceph_init_sparse_read(&osd->o_sparse_read);

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (refcount_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
		     refcount_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
	     refcount_read(&osd->o_ref) - 1);
	if (refcount_dec_and_test(&osd->o_ref)) {
		osd_cleanup(osd);
		kfree(osd);
	}
}

DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)

static void __move_osd_to_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));

	spin_lock(&osdc->osd_lru_lock);
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	spin_unlock(&osdc->osd_lru_lock);

	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}

static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests))
		__move_osd_to_lru(osd);
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	spin_lock(&osdc->osd_lru_lock);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
	spin_unlock(&osdc->osd_lru_lock);
}

/*
 * Close the connection and assign any leftover requests to the
 * homeless session.
 */
static void close_osd(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct rb_node *n;

	verify_osdc_wrlocked(osdc);
	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	ceph_con_close(&osd->o_con);

	for (n = rb_first(&osd->o_requests); n; ) {
		struct ceph_osd_request *req =
		    rb_entry(n, struct ceph_osd_request, r_node);

		n = rb_next(n); /* unlink_request() */

		dout(" reassigning req %p tid %llu\n", req, req->r_tid);
		unlink_request(osd, req);
		link_request(&osdc->homeless_osd, req);
	}
	for (n = rb_first(&osd->o_linger_requests); n; ) {
		struct ceph_osd_linger_request *lreq =
		    rb_entry(n, struct ceph_osd_linger_request, node);

		n = rb_next(n); /* unlink_linger() */

		dout(" reassigning lreq %p linger_id %llu\n", lreq,
		     lreq->linger_id);
		unlink_linger(osd, lreq);
		link_linger(&osdc->homeless_osd, lreq);
	}
	clear_backoffs(osd);

	__remove_osd_from_lru(osd);
	erase_osd(&osdc->osds, osd);
	put_osd(osd);
}

/*
 * reset osd connect
 */
static int reopen_osd(struct ceph_osd *osd)
{
	struct ceph_entity_addr *peer_addr;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests)) {
		close_osd(osd);
		return -ENODEV;
	}

	peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
			!ceph_con_opened(&osd->o_con)) {
		struct rb_node *n;

		dout("osd addr hasn't changed and connection never opened, "
		     "letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
			struct ceph_osd_request *req =
			    rb_entry(n, struct ceph_osd_request, r_node);
			req->r_stamp = jiffies;
		}

		return -EAGAIN;
	}

	ceph_con_close(&osd->o_con);
	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
	osd->o_incarnation++;

	return 0;
}

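/*
 * Look up the session for osd @o, creating it if it doesn't exist.
 * Creation requires the write lock; with only the read lock held,
 * ERR_PTR(-EAGAIN) is returned so the caller can retry wrlocked.
 */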
static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
					  bool wrlocked)
{
	struct ceph_osd *osd;

	if (wrlocked)
		verify_osdc_wrlocked(osdc);
	else
		verify_osdc_locked(osdc);

	if (o != CEPH_HOMELESS_OSD)
		osd = lookup_osd(&osdc->osds, o);
	else
		osd = &osdc->homeless_osd;
	if (!osd) {
		if (!wrlocked)
			return ERR_PTR(-EAGAIN);

		osd = create_osd(osdc, o);
		insert_osd(&osdc->osds, osd);
		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
			      &osdc->osdmap->osd_addr[osd->o_osd]);
	}

	dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
	return osd;
}

/*
 * Create request <-> OSD session relation.
 *
 * @req has to be assigned a tid, @osd may be homeless.
 */
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
	verify_osd_locked(osd);
	WARN_ON(!req->r_tid || req->r_osd);
	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
	     req, req->r_tid);

	if (!osd_homeless(osd))
		__remove_osd_from_lru(osd);
	else
		atomic_inc(&osd->o_osdc->num_homeless);

	get_osd(osd);
	spin_lock(&osd->o_requests_lock);
	insert_request(&osd->o_requests, req);
	spin_unlock(&osd->o_requests_lock);
	req->r_osd = osd;
}

static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
	verify_osd_locked(osd);
	WARN_ON(req->r_osd != osd);
	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
	     req, req->r_tid);

	req->r_osd = NULL;
	spin_lock(&osd->o_requests_lock);
	erase_request(&osd->o_requests, req);
	spin_unlock(&osd->o_requests_lock);
	put_osd(osd);

	if (!osd_homeless(osd))
		maybe_move_osd_to_lru(osd);
	else
		atomic_dec(&osd->o_osdc->num_homeless);
}

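/*
 * Pool full checks: a pool is full when CEPH_POOL_FLAG_FULL is set in
 * the osdmap.
 */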
__pool_full(struct ceph_pg_pool_info * pi)1510  static bool __pool_full(struct ceph_pg_pool_info *pi)
1511  {
1512  	return pi->flags & CEPH_POOL_FLAG_FULL;
1513  }
1514  
have_pool_full(struct ceph_osd_client * osdc)1515  static bool have_pool_full(struct ceph_osd_client *osdc)
1516  {
1517  	struct rb_node *n;
1518  
1519  	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
1520  		struct ceph_pg_pool_info *pi =
1521  		    rb_entry(n, struct ceph_pg_pool_info, node);
1522  
1523  		if (__pool_full(pi))
1524  			return true;
1525  	}
1526  
1527  	return false;
1528  }
1529  
pool_full(struct ceph_osd_client * osdc,s64 pool_id)1530  static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
1531  {
1532  	struct ceph_pg_pool_info *pi;
1533  
1534  	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
1535  	if (!pi)
1536  		return false;
1537  
1538  	return __pool_full(pi);
1539  }
1540  
1541  /*
1542   * Returns whether a request should be blocked from being sent
1543   * based on the current osdmap and osd_client settings.
1544   */
1545  static bool target_should_be_paused(struct ceph_osd_client *osdc,
1546  				    const struct ceph_osd_request_target *t,
1547  				    struct ceph_pg_pool_info *pi)
1548  {
1549  	bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
1550  	bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
1551  		       ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1552  		       __pool_full(pi);
1553  
1554  	WARN_ON(pi->id != t->target_oloc.pool);
1555  	return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
1556  	       ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
1557  	       (osdc->osdmap->epoch < osdc->epoch_barrier);
1558  }
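
/*
 * Illustrative sketch (not in the original source): how the checks in
 * target_should_be_paused() combine.  Assume an osdmap with only
 * CEPH_OSDMAP_PAUSEWR set and a pool that is not full:
 *
 *	read  (CEPH_OSD_FLAG_READ)  -> not paused, PAUSERD is clear
 *	write (CEPH_OSD_FLAG_WRITE) -> paused, PAUSEWR is set
 *
 * Independently, an epoch_barrier ahead of the current osdmap epoch
 * pauses every request until a new enough map arrives.
 */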
1559  
1560  static int pick_random_replica(const struct ceph_osds *acting)
1561  {
1562  	int i = get_random_u32_below(acting->size);
1563  
1564  	dout("%s picked osd%d, primary osd%d\n", __func__,
1565  	     acting->osds[i], acting->primary);
1566  	return i;
1567  }
1568  
1569  /*
1570   * Picks the closest replica based on the client's location, as
1571   * given by the crush_location option.  Prefers the primary if the
1572   * localities are the same.
1573   */
1574  static int pick_closest_replica(struct ceph_osd_client *osdc,
1575  				const struct ceph_osds *acting)
1576  {
1577  	struct ceph_options *opt = osdc->client->options;
1578  	int best_i, best_locality;
1579  	int i = 0, locality;
1580  
1581  	do {
1582  		locality = ceph_get_crush_locality(osdc->osdmap,
1583  						   acting->osds[i],
1584  						   &opt->crush_locs);
1585  		if (i == 0 ||
1586  		    (locality >= 0 && best_locality < 0) ||
1587  		    (locality >= 0 && best_locality >= 0 &&
1588  		     locality < best_locality)) {
1589  			best_i = i;
1590  			best_locality = locality;
1591  		}
1592  	} while (++i < acting->size);
1593  
1594  	dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
1595  	     acting->osds[best_i], best_locality, acting->primary);
1596  	return best_i;
1597  }
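
/*
 * Illustrative sketch (hypothetical locality values): if
 * ceph_get_crush_locality() returns -1 (no match) for the primary
 * osd0, 2 for osd1 and 1 for osd2, the loop above settles on osd2,
 * the smallest non-negative locality.  Had the primary also scored 1,
 * it would keep the spot: only a strictly smaller locality displaces
 * the current best, which is what "prefers the primary" means here.
 */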
1598  
1599  enum calc_target_result {
1600  	CALC_TARGET_NO_ACTION = 0,
1601  	CALC_TARGET_NEED_RESEND,
1602  	CALC_TARGET_POOL_DNE,
1603  };
1604  
1605  static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1606  					   struct ceph_osd_request_target *t,
1607  					   bool any_change)
1608  {
1609  	struct ceph_pg_pool_info *pi;
1610  	struct ceph_pg pgid, last_pgid;
1611  	struct ceph_osds up, acting;
1612  	bool is_read = t->flags & CEPH_OSD_FLAG_READ;
1613  	bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
1614  	bool force_resend = false;
1615  	bool unpaused = false;
1616  	bool legacy_change = false;
1617  	bool split = false;
1618  	bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
1619  	bool recovery_deletes = ceph_osdmap_flag(osdc,
1620  						 CEPH_OSDMAP_RECOVERY_DELETES);
1621  	enum calc_target_result ct_res;
1622  
1623  	t->epoch = osdc->osdmap->epoch;
1624  	pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
1625  	if (!pi) {
1626  		t->osd = CEPH_HOMELESS_OSD;
1627  		ct_res = CALC_TARGET_POOL_DNE;
1628  		goto out;
1629  	}
1630  
1631  	if (osdc->osdmap->epoch == pi->last_force_request_resend) {
1632  		if (t->last_force_resend < pi->last_force_request_resend) {
1633  			t->last_force_resend = pi->last_force_request_resend;
1634  			force_resend = true;
1635  		} else if (t->last_force_resend == 0) {
1636  			force_resend = true;
1637  		}
1638  	}
1639  
1640  	/* apply tiering */
1641  	ceph_oid_copy(&t->target_oid, &t->base_oid);
1642  	ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
1643  	if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1644  		if (is_read && pi->read_tier >= 0)
1645  			t->target_oloc.pool = pi->read_tier;
1646  		if (is_write && pi->write_tier >= 0)
1647  			t->target_oloc.pool = pi->write_tier;
1648  
1649  		pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
1650  		if (!pi) {
1651  			t->osd = CEPH_HOMELESS_OSD;
1652  			ct_res = CALC_TARGET_POOL_DNE;
1653  			goto out;
1654  		}
1655  	}
1656  
1657  	__ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
1658  	last_pgid.pool = pgid.pool;
1659  	last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
1660  
1661  	ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
1662  	if (any_change &&
1663  	    ceph_is_new_interval(&t->acting,
1664  				 &acting,
1665  				 &t->up,
1666  				 &up,
1667  				 t->size,
1668  				 pi->size,
1669  				 t->min_size,
1670  				 pi->min_size,
1671  				 t->pg_num,
1672  				 pi->pg_num,
1673  				 t->sort_bitwise,
1674  				 sort_bitwise,
1675  				 t->recovery_deletes,
1676  				 recovery_deletes,
1677  				 &last_pgid))
1678  		force_resend = true;
1679  
1680  	if (t->paused && !target_should_be_paused(osdc, t, pi)) {
1681  		t->paused = false;
1682  		unpaused = true;
1683  	}
1684  	legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
1685  			ceph_osds_changed(&t->acting, &acting,
1686  					  t->used_replica || any_change);
1687  	if (t->pg_num)
1688  		split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
1689  
1690  	if (legacy_change || force_resend || split) {
1691  		t->pgid = pgid; /* struct */
1692  		ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
1693  		ceph_osds_copy(&t->acting, &acting);
1694  		ceph_osds_copy(&t->up, &up);
1695  		t->size = pi->size;
1696  		t->min_size = pi->min_size;
1697  		t->pg_num = pi->pg_num;
1698  		t->pg_num_mask = pi->pg_num_mask;
1699  		t->sort_bitwise = sort_bitwise;
1700  		t->recovery_deletes = recovery_deletes;
1701  
1702  		if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
1703  				 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
1704  		    !is_write && pi->type == CEPH_POOL_TYPE_REP &&
1705  		    acting.size > 1) {
1706  			int pos;
1707  
1708  			WARN_ON(!is_read || acting.osds[0] != acting.primary);
1709  			if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
1710  				pos = pick_random_replica(&acting);
1711  			} else {
1712  				pos = pick_closest_replica(osdc, &acting);
1713  			}
1714  			t->osd = acting.osds[pos];
1715  			t->used_replica = pos > 0;
1716  		} else {
1717  			t->osd = acting.primary;
1718  			t->used_replica = false;
1719  		}
1720  	}
1721  
1722  	if (unpaused || legacy_change || force_resend || split)
1723  		ct_res = CALC_TARGET_NEED_RESEND;
1724  	else
1725  		ct_res = CALC_TARGET_NO_ACTION;
1726  
1727  out:
1728  	dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
1729  	     legacy_change, force_resend, split, ct_res, t->osd);
1730  	return ct_res;
1731  }
1732  
1733  static struct ceph_spg_mapping *alloc_spg_mapping(void)
1734  {
1735  	struct ceph_spg_mapping *spg;
1736  
1737  	spg = kmalloc(sizeof(*spg), GFP_NOIO);
1738  	if (!spg)
1739  		return NULL;
1740  
1741  	RB_CLEAR_NODE(&spg->node);
1742  	spg->backoffs = RB_ROOT;
1743  	return spg;
1744  }
1745  
1746  static void free_spg_mapping(struct ceph_spg_mapping *spg)
1747  {
1748  	WARN_ON(!RB_EMPTY_NODE(&spg->node));
1749  	WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
1750  
1751  	kfree(spg);
1752  }
1753  
1754  /*
1755   * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
1756   * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
1757   * defined only within a specific spgid; it does not pass anything to
1758   * children on split, or to another primary.
1759   */
1760  DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
1761  		 RB_BYPTR, const struct ceph_spg *, node)
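
/*
 * Note: the macro above is what generates the lookup_spg_mapping(),
 * insert_spg_mapping() and erase_spg_mapping() helpers used by the
 * backoff code below, e.g. (sketch of the generated lookup signature):
 *
 *	static struct ceph_spg_mapping *
 *	lookup_spg_mapping(struct rb_root *root, const struct ceph_spg *spgid);
 */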
1762  
1763  static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
1764  {
1765  	return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
1766  }
1767  
1768  static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
1769  				   void **pkey, size_t *pkey_len)
1770  {
1771  	if (hoid->key_len) {
1772  		*pkey = hoid->key;
1773  		*pkey_len = hoid->key_len;
1774  	} else {
1775  		*pkey = hoid->oid;
1776  		*pkey_len = hoid->oid_len;
1777  	}
1778  }
1779  
1780  static int compare_names(const void *name1, size_t name1_len,
1781  			 const void *name2, size_t name2_len)
1782  {
1783  	int ret;
1784  
1785  	ret = memcmp(name1, name2, min(name1_len, name2_len));
1786  	if (!ret) {
1787  		if (name1_len < name2_len)
1788  			ret = -1;
1789  		else if (name1_len > name2_len)
1790  			ret = 1;
1791  	}
1792  	return ret;
1793  }
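
/*
 * Illustrative sketch: compare_names() orders by content first and by
 * length only when one name is a prefix of the other, e.g.
 *
 *	compare_names("abc", 3, "abd", 3)  -> negative ('c' < 'd')
 *	compare_names("abc", 3, "abcd", 4) -> -1 (the prefix sorts first)
 *	compare_names("abc", 3, "abc", 3)  -> 0
 */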
1794  
1795  static int hoid_compare(const struct ceph_hobject_id *lhs,
1796  			const struct ceph_hobject_id *rhs)
1797  {
1798  	void *effective_key1, *effective_key2;
1799  	size_t effective_key1_len, effective_key2_len;
1800  	int ret;
1801  
1802  	if (lhs->is_max < rhs->is_max)
1803  		return -1;
1804  	if (lhs->is_max > rhs->is_max)
1805  		return 1;
1806  
1807  	if (lhs->pool < rhs->pool)
1808  		return -1;
1809  	if (lhs->pool > rhs->pool)
1810  		return 1;
1811  
1812  	if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
1813  		return -1;
1814  	if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
1815  		return 1;
1816  
1817  	ret = compare_names(lhs->nspace, lhs->nspace_len,
1818  			    rhs->nspace, rhs->nspace_len);
1819  	if (ret)
1820  		return ret;
1821  
1822  	hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
1823  	hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
1824  	ret = compare_names(effective_key1, effective_key1_len,
1825  			    effective_key2, effective_key2_len);
1826  	if (ret)
1827  		return ret;
1828  
1829  	ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
1830  	if (ret)
1831  		return ret;
1832  
1833  	if (lhs->snapid < rhs->snapid)
1834  		return -1;
1835  	if (lhs->snapid > rhs->snapid)
1836  		return 1;
1837  
1838  	return 0;
1839  }
1840  
1841  /*
1842   * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
1843   * compat stuff here.
1844   *
1845   * Assumes @hoid is zero-initialized.
1846   */
1847  static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
1848  {
1849  	u8 struct_v;
1850  	u32 struct_len;
1851  	int ret;
1852  
1853  	ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
1854  				  &struct_len);
1855  	if (ret)
1856  		return ret;
1857  
1858  	if (struct_v < 4) {
1859  		pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
1860  		goto e_inval;
1861  	}
1862  
1863  	hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
1864  						GFP_NOIO);
1865  	if (IS_ERR(hoid->key)) {
1866  		ret = PTR_ERR(hoid->key);
1867  		hoid->key = NULL;
1868  		return ret;
1869  	}
1870  
1871  	hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
1872  						GFP_NOIO);
1873  	if (IS_ERR(hoid->oid)) {
1874  		ret = PTR_ERR(hoid->oid);
1875  		hoid->oid = NULL;
1876  		return ret;
1877  	}
1878  
1879  	ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
1880  	ceph_decode_32_safe(p, end, hoid->hash, e_inval);
1881  	ceph_decode_8_safe(p, end, hoid->is_max, e_inval);
1882  
1883  	hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
1884  						   GFP_NOIO);
1885  	if (IS_ERR(hoid->nspace)) {
1886  		ret = PTR_ERR(hoid->nspace);
1887  		hoid->nspace = NULL;
1888  		return ret;
1889  	}
1890  
1891  	ceph_decode_64_safe(p, end, hoid->pool, e_inval);
1892  
1893  	ceph_hoid_build_hash_cache(hoid);
1894  	return 0;
1895  
1896  e_inval:
1897  	return -EINVAL;
1898  }
1899  
1900  static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
1901  {
1902  	return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
1903  	       4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
1904  }
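
/*
 * Worked example (hypothetical hoid): key "k", oid "obj" and an empty
 * nspace encode to
 *
 *	8 + 4 + 1 + 8	fixed fields (snapid, hash, is_max, pool)
 *	+ 4 + 1		key ("k")
 *	+ 4 + 3		oid ("obj")
 *	+ 4 + 0		nspace ("")
 *	= 37 bytes
 *
 * each string being a 32-bit length followed by its bytes.
 */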
1905  
1906  static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
1907  {
1908  	ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
1909  	ceph_encode_string(p, end, hoid->key, hoid->key_len);
1910  	ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
1911  	ceph_encode_64(p, hoid->snapid);
1912  	ceph_encode_32(p, hoid->hash);
1913  	ceph_encode_8(p, hoid->is_max);
1914  	ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
1915  	ceph_encode_64(p, hoid->pool);
1916  }
1917  
1918  static void free_hoid(struct ceph_hobject_id *hoid)
1919  {
1920  	if (hoid) {
1921  		kfree(hoid->key);
1922  		kfree(hoid->oid);
1923  		kfree(hoid->nspace);
1924  		kfree(hoid);
1925  	}
1926  }
1927  
1928  static struct ceph_osd_backoff *alloc_backoff(void)
1929  {
1930  	struct ceph_osd_backoff *backoff;
1931  
1932  	backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
1933  	if (!backoff)
1934  		return NULL;
1935  
1936  	RB_CLEAR_NODE(&backoff->spg_node);
1937  	RB_CLEAR_NODE(&backoff->id_node);
1938  	return backoff;
1939  }
1940  
1941  static void free_backoff(struct ceph_osd_backoff *backoff)
1942  {
1943  	WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
1944  	WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
1945  
1946  	free_hoid(backoff->begin);
1947  	free_hoid(backoff->end);
1948  	kfree(backoff);
1949  }
1950  
1951  /*
1952   * Within a specific spgid, backoffs are managed by ->begin hoid.
1953   */
1954  DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
1955  			RB_BYVAL, spg_node);
1956  
1957  static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
1958  					    const struct ceph_hobject_id *hoid)
1959  {
1960  	struct rb_node *n = root->rb_node;
1961  
1962  	while (n) {
1963  		struct ceph_osd_backoff *cur =
1964  		    rb_entry(n, struct ceph_osd_backoff, spg_node);
1965  		int cmp;
1966  
1967  		cmp = hoid_compare(hoid, cur->begin);
1968  		if (cmp < 0) {
1969  			n = n->rb_left;
1970  		} else if (cmp > 0) {
1971  			if (hoid_compare(hoid, cur->end) < 0)
1972  				return cur;
1973  
1974  			n = n->rb_right;
1975  		} else {
1976  			return cur;
1977  		}
1978  	}
1979  
1980  	return NULL;
1981  }
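
/*
 * Illustrative sketch: backoffs are keyed by ->begin and treated as
 * half-open intervals [begin, end).  With a single backoff [b, e) in
 * the tree, a hoid equal to b or strictly between b and e returns
 * that backoff; a hoid equal to or past e falls off to the right and
 * returns NULL.
 */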
1982  
1983  /*
1984   * Each backoff has a unique id within its OSD session.
1985   */
1986  DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
1987  
1988  static void clear_backoffs(struct ceph_osd *osd)
1989  {
1990  	while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1991  		struct ceph_spg_mapping *spg =
1992  		    rb_entry(rb_first(&osd->o_backoff_mappings),
1993  			     struct ceph_spg_mapping, node);
1994  
1995  		while (!RB_EMPTY_ROOT(&spg->backoffs)) {
1996  			struct ceph_osd_backoff *backoff =
1997  			    rb_entry(rb_first(&spg->backoffs),
1998  				     struct ceph_osd_backoff, spg_node);
1999  
2000  			erase_backoff(&spg->backoffs, backoff);
2001  			erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
2002  			free_backoff(backoff);
2003  		}
2004  		erase_spg_mapping(&osd->o_backoff_mappings, spg);
2005  		free_spg_mapping(spg);
2006  	}
2007  }
2008  
2009  /*
2010   * Set up a temporary, non-owning view into @t.
2011   */
2012  static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
2013  				  const struct ceph_osd_request_target *t)
2014  {
2015  	hoid->key = NULL;
2016  	hoid->key_len = 0;
2017  	hoid->oid = t->target_oid.name;
2018  	hoid->oid_len = t->target_oid.name_len;
2019  	hoid->snapid = CEPH_NOSNAP;
2020  	hoid->hash = t->pgid.seed;
2021  	hoid->is_max = false;
2022  	if (t->target_oloc.pool_ns) {
2023  		hoid->nspace = t->target_oloc.pool_ns->str;
2024  		hoid->nspace_len = t->target_oloc.pool_ns->len;
2025  	} else {
2026  		hoid->nspace = NULL;
2027  		hoid->nspace_len = 0;
2028  	}
2029  	hoid->pool = t->target_oloc.pool;
2030  	ceph_hoid_build_hash_cache(hoid);
2031  }
2032  
2033  static bool should_plug_request(struct ceph_osd_request *req)
2034  {
2035  	struct ceph_osd *osd = req->r_osd;
2036  	struct ceph_spg_mapping *spg;
2037  	struct ceph_osd_backoff *backoff;
2038  	struct ceph_hobject_id hoid;
2039  
2040  	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
2041  	if (!spg)
2042  		return false;
2043  
2044  	hoid_fill_from_target(&hoid, &req->r_t);
2045  	backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
2046  	if (!backoff)
2047  		return false;
2048  
2049  	dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
2050  	     __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
2051  	     backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
2052  	return true;
2053  }
2054  
2055  /*
2056   * Keep get_num_data_items() in sync with this function.
2057   */
2058  static void setup_request_data(struct ceph_osd_request *req)
2059  {
2060  	struct ceph_msg *request_msg = req->r_request;
2061  	struct ceph_msg *reply_msg = req->r_reply;
2062  	struct ceph_osd_req_op *op;
2063  
2064  	if (req->r_request->num_data_items || req->r_reply->num_data_items)
2065  		return;
2066  
2067  	WARN_ON(request_msg->data_length || reply_msg->data_length);
2068  	for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
2069  		switch (op->op) {
2070  		/* request */
2071  		case CEPH_OSD_OP_WRITE:
2072  		case CEPH_OSD_OP_WRITEFULL:
2073  			WARN_ON(op->indata_len != op->extent.length);
2074  			ceph_osdc_msg_data_add(request_msg,
2075  					       &op->extent.osd_data);
2076  			break;
2077  		case CEPH_OSD_OP_SETXATTR:
2078  		case CEPH_OSD_OP_CMPXATTR:
2079  			WARN_ON(op->indata_len != op->xattr.name_len +
2080  						  op->xattr.value_len);
2081  			ceph_osdc_msg_data_add(request_msg,
2082  					       &op->xattr.osd_data);
2083  			break;
2084  		case CEPH_OSD_OP_NOTIFY_ACK:
2085  			ceph_osdc_msg_data_add(request_msg,
2086  					       &op->notify_ack.request_data);
2087  			break;
2088  		case CEPH_OSD_OP_COPY_FROM2:
2089  			ceph_osdc_msg_data_add(request_msg,
2090  					       &op->copy_from.osd_data);
2091  			break;
2092  
2093  		/* reply */
2094  		case CEPH_OSD_OP_STAT:
2095  			ceph_osdc_msg_data_add(reply_msg,
2096  					       &op->raw_data_in);
2097  			break;
2098  		case CEPH_OSD_OP_READ:
2099  		case CEPH_OSD_OP_SPARSE_READ:
2100  			ceph_osdc_msg_data_add(reply_msg,
2101  					       &op->extent.osd_data);
2102  			break;
2103  		case CEPH_OSD_OP_LIST_WATCHERS:
2104  			ceph_osdc_msg_data_add(reply_msg,
2105  					       &op->list_watchers.response_data);
2106  			break;
2107  
2108  		/* both */
2109  		case CEPH_OSD_OP_CALL:
2110  			WARN_ON(op->indata_len != op->cls.class_len +
2111  						  op->cls.method_len +
2112  						  op->cls.indata_len);
2113  			ceph_osdc_msg_data_add(request_msg,
2114  					       &op->cls.request_info);
2115  			/* optional, can be NONE */
2116  			ceph_osdc_msg_data_add(request_msg,
2117  					       &op->cls.request_data);
2118  			/* optional, can be NONE */
2119  			ceph_osdc_msg_data_add(reply_msg,
2120  					       &op->cls.response_data);
2121  			break;
2122  		case CEPH_OSD_OP_NOTIFY:
2123  			ceph_osdc_msg_data_add(request_msg,
2124  					       &op->notify.request_data);
2125  			ceph_osdc_msg_data_add(reply_msg,
2126  					       &op->notify.response_data);
2127  			break;
2128  		}
2129  	}
2130  }
2131  
2132  static void encode_pgid(void **p, const struct ceph_pg *pgid)
2133  {
2134  	ceph_encode_8(p, 1);
2135  	ceph_encode_64(p, pgid->pool);
2136  	ceph_encode_32(p, pgid->seed);
2137  	ceph_encode_32(p, -1); /* preferred */
2138  }
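
/*
 * Illustrative sketch: pg 1.2a (pool 1, seed 0x2a) encodes as
 *
 *	01			v1
 *	01 00 .. 00		pool (le64)
 *	2a 00 00 00		seed (le32)
 *	ff ff ff ff		preferred (le32 -1)
 *
 * i.e. 1 + 8 + 4 + 4 = 17 bytes, which is what
 * CEPH_PGID_ENCODING_LEN is assumed to account for.
 */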
2139  
2140  static void encode_spgid(void **p, const struct ceph_spg *spgid)
2141  {
2142  	ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
2143  	encode_pgid(p, &spgid->pgid);
2144  	ceph_encode_8(p, spgid->shard);
2145  }
2146  
2147  static void encode_oloc(void **p, void *end,
2148  			const struct ceph_object_locator *oloc)
2149  {
2150  	ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
2151  	ceph_encode_64(p, oloc->pool);
2152  	ceph_encode_32(p, -1); /* preferred */
2153  	ceph_encode_32(p, 0);  /* key len */
2154  	if (oloc->pool_ns)
2155  		ceph_encode_string(p, end, oloc->pool_ns->str,
2156  				   oloc->pool_ns->len);
2157  	else
2158  		ceph_encode_32(p, 0);
2159  }
2160  
2161  static void encode_request_partial(struct ceph_osd_request *req,
2162  				   struct ceph_msg *msg)
2163  {
2164  	void *p = msg->front.iov_base;
2165  	void *const end = p + msg->front_alloc_len;
2166  	u32 data_len = 0;
2167  	int i;
2168  
2169  	if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
2170  		/* snapshots aren't writeable */
2171  		WARN_ON(req->r_snapid != CEPH_NOSNAP);
2172  	} else {
2173  		WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
2174  			req->r_data_offset || req->r_snapc);
2175  	}
2176  
2177  	setup_request_data(req);
2178  
2179  	encode_spgid(&p, &req->r_t.spgid); /* actual spg */
2180  	ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
2181  	ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
2182  	ceph_encode_32(&p, req->r_flags);
2183  
2184  	/* reqid */
2185  	ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
2186  	memset(p, 0, sizeof(struct ceph_osd_reqid));
2187  	p += sizeof(struct ceph_osd_reqid);
2188  
2189  	/* trace */
2190  	memset(p, 0, sizeof(struct ceph_blkin_trace_info));
2191  	p += sizeof(struct ceph_blkin_trace_info);
2192  
2193  	ceph_encode_32(&p, 0); /* client_inc, always 0 */
2194  	ceph_encode_timespec64(p, &req->r_mtime);
2195  	p += sizeof(struct ceph_timespec);
2196  
2197  	encode_oloc(&p, end, &req->r_t.target_oloc);
2198  	ceph_encode_string(&p, end, req->r_t.target_oid.name,
2199  			   req->r_t.target_oid.name_len);
2200  
2201  	/* ops, can imply data */
2202  	ceph_encode_16(&p, req->r_num_ops);
2203  	for (i = 0; i < req->r_num_ops; i++) {
2204  		data_len += osd_req_encode_op(p, &req->r_ops[i]);
2205  		p += sizeof(struct ceph_osd_op);
2206  	}
2207  
2208  	ceph_encode_64(&p, req->r_snapid); /* snapid */
2209  	if (req->r_snapc) {
2210  		ceph_encode_64(&p, req->r_snapc->seq);
2211  		ceph_encode_32(&p, req->r_snapc->num_snaps);
2212  		for (i = 0; i < req->r_snapc->num_snaps; i++)
2213  			ceph_encode_64(&p, req->r_snapc->snaps[i]);
2214  	} else {
2215  		ceph_encode_64(&p, 0); /* snap_seq */
2216  		ceph_encode_32(&p, 0); /* snaps len */
2217  	}
2218  
2219  	ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
2220  	BUG_ON(p > end - 8); /* space for features */
2221  
2222  	msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
2223  	/* front_len is finalized in encode_request_finish() */
2224  	msg->front.iov_len = p - msg->front.iov_base;
2225  	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2226  	msg->hdr.data_len = cpu_to_le32(data_len);
2227  	/*
2228  	 * The header "data_off" is a hint to the receiver allowing it
2229  	 * to align received data into its buffers such that there's no
2230  	 * need to re-copy it before writing it to disk (direct I/O).
2231  	 */
2232  	msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
2233  
2234  	dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
2235  	     req->r_t.target_oid.name, req->r_t.target_oid.name_len);
2236  }
2237  
2238  static void encode_request_finish(struct ceph_msg *msg)
2239  {
2240  	void *p = msg->front.iov_base;
2241  	void *const partial_end = p + msg->front.iov_len;
2242  	void *const end = p + msg->front_alloc_len;
2243  
2244  	if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
2245  		/* luminous OSD -- encode features and be done */
2246  		p = partial_end;
2247  		ceph_encode_64(&p, msg->con->peer_features);
2248  	} else {
2249  		struct {
2250  			char spgid[CEPH_ENCODING_START_BLK_LEN +
2251  				   CEPH_PGID_ENCODING_LEN + 1];
2252  			__le32 hash;
2253  			__le32 epoch;
2254  			__le32 flags;
2255  			char reqid[CEPH_ENCODING_START_BLK_LEN +
2256  				   sizeof(struct ceph_osd_reqid)];
2257  			char trace[sizeof(struct ceph_blkin_trace_info)];
2258  			__le32 client_inc;
2259  			struct ceph_timespec mtime;
2260  		} __packed head;
2261  		struct ceph_pg pgid;
2262  		void *oloc, *oid, *tail;
2263  		int oloc_len, oid_len, tail_len;
2264  		int len;
2265  
2266  		/*
2267  		 * Pre-luminous OSD -- reencode v8 into v4 using @head
2268  		 * as a temporary buffer.  Encode the raw PG; the rest
2269  		 * is just a matter of moving oloc, oid and tail blobs
2270  		 * around.
2271  		 */
2272  		memcpy(&head, p, sizeof(head));
2273  		p += sizeof(head);
2274  
2275  		oloc = p;
2276  		p += CEPH_ENCODING_START_BLK_LEN;
2277  		pgid.pool = ceph_decode_64(&p);
2278  		p += 4 + 4; /* preferred, key len */
2279  		len = ceph_decode_32(&p);
2280  		p += len;   /* nspace */
2281  		oloc_len = p - oloc;
2282  
2283  		oid = p;
2284  		len = ceph_decode_32(&p);
2285  		p += len;
2286  		oid_len = p - oid;
2287  
2288  		tail = p;
2289  		tail_len = partial_end - p;
2290  
2291  		p = msg->front.iov_base;
2292  		ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
2293  		ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
2294  		ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
2295  		ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));
2296  
2297  		/* reassert_version */
2298  		memset(p, 0, sizeof(struct ceph_eversion));
2299  		p += sizeof(struct ceph_eversion);
2300  
2301  		BUG_ON(p >= oloc);
2302  		memmove(p, oloc, oloc_len);
2303  		p += oloc_len;
2304  
2305  		pgid.seed = le32_to_cpu(head.hash);
2306  		encode_pgid(&p, &pgid); /* raw pg */
2307  
2308  		BUG_ON(p >= oid);
2309  		memmove(p, oid, oid_len);
2310  		p += oid_len;
2311  
2312  		/* tail -- ops, snapid, snapc, retry_attempt */
2313  		BUG_ON(p >= tail);
2314  		memmove(p, tail, tail_len);
2315  		p += tail_len;
2316  
2317  		msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
2318  	}
2319  
2320  	BUG_ON(p > end);
2321  	msg->front.iov_len = p - msg->front.iov_base;
2322  	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2323  
2324  	dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
2325  	     le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
2326  	     le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
2327  	     le16_to_cpu(msg->hdr.version));
2328  }
2329  
2330  /*
2331   * @req has to be assigned a tid and registered.
2332   */
2333  static void send_request(struct ceph_osd_request *req)
2334  {
2335  	struct ceph_osd *osd = req->r_osd;
2336  
2337  	verify_osd_locked(osd);
2338  	WARN_ON(osd->o_osd != req->r_t.osd);
2339  
2340  	/* backoff? */
2341  	if (should_plug_request(req))
2342  		return;
2343  
2344  	/*
2345  	 * We may have a previously queued request message hanging
2346  	 * around.  Cancel it to avoid corrupting the msgr.
2347  	 */
2348  	if (req->r_sent)
2349  		ceph_msg_revoke(req->r_request);
2350  
2351  	req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
2352  	if (req->r_attempts)
2353  		req->r_flags |= CEPH_OSD_FLAG_RETRY;
2354  	else
2355  		WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
2356  
2357  	encode_request_partial(req, req->r_request);
2358  
2359  	dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
2360  	     __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
2361  	     req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
2362  	     req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2363  	     req->r_attempts);
2364  
2365  	req->r_t.paused = false;
2366  	req->r_stamp = jiffies;
2367  	req->r_attempts++;
2368  
2369  	req->r_sent = osd->o_incarnation;
2370  	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
2371  	ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
2372  }
2373  
2374  static void maybe_request_map(struct ceph_osd_client *osdc)
2375  {
2376  	bool continuous = false;
2377  
2378  	verify_osdc_locked(osdc);
2379  	WARN_ON(!osdc->osdmap->epoch);
2380  
2381  	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2382  	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
2383  	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2384  		dout("%s osdc %p continuous\n", __func__, osdc);
2385  		continuous = true;
2386  	} else {
2387  		dout("%s osdc %p onetime\n", __func__, osdc);
2388  	}
2389  
2390  	if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2391  			       osdc->osdmap->epoch + 1, continuous))
2392  		ceph_monc_renew_subs(&osdc->client->monc);
2393  }
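
/*
 * In other words (a sketch of the effect): this asks the monitor for
 * osdmap epoch + 1 via the "osdmap" subscription -- onetime in the
 * common case, and continuous while a FULL/PAUSERD/PAUSEWR condition
 * holds, so that maps keep flowing until the condition clears.
 */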
2394  
2395  static void complete_request(struct ceph_osd_request *req, int err);
2396  static void send_map_check(struct ceph_osd_request *req);
2397  
2398  static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
2399  {
2400  	struct ceph_osd_client *osdc = req->r_osdc;
2401  	struct ceph_osd *osd;
2402  	enum calc_target_result ct_res;
2403  	int err = 0;
2404  	bool need_send = false;
2405  	bool promoted = false;
2406  
2407  	WARN_ON(req->r_tid);
2408  	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
2409  
2410  again:
2411  	ct_res = calc_target(osdc, &req->r_t, false);
2412  	if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
2413  		goto promote;
2414  
2415  	osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
2416  	if (IS_ERR(osd)) {
2417  		WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
2418  		goto promote;
2419  	}
2420  
2421  	if (osdc->abort_err) {
2422  		dout("req %p abort_err %d\n", req, osdc->abort_err);
2423  		err = osdc->abort_err;
2424  	} else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
2425  		dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
2426  		     osdc->epoch_barrier);
2427  		req->r_t.paused = true;
2428  		maybe_request_map(osdc);
2429  	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2430  		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2431  		dout("req %p pausewr\n", req);
2432  		req->r_t.paused = true;
2433  		maybe_request_map(osdc);
2434  	} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
2435  		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2436  		dout("req %p pauserd\n", req);
2437  		req->r_t.paused = true;
2438  		maybe_request_map(osdc);
2439  	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2440  		   !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
2441  				     CEPH_OSD_FLAG_FULL_FORCE)) &&
2442  		   (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2443  		    pool_full(osdc, req->r_t.base_oloc.pool))) {
2444  		dout("req %p full/pool_full\n", req);
2445  		if (ceph_test_opt(osdc->client, ABORT_ON_FULL)) {
2446  			err = -ENOSPC;
2447  		} else {
2448  			if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL))
2449  				pr_warn_ratelimited("cluster is full (osdmap FULL)\n");
2450  			else
2451  				pr_warn_ratelimited("pool %lld is full or reached quota\n",
2452  						    req->r_t.base_oloc.pool);
2453  			req->r_t.paused = true;
2454  			maybe_request_map(osdc);
2455  		}
2456  	} else if (!osd_homeless(osd)) {
2457  		need_send = true;
2458  	} else {
2459  		maybe_request_map(osdc);
2460  	}
2461  
2462  	mutex_lock(&osd->lock);
2463  	/*
2464  	 * Assign the tid atomically with send_request() to protect
2465  	 * multiple writes to the same object from racing with each
2466  	 * other, which would result in out-of-order ops on the OSDs.
2467  	 */
2468  	req->r_tid = atomic64_inc_return(&osdc->last_tid);
2469  	link_request(osd, req);
2470  	if (need_send)
2471  		send_request(req);
2472  	else if (err)
2473  		complete_request(req, err);
2474  	mutex_unlock(&osd->lock);
2475  
2476  	if (!err && ct_res == CALC_TARGET_POOL_DNE)
2477  		send_map_check(req);
2478  
2479  	if (promoted)
2480  		downgrade_write(&osdc->lock);
2481  	return;
2482  
2483  promote:
2484  	up_read(&osdc->lock);
2485  	down_write(&osdc->lock);
2486  	wrlocked = true;
2487  	promoted = true;
2488  	goto again;
2489  }
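
/*
 * A sketch of the promote: idiom above (not new behavior) -- rwsems
 * can't be upgraded in place, so the read lock is dropped, the write
 * lock is taken and the target calculation is redone from scratch:
 *
 *	up_read(&osdc->lock);
 *	down_write(&osdc->lock);
 *	...retry...
 *	downgrade_write(&osdc->lock);	(return holding the read lock)
 */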
2490  
2491  static void account_request(struct ceph_osd_request *req)
2492  {
2493  	WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
2494  	WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
2495  
2496  	req->r_flags |= CEPH_OSD_FLAG_ONDISK;
2497  	atomic_inc(&req->r_osdc->num_requests);
2498  
2499  	req->r_start_stamp = jiffies;
2500  	req->r_start_latency = ktime_get();
2501  }
2502  
2503  static void submit_request(struct ceph_osd_request *req, bool wrlocked)
2504  {
2505  	ceph_osdc_get_request(req);
2506  	account_request(req);
2507  	__submit_request(req, wrlocked);
2508  }
2509  
2510  static void finish_request(struct ceph_osd_request *req)
2511  {
2512  	struct ceph_osd_client *osdc = req->r_osdc;
2513  
2514  	WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
2515  	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2516  
2517  	req->r_end_latency = ktime_get();
2518  
2519  	if (req->r_osd) {
2520  		ceph_init_sparse_read(&req->r_osd->o_sparse_read);
2521  		unlink_request(req->r_osd, req);
2522  	}
2523  	atomic_dec(&osdc->num_requests);
2524  
2525  	/*
2526  	 * If an OSD has gone down or come back and a request has been sent
2527  	 * twice, it's possible to get a reply and end up here while the
2528  	 * request message is queued for delivery.  We will ignore the
2529  	 * reply, so not a big deal, but better to try and catch it.
2530  	 */
2531  	ceph_msg_revoke(req->r_request);
2532  	ceph_msg_revoke_incoming(req->r_reply);
2533  }
2534  
2535  static void __complete_request(struct ceph_osd_request *req)
2536  {
2537  	dout("%s req %p tid %llu cb %ps result %d\n", __func__, req,
2538  	     req->r_tid, req->r_callback, req->r_result);
2539  
2540  	if (req->r_callback)
2541  		req->r_callback(req);
2542  	complete_all(&req->r_completion);
2543  	ceph_osdc_put_request(req);
2544  }
2545  
2546  static void complete_request_workfn(struct work_struct *work)
2547  {
2548  	struct ceph_osd_request *req =
2549  	    container_of(work, struct ceph_osd_request, r_complete_work);
2550  
2551  	__complete_request(req);
2552  }
2553  
2554  /*
2555   * This is open-coded in handle_reply().
2556   */
2557  static void complete_request(struct ceph_osd_request *req, int err)
2558  {
2559  	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2560  
2561  	req->r_result = err;
2562  	finish_request(req);
2563  
2564  	INIT_WORK(&req->r_complete_work, complete_request_workfn);
2565  	queue_work(req->r_osdc->completion_wq, &req->r_complete_work);
2566  }
2567  
2568  static void cancel_map_check(struct ceph_osd_request *req)
2569  {
2570  	struct ceph_osd_client *osdc = req->r_osdc;
2571  	struct ceph_osd_request *lookup_req;
2572  
2573  	verify_osdc_wrlocked(osdc);
2574  
2575  	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2576  	if (!lookup_req)
2577  		return;
2578  
2579  	WARN_ON(lookup_req != req);
2580  	erase_request_mc(&osdc->map_checks, req);
2581  	ceph_osdc_put_request(req);
2582  }
2583  
2584  static void cancel_request(struct ceph_osd_request *req)
2585  {
2586  	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2587  
2588  	cancel_map_check(req);
2589  	finish_request(req);
2590  	complete_all(&req->r_completion);
2591  	ceph_osdc_put_request(req);
2592  }
2593  
2594  static void abort_request(struct ceph_osd_request *req, int err)
2595  {
2596  	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2597  
2598  	cancel_map_check(req);
2599  	complete_request(req, err);
2600  }
2601  
2602  static int abort_fn(struct ceph_osd_request *req, void *arg)
2603  {
2604  	int err = *(int *)arg;
2605  
2606  	abort_request(req, err);
2607  	return 0; /* continue iteration */
2608  }
2609  
2610  /*
2611   * Abort all in-flight requests with @err and arrange for all future
2612   * requests to be failed immediately.
2613   */
2614  void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err)
2615  {
2616  	dout("%s osdc %p err %d\n", __func__, osdc, err);
2617  	down_write(&osdc->lock);
2618  	for_each_request(osdc, abort_fn, &err);
2619  	osdc->abort_err = err;
2620  	up_write(&osdc->lock);
2621  }
2622  EXPORT_SYMBOL(ceph_osdc_abort_requests);
2623  
2624  void ceph_osdc_clear_abort_err(struct ceph_osd_client *osdc)
2625  {
2626  	down_write(&osdc->lock);
2627  	osdc->abort_err = 0;
2628  	up_write(&osdc->lock);
2629  }
2630  EXPORT_SYMBOL(ceph_osdc_clear_abort_err);
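
/*
 * Usage sketch (illustrative; the error value is an example): a
 * client that finds itself blocklisted might fail all outstanding
 * and future I/O, then clear the error once it has recovered:
 *
 *	ceph_osdc_abort_requests(osdc, -EBLOCKLISTED);
 *	...
 *	ceph_osdc_clear_abort_err(osdc);
 */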
2631  
2632  static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2633  {
2634  	if (likely(eb > osdc->epoch_barrier)) {
2635  		dout("updating epoch_barrier from %u to %u\n",
2636  				osdc->epoch_barrier, eb);
2637  		osdc->epoch_barrier = eb;
2638  		/* Request a map if we're not at the barrier yet */
2639  		if (eb > osdc->osdmap->epoch)
2640  			maybe_request_map(osdc);
2641  	}
2642  }
2643  
2644  void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2645  {
2646  	down_read(&osdc->lock);
2647  	if (unlikely(eb > osdc->epoch_barrier)) {
2648  		up_read(&osdc->lock);
2649  		down_write(&osdc->lock);
2650  		update_epoch_barrier(osdc, eb);
2651  		up_write(&osdc->lock);
2652  	} else {
2653  		up_read(&osdc->lock);
2654  	}
2655  }
2656  EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
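
/*
 * Usage sketch (illustrative): a caller that learns of a newer
 * barrier from elsewhere, e.g. a cap message from an MDS, simply
 * passes it in:
 *
 *	ceph_osdc_update_epoch_barrier(osdc, new_barrier);
 *
 * Stale (smaller or equal) values are ignored on the read-locked
 * fast path above.
 */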
2657  
2658  /*
2659   * We can end up releasing caps as a result of abort_request().
2660   * In that case, we probably want to ensure that the cap release message
2661   * has an updated epoch barrier in it, so set the epoch barrier prior to
2662   * aborting the first request.
2663   */
2664  static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
2665  {
2666  	struct ceph_osd_client *osdc = req->r_osdc;
2667  	bool *victims = arg;
2668  
2669  	if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2670  	    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2671  	     pool_full(osdc, req->r_t.base_oloc.pool))) {
2672  		if (!*victims) {
2673  			update_epoch_barrier(osdc, osdc->osdmap->epoch);
2674  			*victims = true;
2675  		}
2676  		abort_request(req, -ENOSPC);
2677  	}
2678  
2679  	return 0; /* continue iteration */
2680  }
2681  
2682  /*
2683   * Drop all pending requests that are stalled waiting on a full condition to
2684   * clear, and complete them with -ENOSPC as the return code. Set the
2685   * osdc->epoch_barrier to the latest map epoch that we've seen if any were
2686   * cancelled.
2687   */
2688  static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
2689  {
2690  	bool victims = false;
2691  
2692  	if (ceph_test_opt(osdc->client, ABORT_ON_FULL) &&
2693  	    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
2694  		for_each_request(osdc, abort_on_full_fn, &victims);
2695  }
2696  
2697  static void check_pool_dne(struct ceph_osd_request *req)
2698  {
2699  	struct ceph_osd_client *osdc = req->r_osdc;
2700  	struct ceph_osdmap *map = osdc->osdmap;
2701  
2702  	verify_osdc_wrlocked(osdc);
2703  	WARN_ON(!map->epoch);
2704  
2705  	if (req->r_attempts) {
2706  		/*
2707  		 * We sent a request earlier, which means that
2708  		 * previously the pool existed, and now it does not
2709  		 * (i.e., it was deleted).
2710  		 */
2711  		req->r_map_dne_bound = map->epoch;
2712  		dout("%s req %p tid %llu pool disappeared\n", __func__, req,
2713  		     req->r_tid);
2714  	} else {
2715  		dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
2716  		     req, req->r_tid, req->r_map_dne_bound, map->epoch);
2717  	}
2718  
2719  	if (req->r_map_dne_bound) {
2720  		if (map->epoch >= req->r_map_dne_bound) {
2721  			/* we had a new enough map */
2722  			pr_info_ratelimited("tid %llu pool does not exist\n",
2723  					    req->r_tid);
2724  			complete_request(req, -ENOENT);
2725  		}
2726  	} else {
2727  		send_map_check(req);
2728  	}
2729  }
2730  
2731  static void map_check_cb(struct ceph_mon_generic_request *greq)
2732  {
2733  	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
2734  	struct ceph_osd_request *req;
2735  	u64 tid = greq->private_data;
2736  
2737  	WARN_ON(greq->result || !greq->u.newest);
2738  
2739  	down_write(&osdc->lock);
2740  	req = lookup_request_mc(&osdc->map_checks, tid);
2741  	if (!req) {
2742  		dout("%s tid %llu dne\n", __func__, tid);
2743  		goto out_unlock;
2744  	}
2745  
2746  	dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
2747  	     req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
2748  	if (!req->r_map_dne_bound)
2749  		req->r_map_dne_bound = greq->u.newest;
2750  	erase_request_mc(&osdc->map_checks, req);
2751  	check_pool_dne(req);
2752  
2753  	ceph_osdc_put_request(req);
2754  out_unlock:
2755  	up_write(&osdc->lock);
2756  }
2757  
2758  static void send_map_check(struct ceph_osd_request *req)
2759  {
2760  	struct ceph_osd_client *osdc = req->r_osdc;
2761  	struct ceph_osd_request *lookup_req;
2762  	int ret;
2763  
2764  	verify_osdc_wrlocked(osdc);
2765  
2766  	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2767  	if (lookup_req) {
2768  		WARN_ON(lookup_req != req);
2769  		return;
2770  	}
2771  
2772  	ceph_osdc_get_request(req);
2773  	insert_request_mc(&osdc->map_checks, req);
2774  	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
2775  					  map_check_cb, req->r_tid);
2776  	WARN_ON(ret);
2777  }
2778  
2779  /*
2780   * lingering requests, watch/notify v2 infrastructure
2781   */
2782  static void linger_release(struct kref *kref)
2783  {
2784  	struct ceph_osd_linger_request *lreq =
2785  	    container_of(kref, struct ceph_osd_linger_request, kref);
2786  
2787  	dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
2788  	     lreq->reg_req, lreq->ping_req);
2789  	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
2790  	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
2791  	WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
2792  	WARN_ON(!list_empty(&lreq->scan_item));
2793  	WARN_ON(!list_empty(&lreq->pending_lworks));
2794  	WARN_ON(lreq->osd);
2795  
2796  	if (lreq->request_pl)
2797  		ceph_pagelist_release(lreq->request_pl);
2798  	if (lreq->notify_id_pages)
2799  		ceph_release_page_vector(lreq->notify_id_pages, 1);
2800  
2801  	ceph_osdc_put_request(lreq->reg_req);
2802  	ceph_osdc_put_request(lreq->ping_req);
2803  	target_destroy(&lreq->t);
2804  	kfree(lreq);
2805  }
2806  
2807  static void linger_put(struct ceph_osd_linger_request *lreq)
2808  {
2809  	if (lreq)
2810  		kref_put(&lreq->kref, linger_release);
2811  }
2812  
2813  static struct ceph_osd_linger_request *
2814  linger_get(struct ceph_osd_linger_request *lreq)
2815  {
2816  	kref_get(&lreq->kref);
2817  	return lreq;
2818  }
2819  
2820  static struct ceph_osd_linger_request *
2821  linger_alloc(struct ceph_osd_client *osdc)
2822  {
2823  	struct ceph_osd_linger_request *lreq;
2824  
2825  	lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
2826  	if (!lreq)
2827  		return NULL;
2828  
2829  	kref_init(&lreq->kref);
2830  	mutex_init(&lreq->lock);
2831  	RB_CLEAR_NODE(&lreq->node);
2832  	RB_CLEAR_NODE(&lreq->osdc_node);
2833  	RB_CLEAR_NODE(&lreq->mc_node);
2834  	INIT_LIST_HEAD(&lreq->scan_item);
2835  	INIT_LIST_HEAD(&lreq->pending_lworks);
2836  	init_completion(&lreq->reg_commit_wait);
2837  	init_completion(&lreq->notify_finish_wait);
2838  
2839  	lreq->osdc = osdc;
2840  	target_init(&lreq->t);
2841  
2842  	dout("%s lreq %p\n", __func__, lreq);
2843  	return lreq;
2844  }
2845  
2846  DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
2847  DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
2848  DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
2849  
2850  /*
2851   * Create linger request <-> OSD session relation.
2852   *
2853   * @lreq has to be registered, @osd may be homeless.
2854   */
2855  static void link_linger(struct ceph_osd *osd,
2856  			struct ceph_osd_linger_request *lreq)
2857  {
2858  	verify_osd_locked(osd);
2859  	WARN_ON(!lreq->linger_id || lreq->osd);
2860  	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2861  	     osd->o_osd, lreq, lreq->linger_id);
2862  
2863  	if (!osd_homeless(osd))
2864  		__remove_osd_from_lru(osd);
2865  	else
2866  		atomic_inc(&osd->o_osdc->num_homeless);
2867  
2868  	get_osd(osd);
2869  	insert_linger(&osd->o_linger_requests, lreq);
2870  	lreq->osd = osd;
2871  }
2872  
2873  static void unlink_linger(struct ceph_osd *osd,
2874  			  struct ceph_osd_linger_request *lreq)
2875  {
2876  	verify_osd_locked(osd);
2877  	WARN_ON(lreq->osd != osd);
2878  	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2879  	     osd->o_osd, lreq, lreq->linger_id);
2880  
2881  	lreq->osd = NULL;
2882  	erase_linger(&osd->o_linger_requests, lreq);
2883  	put_osd(osd);
2884  
2885  	if (!osd_homeless(osd))
2886  		maybe_move_osd_to_lru(osd);
2887  	else
2888  		atomic_dec(&osd->o_osdc->num_homeless);
2889  }
2890  
2891  static bool __linger_registered(struct ceph_osd_linger_request *lreq)
2892  {
2893  	verify_osdc_locked(lreq->osdc);
2894  
2895  	return !RB_EMPTY_NODE(&lreq->osdc_node);
2896  }
2897  
2898  static bool linger_registered(struct ceph_osd_linger_request *lreq)
2899  {
2900  	struct ceph_osd_client *osdc = lreq->osdc;
2901  	bool registered;
2902  
2903  	down_read(&osdc->lock);
2904  	registered = __linger_registered(lreq);
2905  	up_read(&osdc->lock);
2906  
2907  	return registered;
2908  }
2909  
2910  static void linger_register(struct ceph_osd_linger_request *lreq)
2911  {
2912  	struct ceph_osd_client *osdc = lreq->osdc;
2913  
2914  	verify_osdc_wrlocked(osdc);
2915  	WARN_ON(lreq->linger_id);
2916  
2917  	linger_get(lreq);
2918  	lreq->linger_id = ++osdc->last_linger_id;
2919  	insert_linger_osdc(&osdc->linger_requests, lreq);
2920  }
2921  
2922  static void linger_unregister(struct ceph_osd_linger_request *lreq)
2923  {
2924  	struct ceph_osd_client *osdc = lreq->osdc;
2925  
2926  	verify_osdc_wrlocked(osdc);
2927  
2928  	erase_linger_osdc(&osdc->linger_requests, lreq);
2929  	linger_put(lreq);
2930  }
2931  
2932  static void cancel_linger_request(struct ceph_osd_request *req)
2933  {
2934  	struct ceph_osd_linger_request *lreq = req->r_priv;
2935  
2936  	WARN_ON(!req->r_linger);
2937  	cancel_request(req);
2938  	linger_put(lreq);
2939  }
2940  
2941  struct linger_work {
2942  	struct work_struct work;
2943  	struct ceph_osd_linger_request *lreq;
2944  	struct list_head pending_item;
2945  	unsigned long queued_stamp;
2946  
2947  	union {
2948  		struct {
2949  			u64 notify_id;
2950  			u64 notifier_id;
2951  			void *payload; /* points into @msg front */
2952  			size_t payload_len;
2953  
2954  			struct ceph_msg *msg; /* for ceph_msg_put() */
2955  		} notify;
2956  		struct {
2957  			int err;
2958  		} error;
2959  	};
2960  };
2961  
2962  static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
2963  				       work_func_t workfn)
2964  {
2965  	struct linger_work *lwork;
2966  
2967  	lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
2968  	if (!lwork)
2969  		return NULL;
2970  
2971  	INIT_WORK(&lwork->work, workfn);
2972  	INIT_LIST_HEAD(&lwork->pending_item);
2973  	lwork->lreq = linger_get(lreq);
2974  
2975  	return lwork;
2976  }
2977  
2978  static void lwork_free(struct linger_work *lwork)
2979  {
2980  	struct ceph_osd_linger_request *lreq = lwork->lreq;
2981  
2982  	mutex_lock(&lreq->lock);
2983  	list_del(&lwork->pending_item);
2984  	mutex_unlock(&lreq->lock);
2985  
2986  	linger_put(lreq);
2987  	kfree(lwork);
2988  }
2989  
2990  static void lwork_queue(struct linger_work *lwork)
2991  {
2992  	struct ceph_osd_linger_request *lreq = lwork->lreq;
2993  	struct ceph_osd_client *osdc = lreq->osdc;
2994  
2995  	verify_lreq_locked(lreq);
2996  	WARN_ON(!list_empty(&lwork->pending_item));
2997  
2998  	lwork->queued_stamp = jiffies;
2999  	list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
3000  	queue_work(osdc->notify_wq, &lwork->work);
3001  }
3002  
3003  static void do_watch_notify(struct work_struct *w)
3004  {
3005  	struct linger_work *lwork = container_of(w, struct linger_work, work);
3006  	struct ceph_osd_linger_request *lreq = lwork->lreq;
3007  
3008  	if (!linger_registered(lreq)) {
3009  		dout("%s lreq %p not registered\n", __func__, lreq);
3010  		goto out;
3011  	}
3012  
3013  	WARN_ON(!lreq->is_watch);
3014  	dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
3015  	     __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
3016  	     lwork->notify.payload_len);
3017  	lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
3018  		  lwork->notify.notifier_id, lwork->notify.payload,
3019  		  lwork->notify.payload_len);
3020  
3021  out:
3022  	ceph_msg_put(lwork->notify.msg);
3023  	lwork_free(lwork);
3024  }
3025  
3026  static void do_watch_error(struct work_struct *w)
3027  {
3028  	struct linger_work *lwork = container_of(w, struct linger_work, work);
3029  	struct ceph_osd_linger_request *lreq = lwork->lreq;
3030  
3031  	if (!linger_registered(lreq)) {
3032  		dout("%s lreq %p not registered\n", __func__, lreq);
3033  		goto out;
3034  	}
3035  
3036  	dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
3037  	lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
3038  
3039  out:
3040  	lwork_free(lwork);
3041  }
3042  
3043  static void queue_watch_error(struct ceph_osd_linger_request *lreq)
3044  {
3045  	struct linger_work *lwork;
3046  
3047  	lwork = lwork_alloc(lreq, do_watch_error);
3048  	if (!lwork) {
3049  		pr_err("failed to allocate error-lwork\n");
3050  		return;
3051  	}
3052  
3053  	lwork->error.err = lreq->last_error;
3054  	lwork_queue(lwork);
3055  }
3056  
3057  static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
3058  				       int result)
3059  {
3060  	if (!completion_done(&lreq->reg_commit_wait)) {
3061  		lreq->reg_commit_error = (result <= 0 ? result : 0);
3062  		complete_all(&lreq->reg_commit_wait);
3063  	}
3064  }
3065  
3066  static void linger_commit_cb(struct ceph_osd_request *req)
3067  {
3068  	struct ceph_osd_linger_request *lreq = req->r_priv;
3069  
3070  	mutex_lock(&lreq->lock);
3071  	if (req != lreq->reg_req) {
3072  		dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3073  		     __func__, lreq, lreq->linger_id, req, lreq->reg_req);
3074  		goto out;
3075  	}
3076  
3077  	dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
3078  	     lreq->linger_id, req->r_result);
3079  	linger_reg_commit_complete(lreq, req->r_result);
3080  	lreq->committed = true;
3081  
3082  	if (!lreq->is_watch) {
3083  		struct ceph_osd_data *osd_data =
3084  		    osd_req_op_data(req, 0, notify, response_data);
3085  		void *p = page_address(osd_data->pages[0]);
3086  
3087  		WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
3088  			osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
3089  
3090  		/* make note of the notify_id */
3091  		if (req->r_ops[0].outdata_len >= sizeof(u64)) {
3092  			lreq->notify_id = ceph_decode_64(&p);
3093  			dout("lreq %p notify_id %llu\n", lreq,
3094  			     lreq->notify_id);
3095  		} else {
3096  			dout("lreq %p no notify_id\n", lreq);
3097  		}
3098  	}
3099  
3100  out:
3101  	mutex_unlock(&lreq->lock);
3102  	linger_put(lreq);
3103  }
3104  
3105  static int normalize_watch_error(int err)
3106  {
3107  	/*
3108  	 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
3109  	 * notification and a failure to reconnect because we raced with
3110  	 * the delete appear the same to the user.
3111  	 */
3112  	if (err == -ENOENT)
3113  		err = -ENOTCONN;
3114  
3115  	return err;
3116  }
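
/*
 * Illustrative sketch: a watcher whose object is deleted observes the
 * same thing whether the OSD delivered the disconnect notification or
 * the reconnect raced with the delete -- either way the error
 * callback fires as
 *
 *	errcb(data, linger_id, -ENOTCONN);
 */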
3117  
3118  static void linger_reconnect_cb(struct ceph_osd_request *req)
3119  {
3120  	struct ceph_osd_linger_request *lreq = req->r_priv;
3121  
3122  	mutex_lock(&lreq->lock);
3123  	if (req != lreq->reg_req) {
3124  		dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3125  		     __func__, lreq, lreq->linger_id, req, lreq->reg_req);
3126  		goto out;
3127  	}
3128  
3129  	dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
3130  	     lreq, lreq->linger_id, req->r_result, lreq->last_error);
3131  	if (req->r_result < 0) {
3132  		if (!lreq->last_error) {
3133  			lreq->last_error = normalize_watch_error(req->r_result);
3134  			queue_watch_error(lreq);
3135  		}
3136  	}
3137  
3138  out:
3139  	mutex_unlock(&lreq->lock);
3140  	linger_put(lreq);
3141  }
3142  
3143  static void send_linger(struct ceph_osd_linger_request *lreq)
3144  {
3145  	struct ceph_osd_client *osdc = lreq->osdc;
3146  	struct ceph_osd_request *req;
3147  	int ret;
3148  
3149  	verify_osdc_wrlocked(osdc);
3150  	mutex_lock(&lreq->lock);
3151  	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3152  
3153  	if (lreq->reg_req) {
3154  		if (lreq->reg_req->r_osd)
3155  			cancel_linger_request(lreq->reg_req);
3156  		ceph_osdc_put_request(lreq->reg_req);
3157  	}
3158  
3159  	req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3160  	BUG_ON(!req);
3161  
3162  	target_copy(&req->r_t, &lreq->t);
3163  	req->r_mtime = lreq->mtime;
3164  
3165  	if (lreq->is_watch && lreq->committed) {
3166  		osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_RECONNECT,
3167  				      lreq->linger_id, ++lreq->register_gen);
3168  		dout("lreq %p reconnect register_gen %u\n", lreq,
3169  		     req->r_ops[0].watch.gen);
3170  		req->r_callback = linger_reconnect_cb;
3171  	} else {
3172  		if (lreq->is_watch) {
3173  			osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_WATCH,
3174  					      lreq->linger_id, 0);
3175  		} else {
3176  			lreq->notify_id = 0;
3177  
3178  			refcount_inc(&lreq->request_pl->refcnt);
3179  			osd_req_op_notify_init(req, 0, lreq->linger_id,
3180  					       lreq->request_pl);
3181  			ceph_osd_data_pages_init(
3182  			    osd_req_op_data(req, 0, notify, response_data),
3183  			    lreq->notify_id_pages, PAGE_SIZE, 0, false, false);
3184  		}
3185  		dout("lreq %p register\n", lreq);
3186  		req->r_callback = linger_commit_cb;
3187  	}
3188  
3189  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3190  	BUG_ON(ret);
3191  
3192  	req->r_priv = linger_get(lreq);
3193  	req->r_linger = true;
3194  	lreq->reg_req = req;
3195  	mutex_unlock(&lreq->lock);
3196  
3197  	submit_request(req, true);
3198  }
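
/*
 * Rough sketch of the linger (watch/notify) registration flow implemented
 * above -- an overview derived from the code, not authoritative docs:
 *
 *	linger_submit()
 *	  -> send_linger()               WATCH or NOTIFY op, first attempt
 *	       -> linger_commit_cb()     sets lreq->committed
 *	  ...OSD connection reset or map change...
 *	  -> send_linger()               WATCH_OP_RECONNECT if committed
 *	       -> linger_reconnect_cb()  surfaces errors via last_error
 */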
3199  
3200  static void linger_ping_cb(struct ceph_osd_request *req)
3201  {
3202  	struct ceph_osd_linger_request *lreq = req->r_priv;
3203  
3204  	mutex_lock(&lreq->lock);
3205  	if (req != lreq->ping_req) {
3206  		dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3207  		     __func__, lreq, lreq->linger_id, req, lreq->ping_req);
3208  		goto out;
3209  	}
3210  
3211  	dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
3212  	     __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
3213  	     lreq->last_error);
3214  	if (lreq->register_gen == req->r_ops[0].watch.gen) {
3215  		if (!req->r_result) {
3216  			lreq->watch_valid_thru = lreq->ping_sent;
3217  		} else if (!lreq->last_error) {
3218  			lreq->last_error = normalize_watch_error(req->r_result);
3219  			queue_watch_error(lreq);
3220  		}
3221  	} else {
3222  		dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
3223  		     lreq->register_gen, req->r_ops[0].watch.gen);
3224  	}
3225  
3226  out:
3227  	mutex_unlock(&lreq->lock);
3228  	linger_put(lreq);
3229  }
3230  
3231  static void send_linger_ping(struct ceph_osd_linger_request *lreq)
3232  {
3233  	struct ceph_osd_client *osdc = lreq->osdc;
3234  	struct ceph_osd_request *req;
3235  	int ret;
3236  
3237  	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
3238  		dout("%s PAUSERD\n", __func__);
3239  		return;
3240  	}
3241  
3242  	lreq->ping_sent = jiffies;
3243  	dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
3244  	     __func__, lreq, lreq->linger_id, lreq->ping_sent,
3245  	     lreq->register_gen);
3246  
3247  	if (lreq->ping_req) {
3248  		if (lreq->ping_req->r_osd)
3249  			cancel_linger_request(lreq->ping_req);
3250  		ceph_osdc_put_request(lreq->ping_req);
3251  	}
3252  
3253  	req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3254  	BUG_ON(!req);
3255  
3256  	target_copy(&req->r_t, &lreq->t);
3257  	osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_PING, lreq->linger_id,
3258  			      lreq->register_gen);
3259  	req->r_callback = linger_ping_cb;
3260  
3261  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3262  	BUG_ON(ret);
3263  
3264  	req->r_priv = linger_get(lreq);
3265  	req->r_linger = true;
3266  	lreq->ping_req = req;
3267  
3268  	ceph_osdc_get_request(req);
3269  	account_request(req);
3270  	req->r_tid = atomic64_inc_return(&osdc->last_tid);
3271  	link_request(lreq->osd, req);
3272  	send_request(req);
3273  }
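
/*
 * Note that, unlike send_linger(), the ping path above links and sends the
 * request by hand (account_request() + link_request() + send_request())
 * instead of going through submit_request().  The target was already
 * computed for @lreq and a ping must go to the OSD currently serving the
 * watch, so no target recalculation is wanted here.
 */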
3274  
3275  static void linger_submit(struct ceph_osd_linger_request *lreq)
3276  {
3277  	struct ceph_osd_client *osdc = lreq->osdc;
3278  	struct ceph_osd *osd;
3279  
3280  	down_write(&osdc->lock);
3281  	linger_register(lreq);
3282  
3283  	calc_target(osdc, &lreq->t, false);
3284  	osd = lookup_create_osd(osdc, lreq->t.osd, true);
3285  	link_linger(osd, lreq);
3286  
3287  	send_linger(lreq);
3288  	up_write(&osdc->lock);
3289  }
3290  
3291  static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
3292  {
3293  	struct ceph_osd_client *osdc = lreq->osdc;
3294  	struct ceph_osd_linger_request *lookup_lreq;
3295  
3296  	verify_osdc_wrlocked(osdc);
3297  
3298  	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3299  				       lreq->linger_id);
3300  	if (!lookup_lreq)
3301  		return;
3302  
3303  	WARN_ON(lookup_lreq != lreq);
3304  	erase_linger_mc(&osdc->linger_map_checks, lreq);
3305  	linger_put(lreq);
3306  }
3307  
3308  /*
3309   * @lreq has to be both registered and linked.
3310   */
3311  static void __linger_cancel(struct ceph_osd_linger_request *lreq)
3312  {
3313  	if (lreq->ping_req && lreq->ping_req->r_osd)
3314  		cancel_linger_request(lreq->ping_req);
3315  	if (lreq->reg_req && lreq->reg_req->r_osd)
3316  		cancel_linger_request(lreq->reg_req);
3317  	cancel_linger_map_check(lreq);
3318  	unlink_linger(lreq->osd, lreq);
3319  	linger_unregister(lreq);
3320  }
3321  
3322  static void linger_cancel(struct ceph_osd_linger_request *lreq)
3323  {
3324  	struct ceph_osd_client *osdc = lreq->osdc;
3325  
3326  	down_write(&osdc->lock);
3327  	if (__linger_registered(lreq))
3328  		__linger_cancel(lreq);
3329  	up_write(&osdc->lock);
3330  }
3331  
3332  static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
3333  
3334  static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
3335  {
3336  	struct ceph_osd_client *osdc = lreq->osdc;
3337  	struct ceph_osdmap *map = osdc->osdmap;
3338  
3339  	verify_osdc_wrlocked(osdc);
3340  	WARN_ON(!map->epoch);
3341  
3342  	if (lreq->register_gen) {
3343  		lreq->map_dne_bound = map->epoch;
3344  		dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
3345  		     lreq, lreq->linger_id);
3346  	} else {
3347  		dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
3348  		     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3349  		     map->epoch);
3350  	}
3351  
3352  	if (lreq->map_dne_bound) {
3353  		if (map->epoch >= lreq->map_dne_bound) {
3354  			/* we had a new enough map */
3355  			pr_info("linger_id %llu pool does not exist\n",
3356  				lreq->linger_id);
3357  			linger_reg_commit_complete(lreq, -ENOENT);
3358  			__linger_cancel(lreq);
3359  		}
3360  	} else {
3361  		send_linger_map_check(lreq);
3362  	}
3363  }
3364  
3365  static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
3366  {
3367  	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
3368  	struct ceph_osd_linger_request *lreq;
3369  	u64 linger_id = greq->private_data;
3370  
3371  	WARN_ON(greq->result || !greq->u.newest);
3372  
3373  	down_write(&osdc->lock);
3374  	lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
3375  	if (!lreq) {
3376  		dout("%s linger_id %llu dne\n", __func__, linger_id);
3377  		goto out_unlock;
3378  	}
3379  
3380  	dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
3381  	     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3382  	     greq->u.newest);
3383  	if (!lreq->map_dne_bound)
3384  		lreq->map_dne_bound = greq->u.newest;
3385  	erase_linger_mc(&osdc->linger_map_checks, lreq);
3386  	check_linger_pool_dne(lreq);
3387  
3388  	linger_put(lreq);
3389  out_unlock:
3390  	up_write(&osdc->lock);
3391  }
3392  
3393  static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
3394  {
3395  	struct ceph_osd_client *osdc = lreq->osdc;
3396  	struct ceph_osd_linger_request *lookup_lreq;
3397  	int ret;
3398  
3399  	verify_osdc_wrlocked(osdc);
3400  
3401  	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3402  				       lreq->linger_id);
3403  	if (lookup_lreq) {
3404  		WARN_ON(lookup_lreq != lreq);
3405  		return;
3406  	}
3407  
3408  	linger_get(lreq);
3409  	insert_linger_mc(&osdc->linger_map_checks, lreq);
3410  	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
3411  					  linger_map_check_cb, lreq->linger_id);
3412  	WARN_ON(ret);
3413  }
3414  
3415  static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
3416  {
3417  	int ret;
3418  
3419  	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3420  	ret = wait_for_completion_killable(&lreq->reg_commit_wait);
3421  	return ret ?: lreq->reg_commit_error;
3422  }
3423  
3424  static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq,
3425  				     unsigned long timeout)
3426  {
3427  	long left;
3428  
3429  	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3430  	left = wait_for_completion_killable_timeout(&lreq->notify_finish_wait,
3431  						ceph_timeout_jiffies(timeout));
3432  	if (left <= 0)
3433  		left = left ?: -ETIMEDOUT;
3434  	else
3435  		left = lreq->notify_finish_error; /* completed */
3436  
3437  	return left;
3438  }
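
/*
 * The wait above follows the usual libceph convention: a @timeout of 0 is
 * passed through ceph_timeout_jiffies() as "wait forever".  The return
 * value of wait_for_completion_killable_timeout() is then folded as
 *
 *	left < 0    interrupted by a fatal signal, returned as is
 *	left == 0   timed out, converted to -ETIMEDOUT
 *	left > 0    completed, replaced with the operation's result
 *
 * wait_request_timeout() below applies the same pattern to regular
 * requests.
 */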
3439  
3440  /*
3441   * Timeout callback, called every N seconds.  When one or more OSD
3442   * requests have been active for more than N seconds, we send a
3443   * keepalive (tag + timestamp) to their OSDs to ensure that any
3444   * communications channel reset is detected.
3445   */
3446  static void handle_timeout(struct work_struct *work)
3447  {
3448  	struct ceph_osd_client *osdc =
3449  		container_of(work, struct ceph_osd_client, timeout_work.work);
3450  	struct ceph_options *opts = osdc->client->options;
3451  	unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
3452  	unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout;
3453  	LIST_HEAD(slow_osds);
3454  	struct rb_node *n, *p;
3455  
3456  	dout("%s osdc %p\n", __func__, osdc);
3457  	down_write(&osdc->lock);
3458  
3459  	/*
3460  	 * ping osds that are a bit slow.  this ensures that if there
3461  	 * is a break in the TCP connection we will notice, and reopen
3462  	 * a connection with that osd (from the fault callback).
3463  	 */
3464  	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
3465  		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3466  		bool found = false;
3467  
3468  		for (p = rb_first(&osd->o_requests); p; ) {
3469  			struct ceph_osd_request *req =
3470  			    rb_entry(p, struct ceph_osd_request, r_node);
3471  
3472  			p = rb_next(p); /* abort_request() */
3473  
3474  			if (time_before(req->r_stamp, cutoff)) {
3475  				dout(" req %p tid %llu on osd%d is laggy\n",
3476  				     req, req->r_tid, osd->o_osd);
3477  				found = true;
3478  			}
3479  			if (opts->osd_request_timeout &&
3480  			    time_before(req->r_start_stamp, expiry_cutoff)) {
3481  				pr_err_ratelimited("tid %llu on osd%d timeout\n",
3482  				       req->r_tid, osd->o_osd);
3483  				abort_request(req, -ETIMEDOUT);
3484  			}
3485  		}
3486  		for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
3487  			struct ceph_osd_linger_request *lreq =
3488  			    rb_entry(p, struct ceph_osd_linger_request, node);
3489  
3490  			dout(" lreq %p linger_id %llu is served by osd%d\n",
3491  			     lreq, lreq->linger_id, osd->o_osd);
3492  			found = true;
3493  
3494  			mutex_lock(&lreq->lock);
3495  			if (lreq->is_watch && lreq->committed && !lreq->last_error)
3496  				send_linger_ping(lreq);
3497  			mutex_unlock(&lreq->lock);
3498  		}
3499  
3500  		if (found)
3501  			list_move_tail(&osd->o_keepalive_item, &slow_osds);
3502  	}
3503  
3504  	if (opts->osd_request_timeout) {
3505  		for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
3506  			struct ceph_osd_request *req =
3507  			    rb_entry(p, struct ceph_osd_request, r_node);
3508  
3509  			p = rb_next(p); /* abort_request() */
3510  
3511  			if (time_before(req->r_start_stamp, expiry_cutoff)) {
3512  				pr_err_ratelimited("tid %llu on osd%d timeout\n",
3513  				       req->r_tid, osdc->homeless_osd.o_osd);
3514  				abort_request(req, -ETIMEDOUT);
3515  			}
3516  		}
3517  	}
3518  
3519  	if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
3520  		maybe_request_map(osdc);
3521  
3522  	while (!list_empty(&slow_osds)) {
3523  		struct ceph_osd *osd = list_first_entry(&slow_osds,
3524  							struct ceph_osd,
3525  							o_keepalive_item);
3526  		list_del_init(&osd->o_keepalive_item);
3527  		ceph_con_keepalive(&osd->o_con);
3528  	}
3529  
3530  	up_write(&osdc->lock);
3531  	schedule_delayed_work(&osdc->timeout_work,
3532  			      osdc->client->options->osd_keepalive_timeout);
3533  }
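
/*
 * The cutoff arithmetic above is wraparound-safe because it uses
 * time_before() rather than a direct comparison.  For example, with an
 * osd_keepalive_timeout of 10*HZ:
 *
 *	cutoff = jiffies - 10*HZ;
 *	time_before(req->r_stamp, cutoff)   true iff the request was
 *					    stamped more than 10s ago,
 *					    even across a jiffies wrap
 */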
3534  
3535  static void handle_osds_timeout(struct work_struct *work)
3536  {
3537  	struct ceph_osd_client *osdc =
3538  		container_of(work, struct ceph_osd_client,
3539  			     osds_timeout_work.work);
3540  	unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
3541  	struct ceph_osd *osd, *nosd;
3542  
3543  	dout("%s osdc %p\n", __func__, osdc);
3544  	down_write(&osdc->lock);
3545  	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
3546  		if (time_before(jiffies, osd->lru_ttl))
3547  			break;
3548  
3549  		WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
3550  		WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
3551  		close_osd(osd);
3552  	}
3553  
3554  	up_write(&osdc->lock);
3555  	schedule_delayed_work(&osdc->osds_timeout_work,
3556  			      round_jiffies_relative(delay));
3557  }
3558  
3559  static int ceph_oloc_decode(void **p, void *end,
3560  			    struct ceph_object_locator *oloc)
3561  {
3562  	u8 struct_v, struct_cv;
3563  	u32 len;
3564  	void *struct_end;
3565  	int ret = 0;
3566  
3567  	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3568  	struct_v = ceph_decode_8(p);
3569  	struct_cv = ceph_decode_8(p);
3570  	if (struct_v < 3) {
3571  		pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
3572  			struct_v, struct_cv);
3573  		goto e_inval;
3574  	}
3575  	if (struct_cv > 6) {
3576  		pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
3577  			struct_v, struct_cv);
3578  		goto e_inval;
3579  	}
3580  	len = ceph_decode_32(p);
3581  	ceph_decode_need(p, end, len, e_inval);
3582  	struct_end = *p + len;
3583  
3584  	oloc->pool = ceph_decode_64(p);
3585  	*p += 4; /* skip preferred */
3586  
3587  	len = ceph_decode_32(p);
3588  	if (len > 0) {
3589  		pr_warn("ceph_object_locator::key is set\n");
3590  		goto e_inval;
3591  	}
3592  
3593  	if (struct_v >= 5) {
3594  		bool changed = false;
3595  
3596  		len = ceph_decode_32(p);
3597  		if (len > 0) {
3598  			ceph_decode_need(p, end, len, e_inval);
3599  			if (!oloc->pool_ns ||
3600  			    ceph_compare_string(oloc->pool_ns, *p, len))
3601  				changed = true;
3602  			*p += len;
3603  		} else {
3604  			if (oloc->pool_ns)
3605  				changed = true;
3606  		}
3607  		if (changed) {
3608  			/* redirect changes namespace */
3609  			pr_warn("ceph_object_locator::nspace is changed\n");
3610  			goto e_inval;
3611  		}
3612  	}
3613  
3614  	if (struct_v >= 6) {
3615  		s64 hash = ceph_decode_64(p);
3616  		if (hash != -1) {
3617  			pr_warn("ceph_object_locator::hash is set\n");
3618  			goto e_inval;
3619  		}
3620  	}
3621  
3622  	/* skip the rest */
3623  	*p = struct_end;
3624  out:
3625  	return ret;
3626  
3627  e_inval:
3628  	ret = -EINVAL;
3629  	goto out;
3630  }
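
/*
 * For reference, the object_locator encoding consumed above (derived from
 * the decoder itself, so versions outside 3..6 may differ):
 *
 *	u8  struct_v		(must be >= 3)
 *	u8  struct_cv		(must be <= 6)
 *	u32 len			length of the payload that follows
 *	s64 pool
 *	u32 preferred		(skipped)
 *	u32 key_len + data	(must be empty)
 *	u32 nspace_len + data	(v >= 5, must match the expected pool_ns)
 *	s64 hash		(v >= 6, must be -1)
 */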
3631  
3632  static int ceph_redirect_decode(void **p, void *end,
3633  				struct ceph_request_redirect *redir)
3634  {
3635  	u8 struct_v, struct_cv;
3636  	u32 len;
3637  	void *struct_end;
3638  	int ret;
3639  
3640  	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3641  	struct_v = ceph_decode_8(p);
3642  	struct_cv = ceph_decode_8(p);
3643  	if (struct_cv > 1) {
3644  		pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
3645  			struct_v, struct_cv);
3646  		goto e_inval;
3647  	}
3648  	len = ceph_decode_32(p);
3649  	ceph_decode_need(p, end, len, e_inval);
3650  	struct_end = *p + len;
3651  
3652  	ret = ceph_oloc_decode(p, end, &redir->oloc);
3653  	if (ret)
3654  		goto out;
3655  
3656  	len = ceph_decode_32(p);
3657  	if (len > 0) {
3658  		pr_warn("ceph_request_redirect::object_name is set\n");
3659  		goto e_inval;
3660  	}
3661  
3662  	/* skip the rest */
3663  	*p = struct_end;
3664  out:
3665  	return ret;
3666  
3667  e_inval:
3668  	ret = -EINVAL;
3669  	goto out;
3670  }
3671  
3672  struct MOSDOpReply {
3673  	struct ceph_pg pgid;
3674  	u64 flags;
3675  	int result;
3676  	u32 epoch;
3677  	int num_ops;
3678  	u32 outdata_len[CEPH_OSD_MAX_OPS];
3679  	s32 rval[CEPH_OSD_MAX_OPS];
3680  	int retry_attempt;
3681  	struct ceph_eversion replay_version;
3682  	u64 user_version;
3683  	struct ceph_request_redirect redirect;
3684  };
3685  
3686  static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
3687  {
3688  	void *p = msg->front.iov_base;
3689  	void *const end = p + msg->front.iov_len;
3690  	u16 version = le16_to_cpu(msg->hdr.version);
3691  	struct ceph_eversion bad_replay_version;
3692  	u8 decode_redir;
3693  	u32 len;
3694  	int ret;
3695  	int i;
3696  
3697  	ceph_decode_32_safe(&p, end, len, e_inval);
3698  	ceph_decode_need(&p, end, len, e_inval);
3699  	p += len; /* skip oid */
3700  
3701  	ret = ceph_decode_pgid(&p, end, &m->pgid);
3702  	if (ret)
3703  		return ret;
3704  
3705  	ceph_decode_64_safe(&p, end, m->flags, e_inval);
3706  	ceph_decode_32_safe(&p, end, m->result, e_inval);
3707  	ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
3708  	memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
3709  	p += sizeof(bad_replay_version);
3710  	ceph_decode_32_safe(&p, end, m->epoch, e_inval);
3711  
3712  	ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
3713  	if (m->num_ops > ARRAY_SIZE(m->outdata_len))
3714  		goto e_inval;
3715  
3716  	ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
3717  			 e_inval);
3718  	for (i = 0; i < m->num_ops; i++) {
3719  		struct ceph_osd_op *op = p;
3720  
3721  		m->outdata_len[i] = le32_to_cpu(op->payload_len);
3722  		p += sizeof(*op);
3723  	}
3724  
3725  	ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
3726  	for (i = 0; i < m->num_ops; i++)
3727  		ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
3728  
3729  	if (version >= 5) {
3730  		ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
3731  		memcpy(&m->replay_version, p, sizeof(m->replay_version));
3732  		p += sizeof(m->replay_version);
3733  		ceph_decode_64_safe(&p, end, m->user_version, e_inval);
3734  	} else {
3735  		m->replay_version = bad_replay_version; /* struct */
3736  		m->user_version = le64_to_cpu(m->replay_version.version);
3737  	}
3738  
3739  	if (version >= 6) {
3740  		if (version >= 7)
3741  			ceph_decode_8_safe(&p, end, decode_redir, e_inval);
3742  		else
3743  			decode_redir = 1;
3744  	} else {
3745  		decode_redir = 0;
3746  	}
3747  
3748  	if (decode_redir) {
3749  		ret = ceph_redirect_decode(&p, end, &m->redirect);
3750  		if (ret)
3751  			return ret;
3752  	} else {
3753  		ceph_oloc_init(&m->redirect.oloc);
3754  	}
3755  
3756  	return 0;
3757  
3758  e_inval:
3759  	return -EINVAL;
3760  }
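
/*
 * Summary of the MOSDOpReply front section as consumed above (again
 * derived from the decoder; later message versions may append fields):
 *
 *	oid (skipped), pgid, flags, result, bad_replay_version, epoch,
 *	num_ops, num_ops * ceph_osd_op (outdata_len), retry_attempt,
 *	num_ops * rval,
 *	v >= 5: replay_version, user_version
 *	v >= 6: redirect (v >= 7 makes it optional via decode_redir)
 */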
3761  
3762  /*
3763   * Handle MOSDOpReply.  Set ->r_result and call the callback if it is
3764   * specified.
3765   */
3766  static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3767  {
3768  	struct ceph_osd_client *osdc = osd->o_osdc;
3769  	struct ceph_osd_request *req;
3770  	struct MOSDOpReply m;
3771  	u64 tid = le64_to_cpu(msg->hdr.tid);
3772  	u32 data_len = 0;
3773  	int ret;
3774  	int i;
3775  
3776  	dout("%s msg %p tid %llu\n", __func__, msg, tid);
3777  
3778  	down_read(&osdc->lock);
3779  	if (!osd_registered(osd)) {
3780  		dout("%s osd%d unknown\n", __func__, osd->o_osd);
3781  		goto out_unlock_osdc;
3782  	}
3783  	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
3784  
3785  	mutex_lock(&osd->lock);
3786  	req = lookup_request(&osd->o_requests, tid);
3787  	if (!req) {
3788  		dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
3789  		goto out_unlock_session;
3790  	}
3791  
3792  	m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
3793  	ret = decode_MOSDOpReply(msg, &m);
3794  	m.redirect.oloc.pool_ns = NULL;
3795  	if (ret) {
3796  		pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
3797  		       req->r_tid, ret);
3798  		ceph_msg_dump(msg);
3799  		goto fail_request;
3800  	}
3801  	dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
3802  	     __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
3803  	     m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
3804  	     le64_to_cpu(m.replay_version.version), m.user_version);
3805  
3806  	if (m.retry_attempt >= 0) {
3807  		if (m.retry_attempt != req->r_attempts - 1) {
3808  			dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
3809  			     req, req->r_tid, m.retry_attempt,
3810  			     req->r_attempts - 1);
3811  			goto out_unlock_session;
3812  		}
3813  	} else {
3814  		WARN_ON(1); /* MOSDOpReply v4 is assumed */
3815  	}
3816  
3817  	if (!ceph_oloc_empty(&m.redirect.oloc)) {
3818  		dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
3819  		     m.redirect.oloc.pool);
3820  		unlink_request(osd, req);
3821  		mutex_unlock(&osd->lock);
3822  
3823  		/*
3824  		 * Not ceph_oloc_copy() - changing pool_ns is not
3825  		 * supported.
3826  		 */
3827  		req->r_t.target_oloc.pool = m.redirect.oloc.pool;
3828  		req->r_flags |= CEPH_OSD_FLAG_REDIRECTED |
3829  				CEPH_OSD_FLAG_IGNORE_OVERLAY |
3830  				CEPH_OSD_FLAG_IGNORE_CACHE;
3831  		req->r_tid = 0;
3832  		__submit_request(req, false);
3833  		goto out_unlock_osdc;
3834  	}
3835  
3836  	if (m.result == -EAGAIN) {
3837  		dout("req %p tid %llu EAGAIN\n", req, req->r_tid);
3838  		unlink_request(osd, req);
3839  		mutex_unlock(&osd->lock);
3840  
3841  		/*
3842  		 * The object is missing on the replica or not (yet)
3843  		 * readable.  Clear pgid to force a resend to the primary
3844  		 * via legacy_change.
3845  		 */
3846  		req->r_t.pgid.pool = 0;
3847  		req->r_t.pgid.seed = 0;
3848  		WARN_ON(!req->r_t.used_replica);
3849  		req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3850  				  CEPH_OSD_FLAG_LOCALIZE_READS);
3851  		req->r_tid = 0;
3852  		__submit_request(req, false);
3853  		goto out_unlock_osdc;
3854  	}
3855  
3856  	if (m.num_ops != req->r_num_ops) {
3857  		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
3858  		       req->r_num_ops, req->r_tid);
3859  		goto fail_request;
3860  	}
3861  	for (i = 0; i < req->r_num_ops; i++) {
3862  		dout(" req %p tid %llu op %d rval %d len %u\n", req,
3863  		     req->r_tid, i, m.rval[i], m.outdata_len[i]);
3864  		req->r_ops[i].rval = m.rval[i];
3865  		req->r_ops[i].outdata_len = m.outdata_len[i];
3866  		data_len += m.outdata_len[i];
3867  	}
3868  	if (data_len != le32_to_cpu(msg->hdr.data_len)) {
3869  		pr_err("sum of lens %u != %u for tid %llu\n", data_len,
3870  		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
3871  		goto fail_request;
3872  	}
3873  	dout("%s req %p tid %llu result %d data_len %u\n", __func__,
3874  	     req, req->r_tid, m.result, data_len);
3875  
3876  	/*
3877  	 * Since we only ever request ONDISK, we should only ever get
3878  	 * one (type of) reply back.
3879  	 */
3880  	WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
3881  	req->r_version = m.user_version;
3882  	req->r_result = m.result ?: data_len;
3883  	finish_request(req);
3884  	mutex_unlock(&osd->lock);
3885  	up_read(&osdc->lock);
3886  
3887  	__complete_request(req);
3888  	return;
3889  
3890  fail_request:
3891  	complete_request(req, -EIO);
3892  out_unlock_session:
3893  	mutex_unlock(&osd->lock);
3894  out_unlock_osdc:
3895  	up_read(&osdc->lock);
3896  }
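
/*
 * To summarize the paths above: a reply either redirects the request to
 * another pool (resubmitted with a fresh tid), reports -EAGAIN from a
 * replica (resubmitted to the primary), or completes the request with
 * r_result set to the OSD's result, or to the total outdata length on
 * success.
 */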
3897  
3898  static void set_pool_was_full(struct ceph_osd_client *osdc)
3899  {
3900  	struct rb_node *n;
3901  
3902  	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
3903  		struct ceph_pg_pool_info *pi =
3904  		    rb_entry(n, struct ceph_pg_pool_info, node);
3905  
3906  		pi->was_full = __pool_full(pi);
3907  	}
3908  }
3909  
3910  static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
3911  {
3912  	struct ceph_pg_pool_info *pi;
3913  
3914  	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
3915  	if (!pi)
3916  		return false;
3917  
3918  	return pi->was_full && !__pool_full(pi);
3919  }
3920  
3921  static enum calc_target_result
3922  recalc_linger_target(struct ceph_osd_linger_request *lreq)
3923  {
3924  	struct ceph_osd_client *osdc = lreq->osdc;
3925  	enum calc_target_result ct_res;
3926  
3927  	ct_res = calc_target(osdc, &lreq->t, true);
3928  	if (ct_res == CALC_TARGET_NEED_RESEND) {
3929  		struct ceph_osd *osd;
3930  
3931  		osd = lookup_create_osd(osdc, lreq->t.osd, true);
3932  		if (osd != lreq->osd) {
3933  			unlink_linger(lreq->osd, lreq);
3934  			link_linger(osd, lreq);
3935  		}
3936  	}
3937  
3938  	return ct_res;
3939  }
3940  
3941  /*
3942   * Requeue requests whose mapping to an OSD has changed.
3943   */
3944  static void scan_requests(struct ceph_osd *osd,
3945  			  bool force_resend,
3946  			  bool cleared_full,
3947  			  bool check_pool_cleared_full,
3948  			  struct rb_root *need_resend,
3949  			  struct list_head *need_resend_linger)
3950  {
3951  	struct ceph_osd_client *osdc = osd->o_osdc;
3952  	struct rb_node *n;
3953  	bool force_resend_writes;
3954  
3955  	for (n = rb_first(&osd->o_linger_requests); n; ) {
3956  		struct ceph_osd_linger_request *lreq =
3957  		    rb_entry(n, struct ceph_osd_linger_request, node);
3958  		enum calc_target_result ct_res;
3959  
3960  		n = rb_next(n); /* recalc_linger_target() */
3961  
3962  		dout("%s lreq %p linger_id %llu\n", __func__, lreq,
3963  		     lreq->linger_id);
3964  		ct_res = recalc_linger_target(lreq);
3965  		switch (ct_res) {
3966  		case CALC_TARGET_NO_ACTION:
3967  			force_resend_writes = cleared_full ||
3968  			    (check_pool_cleared_full &&
3969  			     pool_cleared_full(osdc, lreq->t.base_oloc.pool));
3970  			if (!force_resend && !force_resend_writes)
3971  				break;
3972  
3973  			fallthrough;
3974  		case CALC_TARGET_NEED_RESEND:
3975  			cancel_linger_map_check(lreq);
3976  			/*
3977  			 * scan_requests() for the previous epoch(s)
3978  			 * may have already added it to the list, since
3979  			 * it's not unlinked here.
3980  			 */
3981  			if (list_empty(&lreq->scan_item))
3982  				list_add_tail(&lreq->scan_item, need_resend_linger);
3983  			break;
3984  		case CALC_TARGET_POOL_DNE:
3985  			list_del_init(&lreq->scan_item);
3986  			check_linger_pool_dne(lreq);
3987  			break;
3988  		}
3989  	}
3990  
3991  	for (n = rb_first(&osd->o_requests); n; ) {
3992  		struct ceph_osd_request *req =
3993  		    rb_entry(n, struct ceph_osd_request, r_node);
3994  		enum calc_target_result ct_res;
3995  
3996  		n = rb_next(n); /* unlink_request(), check_pool_dne() */
3997  
3998  		dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3999  		ct_res = calc_target(osdc, &req->r_t, false);
4000  		switch (ct_res) {
4001  		case CALC_TARGET_NO_ACTION:
4002  			force_resend_writes = cleared_full ||
4003  			    (check_pool_cleared_full &&
4004  			     pool_cleared_full(osdc, req->r_t.base_oloc.pool));
4005  			if (!force_resend &&
4006  			    (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
4007  			     !force_resend_writes))
4008  				break;
4009  
4010  			fallthrough;
4011  		case CALC_TARGET_NEED_RESEND:
4012  			cancel_map_check(req);
4013  			unlink_request(osd, req);
4014  			insert_request(need_resend, req);
4015  			break;
4016  		case CALC_TARGET_POOL_DNE:
4017  			check_pool_dne(req);
4018  			break;
4019  		}
4020  	}
4021  }
4022  
4023  static int handle_one_map(struct ceph_osd_client *osdc,
4024  			  void *p, void *end, bool incremental,
4025  			  struct rb_root *need_resend,
4026  			  struct list_head *need_resend_linger)
4027  {
4028  	struct ceph_osdmap *newmap;
4029  	struct rb_node *n;
4030  	bool skipped_map = false;
4031  	bool was_full;
4032  
4033  	was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
4034  	set_pool_was_full(osdc);
4035  
4036  	if (incremental)
4037  		newmap = osdmap_apply_incremental(&p, end,
4038  						  ceph_msgr2(osdc->client),
4039  						  osdc->osdmap);
4040  	else
4041  		newmap = ceph_osdmap_decode(&p, end, ceph_msgr2(osdc->client));
4042  	if (IS_ERR(newmap))
4043  		return PTR_ERR(newmap);
4044  
4045  	if (newmap != osdc->osdmap) {
4046  		/*
4047  		 * Preserve ->was_full before destroying the old map.
4048  		 * For pools that weren't in the old map, ->was_full
4049  		 * should be false.
4050  		 */
4051  		for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
4052  			struct ceph_pg_pool_info *pi =
4053  			    rb_entry(n, struct ceph_pg_pool_info, node);
4054  			struct ceph_pg_pool_info *old_pi;
4055  
4056  			old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
4057  			if (old_pi)
4058  				pi->was_full = old_pi->was_full;
4059  			else
4060  				WARN_ON(pi->was_full);
4061  		}
4062  
4063  		if (osdc->osdmap->epoch &&
4064  		    osdc->osdmap->epoch + 1 < newmap->epoch) {
4065  			WARN_ON(incremental);
4066  			skipped_map = true;
4067  		}
4068  
4069  		ceph_osdmap_destroy(osdc->osdmap);
4070  		osdc->osdmap = newmap;
4071  	}
4072  
4073  	was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
4074  	scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
4075  		      need_resend, need_resend_linger);
4076  
4077  	for (n = rb_first(&osdc->osds); n; ) {
4078  		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4079  
4080  		n = rb_next(n); /* close_osd() */
4081  
4082  		scan_requests(osd, skipped_map, was_full, true, need_resend,
4083  			      need_resend_linger);
4084  		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
4085  		    memcmp(&osd->o_con.peer_addr,
4086  			   ceph_osd_addr(osdc->osdmap, osd->o_osd),
4087  			   sizeof(struct ceph_entity_addr)))
4088  			close_osd(osd);
4089  	}
4090  
4091  	return 0;
4092  }
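
/*
 * Example of the skipped_map logic above: if we have epoch 5 and a full
 * map for epoch 8 arrives, epochs 6 and 7 were never seen, so every
 * request is treated as potentially mismapped (force_resend) rather than
 * trusting a target computed against the stale map.
 */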
4093  
4094  static void kick_requests(struct ceph_osd_client *osdc,
4095  			  struct rb_root *need_resend,
4096  			  struct list_head *need_resend_linger)
4097  {
4098  	struct ceph_osd_linger_request *lreq, *nlreq;
4099  	enum calc_target_result ct_res;
4100  	struct rb_node *n;
4101  
4102  	/* make sure need_resend targets reflect latest map */
4103  	for (n = rb_first(need_resend); n; ) {
4104  		struct ceph_osd_request *req =
4105  		    rb_entry(n, struct ceph_osd_request, r_node);
4106  
4107  		n = rb_next(n);
4108  
4109  		if (req->r_t.epoch < osdc->osdmap->epoch) {
4110  			ct_res = calc_target(osdc, &req->r_t, false);
4111  			if (ct_res == CALC_TARGET_POOL_DNE) {
4112  				erase_request(need_resend, req);
4113  				check_pool_dne(req);
4114  			}
4115  		}
4116  	}
4117  
4118  	for (n = rb_first(need_resend); n; ) {
4119  		struct ceph_osd_request *req =
4120  		    rb_entry(n, struct ceph_osd_request, r_node);
4121  		struct ceph_osd *osd;
4122  
4123  		n = rb_next(n);
4124  		erase_request(need_resend, req); /* before link_request() */
4125  
4126  		osd = lookup_create_osd(osdc, req->r_t.osd, true);
4127  		link_request(osd, req);
4128  		if (!req->r_linger) {
4129  			if (!osd_homeless(osd) && !req->r_t.paused)
4130  				send_request(req);
4131  		} else {
4132  			cancel_linger_request(req);
4133  		}
4134  	}
4135  
4136  	list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
4137  		if (!osd_homeless(lreq->osd))
4138  			send_linger(lreq);
4139  
4140  		list_del_init(&lreq->scan_item);
4141  	}
4142  }
4143  
4144  /*
4145   * Process updated osd map.
4146   *
4147   * The message contains any number of incremental and full maps, normally
4148   * indicating some sort of topology change in the cluster.  Kick requests
4149   * off to different OSDs as needed.
4150   */
4151  void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
4152  {
4153  	void *p = msg->front.iov_base;
4154  	void *const end = p + msg->front.iov_len;
4155  	u32 nr_maps, maplen;
4156  	u32 epoch;
4157  	struct ceph_fsid fsid;
4158  	struct rb_root need_resend = RB_ROOT;
4159  	LIST_HEAD(need_resend_linger);
4160  	bool handled_incremental = false;
4161  	bool was_pauserd, was_pausewr;
4162  	bool pauserd, pausewr;
4163  	int err;
4164  
4165  	dout("%s have %u\n", __func__, osdc->osdmap->epoch);
4166  	down_write(&osdc->lock);
4167  
4168  	/* verify fsid */
4169  	ceph_decode_need(&p, end, sizeof(fsid), bad);
4170  	ceph_decode_copy(&p, &fsid, sizeof(fsid));
4171  	if (ceph_check_fsid(osdc->client, &fsid) < 0)
4172  		goto bad;
4173  
4174  	was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4175  	was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4176  		      ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4177  		      have_pool_full(osdc);
4178  
4179  	/* incremental maps */
4180  	ceph_decode_32_safe(&p, end, nr_maps, bad);
4181  	dout(" %d inc maps\n", nr_maps);
4182  	while (nr_maps > 0) {
4183  		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4184  		epoch = ceph_decode_32(&p);
4185  		maplen = ceph_decode_32(&p);
4186  		ceph_decode_need(&p, end, maplen, bad);
4187  		if (osdc->osdmap->epoch &&
4188  		    osdc->osdmap->epoch + 1 == epoch) {
4189  			dout("applying incremental map %u len %d\n",
4190  			     epoch, maplen);
4191  			err = handle_one_map(osdc, p, p + maplen, true,
4192  					     &need_resend, &need_resend_linger);
4193  			if (err)
4194  				goto bad;
4195  			handled_incremental = true;
4196  		} else {
4197  			dout("ignoring incremental map %u len %d\n",
4198  			     epoch, maplen);
4199  		}
4200  		p += maplen;
4201  		nr_maps--;
4202  	}
4203  	if (handled_incremental)
4204  		goto done;
4205  
4206  	/* full maps */
4207  	ceph_decode_32_safe(&p, end, nr_maps, bad);
4208  	dout(" %d full maps\n", nr_maps);
4209  	while (nr_maps) {
4210  		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4211  		epoch = ceph_decode_32(&p);
4212  		maplen = ceph_decode_32(&p);
4213  		ceph_decode_need(&p, end, maplen, bad);
4214  		if (nr_maps > 1) {
4215  			dout("skipping non-latest full map %u len %d\n",
4216  			     epoch, maplen);
4217  		} else if (osdc->osdmap->epoch >= epoch) {
4218  			dout("skipping full map %u len %d, "
4219  			     "older than our %u\n", epoch, maplen,
4220  			     osdc->osdmap->epoch);
4221  		} else {
4222  			dout("taking full map %u len %d\n", epoch, maplen);
4223  			err = handle_one_map(osdc, p, p + maplen, false,
4224  					     &need_resend, &need_resend_linger);
4225  			if (err)
4226  				goto bad;
4227  		}
4228  		p += maplen;
4229  		nr_maps--;
4230  	}
4231  
4232  done:
4233  	/*
4234  	 * subscribe to subsequent osdmap updates if full to ensure
4235  	 * we find out when we are no longer full and stop returning
4236  	 * ENOSPC.
4237  	 */
4238  	pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4239  	pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4240  		  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4241  		  have_pool_full(osdc);
4242  	if (was_pauserd || was_pausewr || pauserd || pausewr ||
4243  	    osdc->osdmap->epoch < osdc->epoch_barrier)
4244  		maybe_request_map(osdc);
4245  
4246  	kick_requests(osdc, &need_resend, &need_resend_linger);
4247  
4248  	ceph_osdc_abort_on_full(osdc);
4249  	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
4250  			  osdc->osdmap->epoch);
4251  	up_write(&osdc->lock);
4252  	wake_up_all(&osdc->client->auth_wq);
4253  	return;
4254  
4255  bad:
4256  	pr_err("osdc handle_map corrupt msg\n");
4257  	ceph_msg_dump(msg);
4258  	up_write(&osdc->lock);
4259  }
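
/*
 * The OSDMAP message layout walked above, for reference (derived from the
 * decode calls, not from a spec):
 *
 *	ceph_fsid
 *	u32 nr_inc_maps,  nr_inc_maps  * { u32 epoch, u32 len, data }
 *	u32 nr_full_maps, nr_full_maps * { u32 epoch, u32 len, data }
 *
 * Incremental maps are preferred; a full map is only decoded when no
 * applicable incremental was handled, and only the newest one is taken.
 */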
4260  
4261  /*
4262   * Resubmit requests pending on the given osd.
4263   */
4264  static void kick_osd_requests(struct ceph_osd *osd)
4265  {
4266  	struct rb_node *n;
4267  
4268  	clear_backoffs(osd);
4269  
4270  	for (n = rb_first(&osd->o_requests); n; ) {
4271  		struct ceph_osd_request *req =
4272  		    rb_entry(n, struct ceph_osd_request, r_node);
4273  
4274  		n = rb_next(n); /* cancel_linger_request() */
4275  
4276  		if (!req->r_linger) {
4277  			if (!req->r_t.paused)
4278  				send_request(req);
4279  		} else {
4280  			cancel_linger_request(req);
4281  		}
4282  	}
4283  	for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
4284  		struct ceph_osd_linger_request *lreq =
4285  		    rb_entry(n, struct ceph_osd_linger_request, node);
4286  
4287  		send_linger(lreq);
4288  	}
4289  }
4290  
4291  /*
4292   * If the osd connection drops, we need to resubmit all requests.
4293   */
4294  static void osd_fault(struct ceph_connection *con)
4295  {
4296  	struct ceph_osd *osd = con->private;
4297  	struct ceph_osd_client *osdc = osd->o_osdc;
4298  
4299  	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
4300  
4301  	down_write(&osdc->lock);
4302  	if (!osd_registered(osd)) {
4303  		dout("%s osd%d unknown\n", __func__, osd->o_osd);
4304  		goto out_unlock;
4305  	}
4306  
4307  	if (!reopen_osd(osd))
4308  		kick_osd_requests(osd);
4309  	maybe_request_map(osdc);
4310  
4311  out_unlock:
4312  	up_write(&osdc->lock);
4313  }
4314  
4315  struct MOSDBackoff {
4316  	struct ceph_spg spgid;
4317  	u32 map_epoch;
4318  	u8 op;
4319  	u64 id;
4320  	struct ceph_hobject_id *begin;
4321  	struct ceph_hobject_id *end;
4322  };
4323  
4324  static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
4325  {
4326  	void *p = msg->front.iov_base;
4327  	void *const end = p + msg->front.iov_len;
4328  	u8 struct_v;
4329  	u32 struct_len;
4330  	int ret;
4331  
4332  	ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
4333  	if (ret)
4334  		return ret;
4335  
4336  	ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
4337  	if (ret)
4338  		return ret;
4339  
4340  	ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
4341  	ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
4342  	ceph_decode_8_safe(&p, end, m->op, e_inval);
4343  	ceph_decode_64_safe(&p, end, m->id, e_inval);
4344  
4345  	m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
4346  	if (!m->begin)
4347  		return -ENOMEM;
4348  
4349  	ret = decode_hoid(&p, end, m->begin);
4350  	if (ret) {
4351  		free_hoid(m->begin);
4352  		return ret;
4353  	}
4354  
4355  	m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
4356  	if (!m->end) {
4357  		free_hoid(m->begin);
4358  		return -ENOMEM;
4359  	}
4360  
4361  	ret = decode_hoid(&p, end, m->end);
4362  	if (ret) {
4363  		free_hoid(m->begin);
4364  		free_hoid(m->end);
4365  		return ret;
4366  	}
4367  
4368  	return 0;
4369  
4370  e_inval:
4371  	return -EINVAL;
4372  }
4373  
4374  static struct ceph_msg *create_backoff_message(
4375  				const struct ceph_osd_backoff *backoff,
4376  				u32 map_epoch)
4377  {
4378  	struct ceph_msg *msg;
4379  	void *p, *end;
4380  	int msg_size;
4381  
4382  	msg_size = CEPH_ENCODING_START_BLK_LEN +
4383  			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
4384  	msg_size += 4 + 1 + 8; /* map_epoch, op, id */
4385  	msg_size += CEPH_ENCODING_START_BLK_LEN +
4386  			hoid_encoding_size(backoff->begin);
4387  	msg_size += CEPH_ENCODING_START_BLK_LEN +
4388  			hoid_encoding_size(backoff->end);
4389  
4390  	msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
4391  	if (!msg)
4392  		return NULL;
4393  
4394  	p = msg->front.iov_base;
4395  	end = p + msg->front_alloc_len;
4396  
4397  	encode_spgid(&p, &backoff->spgid);
4398  	ceph_encode_32(&p, map_epoch);
4399  	ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
4400  	ceph_encode_64(&p, backoff->id);
4401  	encode_hoid(&p, end, backoff->begin);
4402  	encode_hoid(&p, end, backoff->end);
4403  	BUG_ON(p != end);
4404  
4405  	msg->front.iov_len = p - msg->front.iov_base;
4406  	msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
4407  	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
4408  
4409  	return msg;
4410  }
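
/*
 * The BUG_ON(p != end) above is deliberate: msg_size is computed from the
 * same encoding lengths used by encode_spgid()/encode_hoid(), so the
 * encoder filling the buffer exactly doubles as a self-check on those
 * size calculations.
 */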
4411  
4412  static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4413  {
4414  	struct ceph_spg_mapping *spg;
4415  	struct ceph_osd_backoff *backoff;
4416  	struct ceph_msg *msg;
4417  
4418  	dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4419  	     m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4420  
4421  	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4422  	if (!spg) {
4423  		spg = alloc_spg_mapping();
4424  		if (!spg) {
4425  			pr_err("%s failed to allocate spg\n", __func__);
4426  			return;
4427  		}
4428  		spg->spgid = m->spgid; /* struct */
4429  		insert_spg_mapping(&osd->o_backoff_mappings, spg);
4430  	}
4431  
4432  	backoff = alloc_backoff();
4433  	if (!backoff) {
4434  		pr_err("%s failed to allocate backoff\n", __func__);
4435  		return;
4436  	}
4437  	backoff->spgid = m->spgid; /* struct */
4438  	backoff->id = m->id;
4439  	backoff->begin = m->begin;
4440  	m->begin = NULL; /* backoff now owns this */
4441  	backoff->end = m->end;
4442  	m->end = NULL;   /* ditto */
4443  
4444  	insert_backoff(&spg->backoffs, backoff);
4445  	insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4446  
4447  	/*
4448  	 * Ack with original backoff's epoch so that the OSD can
4449  	 * discard this if there was a PG split.
4450  	 */
4451  	msg = create_backoff_message(backoff, m->map_epoch);
4452  	if (!msg) {
4453  		pr_err("%s failed to allocate msg\n", __func__);
4454  		return;
4455  	}
4456  	ceph_con_send(&osd->o_con, msg);
4457  }
4458  
4459  static bool target_contained_by(const struct ceph_osd_request_target *t,
4460  				const struct ceph_hobject_id *begin,
4461  				const struct ceph_hobject_id *end)
4462  {
4463  	struct ceph_hobject_id hoid;
4464  	int cmp;
4465  
4466  	hoid_fill_from_target(&hoid, t);
4467  	cmp = hoid_compare(&hoid, begin);
4468  	return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
4469  }
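
/*
 * Backoff ranges are half-open: a target equal to @begin is contained,
 * one equal to @end is not.  E.g. with begin = oid "a" and end = oid "c",
 * objects "a" and "b" match while "c" falls outside the backoff.
 */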
4470  
4471  static void handle_backoff_unblock(struct ceph_osd *osd,
4472  				   const struct MOSDBackoff *m)
4473  {
4474  	struct ceph_spg_mapping *spg;
4475  	struct ceph_osd_backoff *backoff;
4476  	struct rb_node *n;
4477  
4478  	dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4479  	     m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4480  
4481  	backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4482  	if (!backoff) {
4483  		pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4484  		       __func__, osd->o_osd, m->spgid.pgid.pool,
4485  		       m->spgid.pgid.seed, m->spgid.shard, m->id);
4486  		return;
4487  	}
4488  
4489  	if (hoid_compare(backoff->begin, m->begin) &&
4490  	    hoid_compare(backoff->end, m->end)) {
4491  		pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4492  		       __func__, osd->o_osd, m->spgid.pgid.pool,
4493  		       m->spgid.pgid.seed, m->spgid.shard, m->id);
4494  		/* unblock it anyway... */
4495  	}
4496  
4497  	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4498  	BUG_ON(!spg);
4499  
4500  	erase_backoff(&spg->backoffs, backoff);
4501  	erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4502  	free_backoff(backoff);
4503  
4504  	if (RB_EMPTY_ROOT(&spg->backoffs)) {
4505  		erase_spg_mapping(&osd->o_backoff_mappings, spg);
4506  		free_spg_mapping(spg);
4507  	}
4508  
4509  	for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4510  		struct ceph_osd_request *req =
4511  		    rb_entry(n, struct ceph_osd_request, r_node);
4512  
4513  		if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
4514  			/*
4515  			 * Match against @m, not @backoff -- the PG may
4516  			 * have split on the OSD.
4517  			 */
4518  			if (target_contained_by(&req->r_t, m->begin, m->end)) {
4519  				/*
4520  				 * If no other installed backoff applies,
4521  				 * resend.
4522  				 */
4523  				send_request(req);
4524  			}
4525  		}
4526  	}
4527  }
4528  
4529  static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4530  {
4531  	struct ceph_osd_client *osdc = osd->o_osdc;
4532  	struct MOSDBackoff m;
4533  	int ret;
4534  
4535  	down_read(&osdc->lock);
4536  	if (!osd_registered(osd)) {
4537  		dout("%s osd%d unknown\n", __func__, osd->o_osd);
4538  		up_read(&osdc->lock);
4539  		return;
4540  	}
4541  	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4542  
4543  	mutex_lock(&osd->lock);
4544  	ret = decode_MOSDBackoff(msg, &m);
4545  	if (ret) {
4546  		pr_err("failed to decode MOSDBackoff: %d\n", ret);
4547  		ceph_msg_dump(msg);
4548  		goto out_unlock;
4549  	}
4550  
4551  	switch (m.op) {
4552  	case CEPH_OSD_BACKOFF_OP_BLOCK:
4553  		handle_backoff_block(osd, &m);
4554  		break;
4555  	case CEPH_OSD_BACKOFF_OP_UNBLOCK:
4556  		handle_backoff_unblock(osd, &m);
4557  		break;
4558  	default:
4559  		pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4560  	}
4561  
4562  	free_hoid(m.begin);
4563  	free_hoid(m.end);
4564  
4565  out_unlock:
4566  	mutex_unlock(&osd->lock);
4567  	up_read(&osdc->lock);
4568  }
4569  
4570  /*
4571   * Process osd watch notifications
4572   */
4573  static void handle_watch_notify(struct ceph_osd_client *osdc,
4574  				struct ceph_msg *msg)
4575  {
4576  	void *p = msg->front.iov_base;
4577  	void *const end = p + msg->front.iov_len;
4578  	struct ceph_osd_linger_request *lreq;
4579  	struct linger_work *lwork;
4580  	u8 proto_ver, opcode;
4581  	u64 cookie, notify_id;
4582  	u64 notifier_id = 0;
4583  	s32 return_code = 0;
4584  	void *payload = NULL;
4585  	u32 payload_len = 0;
4586  
4587  	ceph_decode_8_safe(&p, end, proto_ver, bad);
4588  	ceph_decode_8_safe(&p, end, opcode, bad);
4589  	ceph_decode_64_safe(&p, end, cookie, bad);
4590  	p += 8; /* skip ver */
4591  	ceph_decode_64_safe(&p, end, notify_id, bad);
4592  
4593  	if (proto_ver >= 1) {
4594  		ceph_decode_32_safe(&p, end, payload_len, bad);
4595  		ceph_decode_need(&p, end, payload_len, bad);
4596  		payload = p;
4597  		p += payload_len;
4598  	}
4599  
4600  	if (le16_to_cpu(msg->hdr.version) >= 2)
4601  		ceph_decode_32_safe(&p, end, return_code, bad);
4602  
4603  	if (le16_to_cpu(msg->hdr.version) >= 3)
4604  		ceph_decode_64_safe(&p, end, notifier_id, bad);
4605  
4606  	down_read(&osdc->lock);
4607  	lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
4608  	if (!lreq) {
4609  		dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
4610  		     cookie);
4611  		goto out_unlock_osdc;
4612  	}
4613  
4614  	mutex_lock(&lreq->lock);
4615  	dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
4616  	     opcode, cookie, lreq, lreq->is_watch);
4617  	if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
4618  		if (!lreq->last_error) {
4619  			lreq->last_error = -ENOTCONN;
4620  			queue_watch_error(lreq);
4621  		}
4622  	} else if (!lreq->is_watch) {
4623  		/* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
4624  		if (lreq->notify_id && lreq->notify_id != notify_id) {
4625  			dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
4626  			     lreq->notify_id, notify_id);
4627  		} else if (!completion_done(&lreq->notify_finish_wait)) {
4628  			struct ceph_msg_data *data =
4629  			    msg->num_data_items ? &msg->data[0] : NULL;
4630  
4631  			if (data) {
4632  				if (lreq->preply_pages) {
4633  					WARN_ON(data->type !=
4634  							CEPH_MSG_DATA_PAGES);
4635  					*lreq->preply_pages = data->pages;
4636  					*lreq->preply_len = data->length;
4637  					data->own_pages = false;
4638  				}
4639  			}
4640  			lreq->notify_finish_error = return_code;
4641  			complete_all(&lreq->notify_finish_wait);
4642  		}
4643  	} else {
4644  		/* CEPH_WATCH_EVENT_NOTIFY */
4645  		lwork = lwork_alloc(lreq, do_watch_notify);
4646  		if (!lwork) {
4647  			pr_err("failed to allocate notify-lwork\n");
4648  			goto out_unlock_lreq;
4649  		}
4650  
4651  		lwork->notify.notify_id = notify_id;
4652  		lwork->notify.notifier_id = notifier_id;
4653  		lwork->notify.payload = payload;
4654  		lwork->notify.payload_len = payload_len;
4655  		lwork->notify.msg = ceph_msg_get(msg);
4656  		lwork_queue(lwork);
4657  	}
4658  
4659  out_unlock_lreq:
4660  	mutex_unlock(&lreq->lock);
4661  out_unlock_osdc:
4662  	up_read(&osdc->lock);
4663  	return;
4664  
4665  bad:
4666  	pr_err("osdc handle_watch_notify corrupt msg\n");
4667  }
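
/*
 * Watch-notify message front layout handled above (from the decode calls;
 * hdr.version gates the trailing fields):
 *
 *	u8 proto_ver, u8 opcode, u64 cookie, u64 ver (skipped),
 *	u64 notify_id,
 *	proto_ver >= 1:   u32 payload_len + payload
 *	hdr.version >= 2: s32 return_code
 *	hdr.version >= 3: u64 notifier_id
 */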
4668  
4669  /*
4670   * Register request, send initial attempt.
4671   */
4672  void ceph_osdc_start_request(struct ceph_osd_client *osdc,
4673  			     struct ceph_osd_request *req)
4674  {
4675  	down_read(&osdc->lock);
4676  	submit_request(req, false);
4677  	up_read(&osdc->lock);
4678  }
4679  EXPORT_SYMBOL(ceph_osdc_start_request);
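
/*
 * Typical synchronous caller sequence, mirroring ceph_osdc_notify_ack()
 * below; a minimal sketch with error handling elided:
 *
 *	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
 *	...initialize r_base_oid/r_base_oloc, r_flags and op 0...
 *	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
 *	ceph_osdc_start_request(osdc, req);
 *	ret = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 */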
4680  
4681  /*
4682   * Unregister request.  If @req was registered, it isn't completed:
4683   * r_result isn't set and __complete_request() isn't invoked.
4684   *
4685   * If @req wasn't registered, this call may have raced with
4686   * handle_reply(), in which case r_result would already be set and
4687   * __complete_request() would be getting invoked, possibly even
4688   * concurrently with this call.
4689   */
4690  void ceph_osdc_cancel_request(struct ceph_osd_request *req)
4691  {
4692  	struct ceph_osd_client *osdc = req->r_osdc;
4693  
4694  	down_write(&osdc->lock);
4695  	if (req->r_osd)
4696  		cancel_request(req);
4697  	up_write(&osdc->lock);
4698  }
4699  EXPORT_SYMBOL(ceph_osdc_cancel_request);
4700  
4701  /*
4702   * @timeout: in jiffies, 0 means "wait forever"
4703   */
4704  static int wait_request_timeout(struct ceph_osd_request *req,
4705  				unsigned long timeout)
4706  {
4707  	long left;
4708  
4709  	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
4710  	left = wait_for_completion_killable_timeout(&req->r_completion,
4711  						ceph_timeout_jiffies(timeout));
4712  	if (left <= 0) {
4713  		left = left ?: -ETIMEDOUT;
4714  		ceph_osdc_cancel_request(req);
4715  	} else {
4716  		left = req->r_result; /* completed */
4717  	}
4718  
4719  	return left;
4720  }
4721  
4722  /*
4723   * wait for a request to complete
4724   */
4725  int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
4726  			   struct ceph_osd_request *req)
4727  {
4728  	return wait_request_timeout(req, 0);
4729  }
4730  EXPORT_SYMBOL(ceph_osdc_wait_request);
4731  
4732  /*
4733   * sync - wait for all in-flight write requests to flush.  Avoid starvation.
4734   */
4735  void ceph_osdc_sync(struct ceph_osd_client *osdc)
4736  {
4737  	struct rb_node *n, *p;
4738  	u64 last_tid = atomic64_read(&osdc->last_tid);
4739  
4740  again:
4741  	down_read(&osdc->lock);
4742  	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
4743  		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4744  
4745  		mutex_lock(&osd->lock);
4746  		for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
4747  			struct ceph_osd_request *req =
4748  			    rb_entry(p, struct ceph_osd_request, r_node);
4749  
4750  			if (req->r_tid > last_tid)
4751  				break;
4752  
4753  			if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
4754  				continue;
4755  
4756  			ceph_osdc_get_request(req);
4757  			mutex_unlock(&osd->lock);
4758  			up_read(&osdc->lock);
4759  			dout("%s waiting on req %p tid %llu last_tid %llu\n",
4760  			     __func__, req, req->r_tid, last_tid);
4761  			wait_for_completion(&req->r_completion);
4762  			ceph_osdc_put_request(req);
4763  			goto again;
4764  		}
4765  
4766  		mutex_unlock(&osd->lock);
4767  	}
4768  
4769  	up_read(&osdc->lock);
4770  	dout("%s done last_tid %llu\n", __func__, last_tid);
4771  }
4772  EXPORT_SYMBOL(ceph_osdc_sync);
4773  
4774  /*
4775   * Returns a handle, caller owns a ref.
4776   */
4777  struct ceph_osd_linger_request *
4778  ceph_osdc_watch(struct ceph_osd_client *osdc,
4779  		struct ceph_object_id *oid,
4780  		struct ceph_object_locator *oloc,
4781  		rados_watchcb2_t wcb,
4782  		rados_watcherrcb_t errcb,
4783  		void *data)
4784  {
4785  	struct ceph_osd_linger_request *lreq;
4786  	int ret;
4787  
4788  	lreq = linger_alloc(osdc);
4789  	if (!lreq)
4790  		return ERR_PTR(-ENOMEM);
4791  
4792  	lreq->is_watch = true;
4793  	lreq->wcb = wcb;
4794  	lreq->errcb = errcb;
4795  	lreq->data = data;
4796  	lreq->watch_valid_thru = jiffies;
4797  
4798  	ceph_oid_copy(&lreq->t.base_oid, oid);
4799  	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4800  	lreq->t.flags = CEPH_OSD_FLAG_WRITE;
4801  	ktime_get_real_ts64(&lreq->mtime);
4802  
4803  	linger_submit(lreq);
4804  	ret = linger_reg_commit_wait(lreq);
4805  	if (ret) {
4806  		linger_cancel(lreq);
4807  		goto err_put_lreq;
4808  	}
4809  
4810  	return lreq;
4811  
4812  err_put_lreq:
4813  	linger_put(lreq);
4814  	return ERR_PTR(ret);
4815  }
4816  EXPORT_SYMBOL(ceph_osdc_watch);
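
/*
 * Example pairing with ceph_osdc_unwatch() below; a hypothetical caller
 * sketch (my_watch_cb/my_error_cb are illustrative names, not kernel
 * APIs):
 *
 *	lreq = ceph_osdc_watch(osdc, &oid, &oloc, my_watch_cb, my_error_cb,
 *			       ctx);
 *	if (IS_ERR(lreq))
 *		return PTR_ERR(lreq);
 *	...watch events are delivered via my_watch_cb...
 *	ret = ceph_osdc_unwatch(osdc, lreq);	releases the watch's ref
 */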
4817  
4818  /*
4819   * Releases a ref.
4820   *
4821   * Times out after mount_timeout to preserve rbd unmap behaviour
4822   * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
4823   * with mount_timeout").
4824   */
4825  int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
4826  		      struct ceph_osd_linger_request *lreq)
4827  {
4828  	struct ceph_options *opts = osdc->client->options;
4829  	struct ceph_osd_request *req;
4830  	int ret;
4831  
4832  	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4833  	if (!req)
4834  		return -ENOMEM;
4835  
4836  	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4837  	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4838  	req->r_flags = CEPH_OSD_FLAG_WRITE;
4839  	ktime_get_real_ts64(&req->r_mtime);
4840  	osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_UNWATCH,
4841  			      lreq->linger_id, 0);
4842  
4843  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4844  	if (ret)
4845  		goto out_put_req;
4846  
4847  	ceph_osdc_start_request(osdc, req);
4848  	linger_cancel(lreq);
4849  	linger_put(lreq);
4850  	ret = wait_request_timeout(req, opts->mount_timeout);
4851  
4852  out_put_req:
4853  	ceph_osdc_put_request(req);
4854  	return ret;
4855  }
4856  EXPORT_SYMBOL(ceph_osdc_unwatch);
4857  
4858  static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
4859  				      u64 notify_id, u64 cookie, void *payload,
4860  				      u32 payload_len)
4861  {
4862  	struct ceph_osd_req_op *op;
4863  	struct ceph_pagelist *pl;
4864  	int ret;
4865  
4866  	op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4867  
4868  	pl = ceph_pagelist_alloc(GFP_NOIO);
4869  	if (!pl)
4870  		return -ENOMEM;
4871  
4872  	ret = ceph_pagelist_encode_64(pl, notify_id);
4873  	ret |= ceph_pagelist_encode_64(pl, cookie);
4874  	if (payload) {
4875  		ret |= ceph_pagelist_encode_32(pl, payload_len);
4876  		ret |= ceph_pagelist_append(pl, payload, payload_len);
4877  	} else {
4878  		ret |= ceph_pagelist_encode_32(pl, 0);
4879  	}
4880  	if (ret) {
4881  		ceph_pagelist_release(pl);
4882  		return -ENOMEM;
4883  	}
4884  
4885  	ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
4886  	op->indata_len = pl->length;
4887  	return 0;
4888  }
4889  
4890  int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
4891  			 struct ceph_object_id *oid,
4892  			 struct ceph_object_locator *oloc,
4893  			 u64 notify_id,
4894  			 u64 cookie,
4895  			 void *payload,
4896  			 u32 payload_len)
4897  {
4898  	struct ceph_osd_request *req;
4899  	int ret;
4900  
4901  	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4902  	if (!req)
4903  		return -ENOMEM;
4904  
4905  	ceph_oid_copy(&req->r_base_oid, oid);
4906  	ceph_oloc_copy(&req->r_base_oloc, oloc);
4907  	req->r_flags = CEPH_OSD_FLAG_READ;
4908  
4909  	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4910  					 payload_len);
4911  	if (ret)
4912  		goto out_put_req;
4913  
4914  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4915  	if (ret)
4916  		goto out_put_req;
4917  
4918  	ceph_osdc_start_request(osdc, req);
4919  	ret = ceph_osdc_wait_request(osdc, req);
4920  
4921  out_put_req:
4922  	ceph_osdc_put_request(req);
4923  	return ret;
4924  }
4925  EXPORT_SYMBOL(ceph_osdc_notify_ack);
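/*
 * Illustrative sketch: acknowledging a notify from the watch callback
 * registered with ceph_osdc_watch().  The signature below follows
 * rados_watchcb2_t; my_watch_cb and the captured osdc/oid/oloc are
 * hypothetical.  Until the ack arrives (or its timeout fires), the
 * notifier waits in ceph_osdc_notify():
 *
 *	static void my_watch_cb(void *arg, u64 notify_id, u64 cookie,
 *				u64 notifier_id, void *data, size_t data_len)
 *	{
 *		...
 *		ceph_osdc_notify_ack(osdc, &oid, &oloc, notify_id,
 *				     cookie, NULL, 0);
 *	}
 */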
4926  
4927  /*
4928   * @timeout: in seconds
4929   *
4930   * @preply_{pages,len} are initialized both on success and error.
4931   * The caller is responsible for:
4932   *
4933   *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
4934   */
4935  int ceph_osdc_notify(struct ceph_osd_client *osdc,
4936  		     struct ceph_object_id *oid,
4937  		     struct ceph_object_locator *oloc,
4938  		     void *payload,
4939  		     u32 payload_len,
4940  		     u32 timeout,
4941  		     struct page ***preply_pages,
4942  		     size_t *preply_len)
4943  {
4944  	struct ceph_osd_linger_request *lreq;
4945  	int ret;
4946  
4947  	WARN_ON(!timeout);
4948  	if (preply_pages) {
4949  		*preply_pages = NULL;
4950  		*preply_len = 0;
4951  	}
4952  
4953  	lreq = linger_alloc(osdc);
4954  	if (!lreq)
4955  		return -ENOMEM;
4956  
4957  	lreq->request_pl = ceph_pagelist_alloc(GFP_NOIO);
4958  	if (!lreq->request_pl) {
4959  		ret = -ENOMEM;
4960  		goto out_put_lreq;
4961  	}
4962  
4963  	ret = ceph_pagelist_encode_32(lreq->request_pl, 1); /* prot_ver */
4964  	ret |= ceph_pagelist_encode_32(lreq->request_pl, timeout);
4965  	ret |= ceph_pagelist_encode_32(lreq->request_pl, payload_len);
4966  	ret |= ceph_pagelist_append(lreq->request_pl, payload, payload_len);
4967  	if (ret) {
4968  		ret = -ENOMEM;
4969  		goto out_put_lreq;
4970  	}
4971  
4972  	/* for notify_id */
4973  	lreq->notify_id_pages = ceph_alloc_page_vector(1, GFP_NOIO);
4974  	if (IS_ERR(lreq->notify_id_pages)) {
4975  		ret = PTR_ERR(lreq->notify_id_pages);
4976  		lreq->notify_id_pages = NULL;
4977  		goto out_put_lreq;
4978  	}
4979  
4980  	lreq->preply_pages = preply_pages;
4981  	lreq->preply_len = preply_len;
4982  
4983  	ceph_oid_copy(&lreq->t.base_oid, oid);
4984  	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4985  	lreq->t.flags = CEPH_OSD_FLAG_READ;
4986  
4987  	linger_submit(lreq);
4988  	ret = linger_reg_commit_wait(lreq);
4989  	if (!ret)
4990  		ret = linger_notify_finish_wait(lreq,
4991  				 msecs_to_jiffies(2 * timeout * MSEC_PER_SEC));
4992  	else
4993  		dout("lreq %p failed to initiate notify %d\n", lreq, ret);
4994  
4995  	linger_cancel(lreq);
4996  out_put_lreq:
4997  	linger_put(lreq);
4998  	return ret;
4999  }
5000  EXPORT_SYMBOL(ceph_osdc_notify);
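/*
 * Illustrative sketch: a notify with a 30 second timeout and a reply
 * buffer.  Per the comment above, the reply page vector must be
 * released by the caller on both success and error:
 *
 *	struct page **reply_pages;
 *	size_t reply_len;
 *
 *	ret = ceph_osdc_notify(osdc, &oid, &oloc, payload, payload_len,
 *			       30, &reply_pages, &reply_len);
 *	...
 *	ceph_release_page_vector(reply_pages,
 *				 calc_pages_for(0, reply_len));
 */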
5001  
5002  /*
5003   * Return the number of milliseconds since the watch was last
5004   * confirmed, or an error.  If there is an error, the watch is no
5005   * longer valid, and should be destroyed with ceph_osdc_unwatch().
5006   */
5007  int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
5008  			  struct ceph_osd_linger_request *lreq)
5009  {
5010  	unsigned long stamp, age;
5011  	int ret;
5012  
5013  	down_read(&osdc->lock);
5014  	mutex_lock(&lreq->lock);
5015  	stamp = lreq->watch_valid_thru;
5016  	if (!list_empty(&lreq->pending_lworks)) {
5017  		struct linger_work *lwork =
5018  		    list_first_entry(&lreq->pending_lworks,
5019  				     struct linger_work,
5020  				     pending_item);
5021  
5022  		if (time_before(lwork->queued_stamp, stamp))
5023  			stamp = lwork->queued_stamp;
5024  	}
5025  	age = jiffies - stamp;
5026  	dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
5027  	     lreq, lreq->linger_id, age, lreq->last_error);
5028  	/* we are truncating to msecs, so return a safe upper bound */
5029  	ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);
5030  
5031  	mutex_unlock(&lreq->lock);
5032  	up_read(&osdc->lock);
5033  	return ret;
5034  }
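/*
 * Illustrative sketch: periodic watch validity check.  Per the comment
 * above, a negative return means the watch is no longer valid and
 * should be destroyed; a positive return is an upper bound in
 * milliseconds on how long ago it was last confirmed:
 *
 *	ret = ceph_osdc_watch_check(osdc, handle);
 *	if (ret < 0) {
 *		ceph_osdc_unwatch(osdc, handle);
 *		... re-establish with ceph_osdc_watch() ...
 *	}
 */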
5035  
5036  static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
5037  {
5038  	u8 struct_v;
5039  	u32 struct_len;
5040  	int ret;
5041  
5042  	ret = ceph_start_decoding(p, end, 2, "watch_item_t",
5043  				  &struct_v, &struct_len);
5044  	if (ret)
5045  		goto bad;
5046  
5047  	ret = -EINVAL;
5048  	ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad);
5049  	ceph_decode_64_safe(p, end, item->cookie, bad);
5050  	ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */
5051  
5052  	if (struct_v >= 2) {
5053  		ret = ceph_decode_entity_addr(p, end, &item->addr);
5054  		if (ret)
5055  			goto bad;
5056  	} else {
5057  		ret = 0;
5058  	}
5059  
5060  	dout("%s %s%llu cookie %llu addr %s\n", __func__,
5061  	     ENTITY_NAME(item->name), item->cookie,
5062  	     ceph_pr_addr(&item->addr));
5063  bad:
5064  	return ret;
5065  }
5066  
5067  static int decode_watchers(void **p, void *end,
5068  			   struct ceph_watch_item **watchers,
5069  			   u32 *num_watchers)
5070  {
5071  	u8 struct_v;
5072  	u32 struct_len;
5073  	int i;
5074  	int ret;
5075  
5076  	ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
5077  				  &struct_v, &struct_len);
5078  	if (ret)
5079  		return ret;
5080  
5081  	*num_watchers = ceph_decode_32(p);
5082  	*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
5083  	if (!*watchers)
5084  		return -ENOMEM;
5085  
5086  	for (i = 0; i < *num_watchers; i++) {
5087  		ret = decode_watcher(p, end, *watchers + i);
5088  		if (ret) {
5089  			kfree(*watchers);
5090  			return ret;
5091  		}
5092  	}
5093  
5094  	return 0;
5095  }
5096  
5097  /*
5098   * On success, the caller is responsible for:
5099   *
5100   *     kfree(watchers);
5101   */
5102  int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
5103  			    struct ceph_object_id *oid,
5104  			    struct ceph_object_locator *oloc,
5105  			    struct ceph_watch_item **watchers,
5106  			    u32 *num_watchers)
5107  {
5108  	struct ceph_osd_request *req;
5109  	struct page **pages;
5110  	int ret;
5111  
5112  	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5113  	if (!req)
5114  		return -ENOMEM;
5115  
5116  	ceph_oid_copy(&req->r_base_oid, oid);
5117  	ceph_oloc_copy(&req->r_base_oloc, oloc);
5118  	req->r_flags = CEPH_OSD_FLAG_READ;
5119  
5120  	pages = ceph_alloc_page_vector(1, GFP_NOIO);
5121  	if (IS_ERR(pages)) {
5122  		ret = PTR_ERR(pages);
5123  		goto out_put_req;
5124  	}
5125  
5126  	osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
5127  	ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
5128  						 response_data),
5129  				 pages, PAGE_SIZE, 0, false, true);
5130  
5131  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5132  	if (ret)
5133  		goto out_put_req;
5134  
5135  	ceph_osdc_start_request(osdc, req);
5136  	ret = ceph_osdc_wait_request(osdc, req);
5137  	if (ret >= 0) {
5138  		void *p = page_address(pages[0]);
5139  		void *const end = p + req->r_ops[0].outdata_len;
5140  
5141  		ret = decode_watchers(&p, end, watchers, num_watchers);
5142  	}
5143  
5144  out_put_req:
5145  	ceph_osdc_put_request(req);
5146  	return ret;
5147  }
5148  EXPORT_SYMBOL(ceph_osdc_list_watchers);
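/*
 * Illustrative sketch: enumerate the watchers on an object and free
 * the array as required above:
 *
 *	struct ceph_watch_item *watchers;
 *	u32 num_watchers, i;
 *
 *	ret = ceph_osdc_list_watchers(osdc, &oid, &oloc, &watchers,
 *				      &num_watchers);
 *	if (ret)
 *		return ret;
 *	for (i = 0; i < num_watchers; i++)
 *		dout("watcher cookie %llu\n", watchers[i].cookie);
 *	kfree(watchers);
 */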
5149  
5150  /*
5151   * Call all pending notify callbacks - for use after a watch is
5152   * unregistered, to make sure no more callbacks for it will be invoked
5153   */
5154  void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
5155  {
5156  	dout("%s osdc %p\n", __func__, osdc);
5157  	flush_workqueue(osdc->notify_wq);
5158  }
5159  EXPORT_SYMBOL(ceph_osdc_flush_notifies);
5160  
5161  void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
5162  {
5163  	down_read(&osdc->lock);
5164  	maybe_request_map(osdc);
5165  	up_read(&osdc->lock);
5166  }
5167  EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
5168  
5169  /*
5170   * Execute an OSD class method on an object.
5171   *
5172   * @flags: CEPH_OSD_FLAG_*
5173   * @resp_len: in/out param for reply length
5174   */
5175  int ceph_osdc_call(struct ceph_osd_client *osdc,
5176  		   struct ceph_object_id *oid,
5177  		   struct ceph_object_locator *oloc,
5178  		   const char *class, const char *method,
5179  		   unsigned int flags,
5180  		   struct page *req_page, size_t req_len,
5181  		   struct page **resp_pages, size_t *resp_len)
5182  {
5183  	struct ceph_osd_request *req;
5184  	int ret;
5185  
5186  	if (req_len > PAGE_SIZE)
5187  		return -E2BIG;
5188  
5189  	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5190  	if (!req)
5191  		return -ENOMEM;
5192  
5193  	ceph_oid_copy(&req->r_base_oid, oid);
5194  	ceph_oloc_copy(&req->r_base_oloc, oloc);
5195  	req->r_flags = flags;
5196  
5197  	ret = osd_req_op_cls_init(req, 0, class, method);
5198  	if (ret)
5199  		goto out_put_req;
5200  
5201  	if (req_page)
5202  		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
5203  						  0, false, false);
5204  	if (resp_pages)
5205  		osd_req_op_cls_response_data_pages(req, 0, resp_pages,
5206  						   *resp_len, 0, false, false);
5207  
5208  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5209  	if (ret)
5210  		goto out_put_req;
5211  
5212  	ceph_osdc_start_request(osdc, req);
5213  	ret = ceph_osdc_wait_request(osdc, req);
5214  	if (ret >= 0) {
5215  		ret = req->r_ops[0].rval;
5216  		if (resp_pages)
5217  			*resp_len = req->r_ops[0].outdata_len;
5218  	}
5219  
5220  out_put_req:
5221  	ceph_osdc_put_request(req);
5222  	return ret;
5223  }
5224  EXPORT_SYMBOL(ceph_osdc_call);
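/*
 * Illustrative sketch: invoke a class method with a one-page reply
 * (requests larger than a page are rejected with -E2BIG above).  The
 * "lock"/"get_info" names are only examples; @resp_len is in/out, so
 * it must hold the reply buffer size going in:
 *
 *	struct page *reply_page = alloc_page(GFP_NOIO);
 *	size_t reply_len = PAGE_SIZE;
 *
 *	ret = ceph_osdc_call(osdc, &oid, &oloc, "lock", "get_info",
 *			     CEPH_OSD_FLAG_READ, NULL, 0,
 *			     &reply_page, &reply_len);
 *	if (ret >= 0)
 *		... reply_len bytes of output in reply_page ...
 */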
5225  
5226  /*
5227   * reset all osd connections
5228   */
5229  void ceph_osdc_reopen_osds(struct ceph_osd_client *osdc)
5230  {
5231  	struct rb_node *n;
5232  
5233  	down_write(&osdc->lock);
5234  	for (n = rb_first(&osdc->osds); n; ) {
5235  		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
5236  
5237  		n = rb_next(n);
5238  		if (!reopen_osd(osd))
5239  			kick_osd_requests(osd);
5240  	}
5241  	up_write(&osdc->lock);
5242  }
5243  
5244  /*
5245   * init, shutdown
5246   */
5247  int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
5248  {
5249  	int err;
5250  
5251  	dout("init\n");
5252  	osdc->client = client;
5253  	init_rwsem(&osdc->lock);
5254  	osdc->osds = RB_ROOT;
5255  	INIT_LIST_HEAD(&osdc->osd_lru);
5256  	spin_lock_init(&osdc->osd_lru_lock);
5257  	osd_init(&osdc->homeless_osd);
5258  	osdc->homeless_osd.o_osdc = osdc;
5259  	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
5260  	osdc->last_linger_id = CEPH_LINGER_ID_START;
5261  	osdc->linger_requests = RB_ROOT;
5262  	osdc->map_checks = RB_ROOT;
5263  	osdc->linger_map_checks = RB_ROOT;
5264  	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
5265  	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
5266  
5267  	err = -ENOMEM;
5268  	osdc->osdmap = ceph_osdmap_alloc();
5269  	if (!osdc->osdmap)
5270  		goto out;
5271  
5272  	osdc->req_mempool = mempool_create_slab_pool(10,
5273  						     ceph_osd_request_cache);
5274  	if (!osdc->req_mempool)
5275  		goto out_map;
5276  
5277  	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
5278  				PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
5279  	if (err < 0)
5280  		goto out_mempool;
5281  	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
5282  				PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
5283  				"osd_op_reply");
5284  	if (err < 0)
5285  		goto out_msgpool;
5286  
5287  	err = -ENOMEM;
5288  	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
5289  	if (!osdc->notify_wq)
5290  		goto out_msgpool_reply;
5291  
5292  	osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
5293  	if (!osdc->completion_wq)
5294  		goto out_notify_wq;
5295  
5296  	schedule_delayed_work(&osdc->timeout_work,
5297  			      osdc->client->options->osd_keepalive_timeout);
5298  	schedule_delayed_work(&osdc->osds_timeout_work,
5299  	    round_jiffies_relative(osdc->client->options->osd_idle_ttl));
5300  
5301  	return 0;
5302  
5303  out_notify_wq:
5304  	destroy_workqueue(osdc->notify_wq);
5305  out_msgpool_reply:
5306  	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5307  out_msgpool:
5308  	ceph_msgpool_destroy(&osdc->msgpool_op);
5309  out_mempool:
5310  	mempool_destroy(osdc->req_mempool);
5311  out_map:
5312  	ceph_osdmap_destroy(osdc->osdmap);
5313  out:
5314  	return err;
5315  }
5316  
5317  void ceph_osdc_stop(struct ceph_osd_client *osdc)
5318  {
5319  	destroy_workqueue(osdc->completion_wq);
5320  	destroy_workqueue(osdc->notify_wq);
5321  	cancel_delayed_work_sync(&osdc->timeout_work);
5322  	cancel_delayed_work_sync(&osdc->osds_timeout_work);
5323  
5324  	down_write(&osdc->lock);
5325  	while (!RB_EMPTY_ROOT(&osdc->osds)) {
5326  		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
5327  						struct ceph_osd, o_node);
5328  		close_osd(osd);
5329  	}
5330  	up_write(&osdc->lock);
5331  	WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
5332  	osd_cleanup(&osdc->homeless_osd);
5333  
5334  	WARN_ON(!list_empty(&osdc->osd_lru));
5335  	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
5336  	WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
5337  	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
5338  	WARN_ON(atomic_read(&osdc->num_requests));
5339  	WARN_ON(atomic_read(&osdc->num_homeless));
5340  
5341  	ceph_osdmap_destroy(osdc->osdmap);
5342  	mempool_destroy(osdc->req_mempool);
5343  	ceph_msgpool_destroy(&osdc->msgpool_op);
5344  	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5345  }
5346  
5347  int osd_req_op_copy_from_init(struct ceph_osd_request *req,
5348  			      u64 src_snapid, u64 src_version,
5349  			      struct ceph_object_id *src_oid,
5350  			      struct ceph_object_locator *src_oloc,
5351  			      u32 src_fadvise_flags,
5352  			      u32 dst_fadvise_flags,
5353  			      u32 truncate_seq, u64 truncate_size,
5354  			      u8 copy_from_flags)
5355  {
5356  	struct ceph_osd_req_op *op;
5357  	struct page **pages;
5358  	void *p, *end;
5359  
5360  	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
5361  	if (IS_ERR(pages))
5362  		return PTR_ERR(pages);
5363  
5364  	op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
5365  			     dst_fadvise_flags);
5366  	op->copy_from.snapid = src_snapid;
5367  	op->copy_from.src_version = src_version;
5368  	op->copy_from.flags = copy_from_flags;
5369  	op->copy_from.src_fadvise_flags = src_fadvise_flags;
5370  
5371  	p = page_address(pages[0]);
5372  	end = p + PAGE_SIZE;
5373  	ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
5374  	encode_oloc(&p, end, src_oloc);
5375  	ceph_encode_32(&p, truncate_seq);
5376  	ceph_encode_64(&p, truncate_size);
5377  	op->indata_len = PAGE_SIZE - (end - p);
5378  
5379  	ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
5380  				 op->indata_len, 0, false, true);
5381  	return 0;
5382  }
5383  EXPORT_SYMBOL(osd_req_op_copy_from_init);
5384  
5385  int __init ceph_osdc_setup(void)
5386  {
5387  	size_t size = sizeof(struct ceph_osd_request) +
5388  	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
5389  
5390  	BUG_ON(ceph_osd_request_cache);
5391  	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
5392  						   0, 0, NULL);
5393  
5394  	return ceph_osd_request_cache ? 0 : -ENOMEM;
5395  }
5396  
5397  void ceph_osdc_cleanup(void)
5398  {
5399  	BUG_ON(!ceph_osd_request_cache);
5400  	kmem_cache_destroy(ceph_osd_request_cache);
5401  	ceph_osd_request_cache = NULL;
5402  }
5403  
5404  /*
5405   * handle incoming message
5406   */
5407  static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5408  {
5409  	struct ceph_osd *osd = con->private;
5410  	struct ceph_osd_client *osdc = osd->o_osdc;
5411  	int type = le16_to_cpu(msg->hdr.type);
5412  
5413  	switch (type) {
5414  	case CEPH_MSG_OSD_MAP:
5415  		ceph_osdc_handle_map(osdc, msg);
5416  		break;
5417  	case CEPH_MSG_OSD_OPREPLY:
5418  		handle_reply(osd, msg);
5419  		break;
5420  	case CEPH_MSG_OSD_BACKOFF:
5421  		handle_backoff(osd, msg);
5422  		break;
5423  	case CEPH_MSG_WATCH_NOTIFY:
5424  		handle_watch_notify(osdc, msg);
5425  		break;
5426  
5427  	default:
5428  		pr_err("received unknown message type %d %s\n", type,
5429  		       ceph_msg_type_name(type));
5430  	}
5431  
5432  	ceph_msg_put(msg);
5433  }
5434  
5435  /* How much sparse data was requested? */
5436  static u64 sparse_data_requested(struct ceph_osd_request *req)
5437  {
5438  	u64 len = 0;
5439  
5440  	if (req->r_flags & CEPH_OSD_FLAG_READ) {
5441  		int i;
5442  
5443  		for (i = 0; i < req->r_num_ops; ++i) {
5444  			struct ceph_osd_req_op *op = &req->r_ops[i];
5445  
5446  			if (op->op == CEPH_OSD_OP_SPARSE_READ)
5447  				len += op->extent.length;
5448  		}
5449  	}
5450  	return len;
5451  }
5452  
5453  /*
5454   * Look up and return the message for an incoming reply.  Don't try
5455   * to do anything about a data portion larger than preallocated at
5456   * the moment - for now, just skip the message.
5457   */
5458  static struct ceph_msg *get_reply(struct ceph_connection *con,
5459  				  struct ceph_msg_header *hdr,
5460  				  int *skip)
5461  {
5462  	struct ceph_osd *osd = con->private;
5463  	struct ceph_osd_client *osdc = osd->o_osdc;
5464  	struct ceph_msg *m = NULL;
5465  	struct ceph_osd_request *req;
5466  	int front_len = le32_to_cpu(hdr->front_len);
5467  	int data_len = le32_to_cpu(hdr->data_len);
5468  	u64 tid = le64_to_cpu(hdr->tid);
5469  	u64 srlen;
5470  
5471  	down_read(&osdc->lock);
5472  	if (!osd_registered(osd)) {
5473  		dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
5474  		*skip = 1;
5475  		goto out_unlock_osdc;
5476  	}
5477  	WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
5478  
5479  	mutex_lock(&osd->lock);
5480  	req = lookup_request(&osd->o_requests, tid);
5481  	if (!req) {
5482  		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
5483  		     osd->o_osd, tid);
5484  		*skip = 1;
5485  		goto out_unlock_session;
5486  	}
5487  
5488  	ceph_msg_revoke_incoming(req->r_reply);
5489  
5490  	if (front_len > req->r_reply->front_alloc_len) {
5491  		pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
5492  			__func__, osd->o_osd, req->r_tid, front_len,
5493  			req->r_reply->front_alloc_len);
5494  		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
5495  				 false);
5496  		if (!m)
5497  			goto out_unlock_session;
5498  		ceph_msg_put(req->r_reply);
5499  		req->r_reply = m;
5500  	}
5501  
5502  	srlen = sparse_data_requested(req);
5503  	if (!srlen && data_len > req->r_reply->data_length) {
5504  		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
5505  			__func__, osd->o_osd, req->r_tid, data_len,
5506  			req->r_reply->data_length);
5507  		m = NULL;
5508  		*skip = 1;
5509  		goto out_unlock_session;
5510  	}
5511  
5512  	m = ceph_msg_get(req->r_reply);
5513  	m->sparse_read_total = srlen;
5514  
5515  	dout("get_reply tid %lld %p\n", tid, m);
5516  
5517  out_unlock_session:
5518  	mutex_unlock(&osd->lock);
5519  out_unlock_osdc:
5520  	up_read(&osdc->lock);
5521  	return m;
5522  }
5523  
5524  static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
5525  {
5526  	struct ceph_msg *m;
5527  	int type = le16_to_cpu(hdr->type);
5528  	u32 front_len = le32_to_cpu(hdr->front_len);
5529  	u32 data_len = le32_to_cpu(hdr->data_len);
5530  
5531  	m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
5532  	if (!m)
5533  		return NULL;
5534  
5535  	if (data_len) {
5536  		struct page **pages;
5537  
5538  		pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
5539  					       GFP_NOIO);
5540  		if (IS_ERR(pages)) {
5541  			ceph_msg_put(m);
5542  			return NULL;
5543  		}
5544  
5545  		ceph_msg_data_add_pages(m, pages, data_len, 0, true);
5546  	}
5547  
5548  	return m;
5549  }
5550  
5551  static struct ceph_msg *osd_alloc_msg(struct ceph_connection *con,
5552  				      struct ceph_msg_header *hdr,
5553  				      int *skip)
5554  {
5555  	struct ceph_osd *osd = con->private;
5556  	int type = le16_to_cpu(hdr->type);
5557  
5558  	*skip = 0;
5559  	switch (type) {
5560  	case CEPH_MSG_OSD_MAP:
5561  	case CEPH_MSG_OSD_BACKOFF:
5562  	case CEPH_MSG_WATCH_NOTIFY:
5563  		return alloc_msg_with_page_vector(hdr);
5564  	case CEPH_MSG_OSD_OPREPLY:
5565  		return get_reply(con, hdr, skip);
5566  	default:
5567  		pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
5568  			osd->o_osd, type);
5569  		*skip = 1;
5570  		return NULL;
5571  	}
5572  }
5573  
5574  /*
5575   * Wrappers to refcount containing ceph_osd struct
5576   */
5577  static struct ceph_connection *osd_get_con(struct ceph_connection *con)
5578  {
5579  	struct ceph_osd *osd = con->private;
5580  	if (get_osd(osd))
5581  		return con;
5582  	return NULL;
5583  }
5584  
5585  static void osd_put_con(struct ceph_connection *con)
5586  {
5587  	struct ceph_osd *osd = con->private;
5588  	put_osd(osd);
5589  }
5590  
5591  /*
5592   * authentication
5593   */
5594  
5595  /*
5596   * Note: returned pointer is the address of a structure that's
5597   * managed separately.  Caller must *not* attempt to free it.
5598   */
5599  static struct ceph_auth_handshake *
5600  osd_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
5601  {
5602  	struct ceph_osd *o = con->private;
5603  	struct ceph_osd_client *osdc = o->o_osdc;
5604  	struct ceph_auth_client *ac = osdc->client->monc.auth;
5605  	struct ceph_auth_handshake *auth = &o->o_auth;
5606  	int ret;
5607  
5608  	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD,
5609  					 force_new, proto, NULL, NULL);
5610  	if (ret)
5611  		return ERR_PTR(ret);
5612  
5613  	return auth;
5614  }
5615  
5616  static int osd_add_authorizer_challenge(struct ceph_connection *con,
5617  				    void *challenge_buf, int challenge_buf_len)
5618  {
5619  	struct ceph_osd *o = con->private;
5620  	struct ceph_osd_client *osdc = o->o_osdc;
5621  	struct ceph_auth_client *ac = osdc->client->monc.auth;
5622  
5623  	return ceph_auth_add_authorizer_challenge(ac, o->o_auth.authorizer,
5624  					    challenge_buf, challenge_buf_len);
5625  }
5626  
5627  static int osd_verify_authorizer_reply(struct ceph_connection *con)
5628  {
5629  	struct ceph_osd *o = con->private;
5630  	struct ceph_osd_client *osdc = o->o_osdc;
5631  	struct ceph_auth_client *ac = osdc->client->monc.auth;
5632  	struct ceph_auth_handshake *auth = &o->o_auth;
5633  
5634  	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
5635  		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
5636  		NULL, NULL, NULL, NULL);
5637  }
5638  
5639  static int osd_invalidate_authorizer(struct ceph_connection *con)
5640  {
5641  	struct ceph_osd *o = con->private;
5642  	struct ceph_osd_client *osdc = o->o_osdc;
5643  	struct ceph_auth_client *ac = osdc->client->monc.auth;
5644  
5645  	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
5646  	return ceph_monc_validate_auth(&osdc->client->monc);
5647  }
5648  
5649  static int osd_get_auth_request(struct ceph_connection *con,
5650  				void *buf, int *buf_len,
5651  				void **authorizer, int *authorizer_len)
5652  {
5653  	struct ceph_osd *o = con->private;
5654  	struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5655  	struct ceph_auth_handshake *auth = &o->o_auth;
5656  	int ret;
5657  
5658  	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD,
5659  				       buf, buf_len);
5660  	if (ret)
5661  		return ret;
5662  
5663  	*authorizer = auth->authorizer_buf;
5664  	*authorizer_len = auth->authorizer_buf_len;
5665  	return 0;
5666  }
5667  
5668  static int osd_handle_auth_reply_more(struct ceph_connection *con,
5669  				      void *reply, int reply_len,
5670  				      void *buf, int *buf_len,
5671  				      void **authorizer, int *authorizer_len)
5672  {
5673  	struct ceph_osd *o = con->private;
5674  	struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5675  	struct ceph_auth_handshake *auth = &o->o_auth;
5676  	int ret;
5677  
5678  	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
5679  					      buf, buf_len);
5680  	if (ret)
5681  		return ret;
5682  
5683  	*authorizer = auth->authorizer_buf;
5684  	*authorizer_len = auth->authorizer_buf_len;
5685  	return 0;
5686  }
5687  
5688  static int osd_handle_auth_done(struct ceph_connection *con,
5689  				u64 global_id, void *reply, int reply_len,
5690  				u8 *session_key, int *session_key_len,
5691  				u8 *con_secret, int *con_secret_len)
5692  {
5693  	struct ceph_osd *o = con->private;
5694  	struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5695  	struct ceph_auth_handshake *auth = &o->o_auth;
5696  
5697  	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
5698  					       session_key, session_key_len,
5699  					       con_secret, con_secret_len);
5700  }
5701  
5702  static int osd_handle_auth_bad_method(struct ceph_connection *con,
5703  				      int used_proto, int result,
5704  				      const int *allowed_protos, int proto_cnt,
5705  				      const int *allowed_modes, int mode_cnt)
5706  {
5707  	struct ceph_osd *o = con->private;
5708  	struct ceph_mon_client *monc = &o->o_osdc->client->monc;
5709  	int ret;
5710  
5711  	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_OSD,
5712  					    used_proto, result,
5713  					    allowed_protos, proto_cnt,
5714  					    allowed_modes, mode_cnt)) {
5715  		ret = ceph_monc_validate_auth(monc);
5716  		if (ret)
5717  			return ret;
5718  	}
5719  
5720  	return -EACCES;
5721  }
5722  
5723  static void osd_reencode_message(struct ceph_msg *msg)
5724  {
5725  	int type = le16_to_cpu(msg->hdr.type);
5726  
5727  	if (type == CEPH_MSG_OSD_OP)
5728  		encode_request_finish(msg);
5729  }
5730  
5731  static int osd_sign_message(struct ceph_msg *msg)
5732  {
5733  	struct ceph_osd *o = msg->con->private;
5734  	struct ceph_auth_handshake *auth = &o->o_auth;
5735  
5736  	return ceph_auth_sign_message(auth, msg);
5737  }
5738  
5739  static int osd_check_message_signature(struct ceph_msg *msg)
5740  {
5741  	struct ceph_osd *o = msg->con->private;
5742  	struct ceph_auth_handshake *auth = &o->o_auth;
5743  
5744  	return ceph_auth_check_message_signature(auth, msg);
5745  }
5746  
5747  static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len,
5748  			   bool zero)
5749  {
5750  	while (len) {
5751  		struct page *page;
5752  		size_t poff, plen;
5753  
5754  		page = ceph_msg_data_next(cursor, &poff, &plen);
5755  		if (plen > len)
5756  			plen = len;
5757  		if (zero)
5758  			zero_user_segment(page, poff, poff + plen);
5759  		len -= plen;
5760  		ceph_msg_data_advance(cursor, plen);
5761  	}
5762  }
5763  
5764  static int prep_next_sparse_read(struct ceph_connection *con,
5765  				 struct ceph_msg_data_cursor *cursor)
5766  {
5767  	struct ceph_osd *o = con->private;
5768  	struct ceph_sparse_read *sr = &o->o_sparse_read;
5769  	struct ceph_osd_request *req;
5770  	struct ceph_osd_req_op *op;
5771  
5772  	spin_lock(&o->o_requests_lock);
5773  	req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));
5774  	if (!req) {
5775  		spin_unlock(&o->o_requests_lock);
5776  		return -EBADR;
5777  	}
5778  
5779  	if (o->o_sparse_op_idx < 0) {
5780  		dout("%s: [%d] starting new sparse read req\n",
5781  		     __func__, o->o_osd);
5782  	} else {
5783  		u64 end;
5784  
5785  		op = &req->r_ops[o->o_sparse_op_idx];
5786  
5787  		WARN_ON_ONCE(op->extent.sparse_ext);
5788  
5789  		/* hand back buffer we took earlier */
5790  		op->extent.sparse_ext = sr->sr_extent;
5791  		sr->sr_extent = NULL;
5792  		op->extent.sparse_ext_cnt = sr->sr_count;
5793  		sr->sr_ext_len = 0;
5794  		dout("%s: [%d] completed extent array len %d cursor->resid %zd\n",
5795  		     __func__, o->o_osd, op->extent.sparse_ext_cnt, cursor->resid);
5796  		/* Advance to end of data for this operation */
5797  		end = ceph_sparse_ext_map_end(op);
5798  		if (end < sr->sr_req_len)
5799  			advance_cursor(cursor, sr->sr_req_len - end, false);
5800  	}
5801  
5802  	ceph_init_sparse_read(sr);
5803  
5804  	/* find next op in this request (if any) */
5805  	while (++o->o_sparse_op_idx < req->r_num_ops) {
5806  		op = &req->r_ops[o->o_sparse_op_idx];
5807  		if (op->op == CEPH_OSD_OP_SPARSE_READ)
5808  			goto found;
5809  	}
5810  
5811  	/* reset for next sparse read request */
5812  	spin_unlock(&o->o_requests_lock);
5813  	o->o_sparse_op_idx = -1;
5814  	return 0;
5815  found:
5816  	sr->sr_req_off = op->extent.offset;
5817  	sr->sr_req_len = op->extent.length;
5818  	sr->sr_pos = sr->sr_req_off;
5819  	dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__,
5820  	     o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len);
5821  
5822  	/* hand off request's sparse extent map buffer */
5823  	sr->sr_ext_len = op->extent.sparse_ext_cnt;
5824  	op->extent.sparse_ext_cnt = 0;
5825  	sr->sr_extent = op->extent.sparse_ext;
5826  	op->extent.sparse_ext = NULL;
5827  
5828  	spin_unlock(&o->o_requests_lock);
5829  	return 1;
5830  }
5831  
5832  #ifdef __BIG_ENDIAN
5833  static inline void convert_extent_map(struct ceph_sparse_read *sr)
5834  {
5835  	int i;
5836  
5837  	for (i = 0; i < sr->sr_count; i++) {
5838  		struct ceph_sparse_extent *ext = &sr->sr_extent[i];
5839  
5840  		ext->off = le64_to_cpu((__force __le64)ext->off);
5841  		ext->len = le64_to_cpu((__force __le64)ext->len);
5842  	}
5843  }
5844  #else
5845  static inline void convert_extent_map(struct ceph_sparse_read *sr)
5846  {
5847  }
5848  #endif
5849  
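/*
 * On the wire, a sparse read reply consists of a 32-bit extent count,
 * that many (offset, length) pairs of 64-bit values, a 32-bit data
 * length and then the data itself, all little-endian.
 * osd_sparse_read() below is the state machine that walks this
 * layout, handing the messenger either a buffer to fill or a length
 * of message data to consume at each step.
 */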
5850  static int osd_sparse_read(struct ceph_connection *con,
5851  			   struct ceph_msg_data_cursor *cursor,
5852  			   char **pbuf)
5853  {
5854  	struct ceph_osd *o = con->private;
5855  	struct ceph_sparse_read *sr = &o->o_sparse_read;
5856  	u32 count = sr->sr_count;
5857  	u64 eoff, elen, len = 0;
5858  	int i, ret;
5859  
5860  	switch (sr->sr_state) {
5861  	case CEPH_SPARSE_READ_HDR:
5862  next_op:
5863  		ret = prep_next_sparse_read(con, cursor);
5864  		if (ret <= 0)
5865  			return ret;
5866  
5867  		/* number of extents */
5868  		ret = sizeof(sr->sr_count);
5869  		*pbuf = (char *)&sr->sr_count;
5870  		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
5871  		break;
5872  	case CEPH_SPARSE_READ_EXTENTS:
5873  		/* Convert sr_count to host-endian */
5874  		count = le32_to_cpu((__force __le32)sr->sr_count);
5875  		sr->sr_count = count;
5876  		dout("[%d] got %u extents\n", o->o_osd, count);
5877  
5878  		if (count > 0) {
5879  			if (!sr->sr_extent || count > sr->sr_ext_len) {
5880  				/* no extent array provided, or too short */
5881  				kfree(sr->sr_extent);
5882  				sr->sr_extent = kmalloc_array(count,
5883  							      sizeof(*sr->sr_extent),
5884  							      GFP_NOIO);
5885  				if (!sr->sr_extent) {
5886  					pr_err("%s: failed to allocate %u extents\n",
5887  					       __func__, count);
5888  					return -ENOMEM;
5889  				}
5890  				sr->sr_ext_len = count;
5891  			}
5892  			ret = count * sizeof(*sr->sr_extent);
5893  			*pbuf = (char *)sr->sr_extent;
5894  			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
5895  			break;
5896  		}
5897  		/* No extents? Read data len */
5898  		fallthrough;
5899  	case CEPH_SPARSE_READ_DATA_LEN:
5900  		convert_extent_map(sr);
5901  		ret = sizeof(sr->sr_datalen);
5902  		*pbuf = (char *)&sr->sr_datalen;
5903  		sr->sr_state = CEPH_SPARSE_READ_DATA_PRE;
5904  		break;
5905  	case CEPH_SPARSE_READ_DATA_PRE:
5906  		/* Convert sr_datalen to host-endian */
5907  		sr->sr_datalen = le32_to_cpu((__force __le32)sr->sr_datalen);
5908  		for (i = 0; i < count; i++)
5909  			len += sr->sr_extent[i].len;
5910  		if (sr->sr_datalen != len) {
5911  			pr_warn_ratelimited("data len %u != extent len %llu\n",
5912  					    sr->sr_datalen, len);
5913  			return -EREMOTEIO;
5914  		}
5915  		sr->sr_state = CEPH_SPARSE_READ_DATA;
5916  		fallthrough;
5917  	case CEPH_SPARSE_READ_DATA:
5918  		if (sr->sr_index >= count) {
5919  			sr->sr_state = CEPH_SPARSE_READ_HDR;
5920  			goto next_op;
5921  		}
5922  
5923  		eoff = sr->sr_extent[sr->sr_index].off;
5924  		elen = sr->sr_extent[sr->sr_index].len;
5925  
5926  		dout("[%d] ext %d off 0x%llx len 0x%llx\n",
5927  		     o->o_osd, sr->sr_index, eoff, elen);
5928  
5929  		if (elen > INT_MAX) {
5930  			dout("Sparse read extent length too long (0x%llx)\n",
5931  			     elen);
5932  			return -EREMOTEIO;
5933  		}
5934  
5935  		/* zero out anything from sr_pos to start of extent */
5936  		if (sr->sr_pos < eoff)
5937  			advance_cursor(cursor, eoff - sr->sr_pos, true);
5938  
5939  		/* Set position to end of extent */
5940  		sr->sr_pos = eoff + elen;
5941  
5942  		/* send back the new length and nullify the ptr */
5943  		cursor->sr_resid = elen;
5944  		ret = elen;
5945  		*pbuf = NULL;
5946  
5947  		/* Bump the array index */
5948  		++sr->sr_index;
5949  		break;
5950  	}
5951  	return ret;
5952  }
5953  
5954  static const struct ceph_connection_operations osd_con_ops = {
5955  	.get = osd_get_con,
5956  	.put = osd_put_con,
5957  	.sparse_read = osd_sparse_read,
5958  	.alloc_msg = osd_alloc_msg,
5959  	.dispatch = osd_dispatch,
5960  	.fault = osd_fault,
5961  	.reencode_message = osd_reencode_message,
5962  	.get_authorizer = osd_get_authorizer,
5963  	.add_authorizer_challenge = osd_add_authorizer_challenge,
5964  	.verify_authorizer_reply = osd_verify_authorizer_reply,
5965  	.invalidate_authorizer = osd_invalidate_authorizer,
5966  	.sign_message = osd_sign_message,
5967  	.check_message_signature = osd_check_message_signature,
5968  	.get_auth_request = osd_get_auth_request,
5969  	.handle_auth_reply_more = osd_handle_auth_reply_more,
5970  	.handle_auth_done = osd_handle_auth_done,
5971  	.handle_auth_bad_method = osd_handle_auth_bad_method,
5972  };
5973