// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>
#include <linux/mnt_idmapping.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */
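
/*
 * A minimal sketch (not exhaustive) of how callers elsewhere in fs/ceph
 * drive this machinery for a synchronous request, using the real entry
 * points declared in mds_client.h:
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
 *	req->r_dentry = dget(dentry);
 *	err = ceph_mdsc_do_request(mdsc, dir, req);	// picks an MDS via
 *							// __choose_mds(), sends,
 *							// and waits for the reply
 *	ceph_mdsc_put_request(req);
 */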

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}
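
/*
 * The decode pattern above mirrors ceph's versioned on-wire framing.
 * A sketch of the layout being parsed:
 *
 *	u8  struct_v       - current encoding version (expected >= 1)
 *	u8  struct_compat  - oldest version this payload is compatible with
 *	u32 struct_len     - number of payload bytes that follow
 *	... struct_len bytes of versioned fields ...
 *
 * Clamping 'end' to *p + struct_len lets newer servers append fields
 * without breaking older decoders, which simply skip to *p = end.
 */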

/*
 * parse individual inode info
 */
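/*
 * Note: features == (u64)-1 is used as a sentinel meaning "the MDS
 * speaks the new, fully versioned reply encoding"; otherwise individual
 * feature bits gate each legacy field, as in the else-branch below.
 */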
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		/* snapshot count, remains zero for v<=3 */
		if (struct_v >= 4) {
			ceph_decode_64_safe(p, end, info->rsnaps, bad);
		} else {
			info->rsnaps = 0;
		}

		if (struct_v >= 5) {
			u32 alen;

			ceph_decode_32_safe(p, end, alen, bad);

			while (alen--) {
				u32 len;

				/* key */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
				/* value */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
			}
		}

		/* fscrypt flag -- ignore */
		if (struct_v >= 6)
			ceph_decode_skip_8(p, end, bad);

		info->fscrypt_auth = NULL;
		info->fscrypt_auth_len = 0;
		info->fscrypt_file = NULL;
		info->fscrypt_file_len = 0;
		if (struct_v >= 7) {
			ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
			if (info->fscrypt_auth_len) {
				info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
							     GFP_KERNEL);
				if (!info->fscrypt_auth)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_auth,
						      info->fscrypt_auth_len, bad);
			}
			ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
			if (info->fscrypt_file_len) {
				info->fscrypt_file = kmalloc(info->fscrypt_file_len,
							     GFP_KERNEL);
				if (!info->fscrypt_file)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_file,
						      info->fscrypt_file_len, bad);
			}
		}
		*p = end;
	} else {
		/* legacy (unversioned) struct */
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime and info->rsnaps remain zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}

static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features, u32 *altname_len, u8 **altname)
{
	u8 struct_v;
	u32 struct_len;
	void *lend;

	if (features == (u64)-1) {
		u8 struct_compat;

		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);

		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;

		ceph_decode_32_safe(p, end, struct_len, bad);
	} else {
		struct_len = sizeof(**lease);
		*altname_len = 0;
		*altname = NULL;
	}

	lend = *p + struct_len;
	ceph_decode_need(p, end, struct_len, bad);
	*lease = *p;
	*p += sizeof(**lease);

	if (features == (u64)-1) {
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, *altname_len, bad);
			ceph_decode_need(p, end, *altname_len, bad);
			*altname = *p;
			*p += *altname_len;
		} else {
			*altname = NULL;
			*altname_len = 0;
		}
	}
	*p = lend;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features,
					     &info->altname_len, &info->altname);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
				    struct ceph_mds_request *req,
				    u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = req->r_mdsc->fsc->client;
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err_client(cl, "dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct inode *inode = d_inode(req->r_dentry);
		struct ceph_inode_info *ci = ceph_inode(inode);
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		struct fscrypt_str tname = FSTR_INIT(NULL, 0);
		struct fscrypt_str oname = FSTR_INIT(NULL, 0);
		struct ceph_fname fname;
		u32 altname_len, _name_len;
		u8 *altname, *_name;

		/* dentry */
		ceph_decode_32_safe(p, end, _name_len, bad);
		ceph_decode_need(p, end, _name_len, bad);
		_name = *p;
		*p += _name_len;
		doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name);

		if (info->hash_order)
			rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
						      _name, _name_len);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features,
					     &altname_len, &altname);
		if (err)
			goto out_bad;

		/*
		 * Try to decrypt the dentry names and update them
		 * in the ceph_mds_reply_dir_entry struct.
		 */
		fname.dir = inode;
		fname.name = _name;
		fname.name_len = _name_len;
		fname.ctext = altname;
		fname.ctext_len = altname_len;
		/*
		 * _name_len may be larger than altname_len, e.g. when
		 * the human-readable name length falls in the range
		 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE);
		 * in that case the copy in ceph_fname_to_usr would
		 * corrupt the data if there is no encryption key.
		 *
		 * Just set the no_copy flag; then, if there is no
		 * encryption key, oname.name will always be assigned
		 * to _name.
		 */
		fname.no_copy = true;
		if (altname_len == 0) {
			/*
			 * Set tname to _name; this will be used to do
			 * the base64_decode in-place.  It's safe
			 * because the decoded string is always
			 * shorter, at most 3/4 the length of the
			 * original string.
			 */
			tname.name = _name;

			/*
			 * Set oname to _name too; this will be used
			 * to do the decryption in-place.
			 */
			oname.name = _name;
			oname.len = _name_len;
		} else {
			/*
			 * This will do the decryption in-place,
			 * directly from the altname ciphertext.
			 */
			oname.name = altname;
			oname.len = altname_len;
		}
		rde->is_nokey = false;
		err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
		if (err) {
			pr_err_client(cl, "unable to decode %.*s, got %d\n",
				      _name_len, _name, err);
			goto out_bad;
		}
		rde->name = oname.name;
		rde->name_len = oname.len;

		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "problem parsing dir contents %d\n", err);
	return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}


#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE		xa_mk_value(1)

static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	doutc(cl, "got %u sets of delegated inodes\n", sets);
	while (sets--) {
		u64 start, len;

		ceph_decode_64_safe(p, end, start, bad);
		ceph_decode_64_safe(p, end, len, bad);

		/* Don't accept a delegation of system inodes */
		if (start < CEPH_INO_SYSTEM_BASE) {
			pr_warn_ratelimited_client(cl,
				"ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
				start, len);
			continue;
		}
		while (len--) {
			int err = xa_insert(&s->s_delegated_inos, start++,
					    DELEGATED_INO_AVAILABLE,
					    GFP_KERNEL);
			if (!err) {
				doutc(cl, "added delegated inode 0x%llx\n", start - 1);
			} else if (err == -EBUSY) {
				pr_warn_client(cl,
					"MDS delegated inode 0x%llx more than once.\n",
					start - 1);
			} else {
				return err;
			}
		}
	}
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	unsigned long ino;
	void *val;

	xa_for_each(&s->s_delegated_inos, ino, val) {
		val = xa_erase(&s->s_delegated_inos, ino);
		if (val == DELEGATED_INO_AVAILABLE)
			return ino;
	}
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
			 GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	if (sets)
		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return 0;
}
#endif /* BITS_PER_LONG == 64 */
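
/*
 * Illustrative (hypothetical caller) sketch of how the delegated-inode
 * pool above round-trips on 64-bit arches: an async create claims an
 * inode number the MDS delegated to us, and puts it back if the request
 * can't be submitted:
 *
 *	u64 ino = ceph_get_deleg_ino(session);		// claim one, 0 if none
 *	...
 *	if (ino && submission_failed)
 *		ceph_restore_deleg_ino(session, ino);	// return it to the pool
 */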

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features, struct ceph_mds_session *s)
{
	int ret;

	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			/* Malformed reply? */
			info->has_create_ino = false;
		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
			info->has_create_ino = true;
			/* struct_v, struct_compat, and len */
			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(p, end, info->ino, bad);
			ret = ceph_parse_deleg_inos(p, end, s);
			if (ret)
				return ret;
		} else {
			/* legacy */
			ceph_decode_64_safe(p, end, info->ino, bad);
			info->has_create_ino = true;
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_getvxattr(void **p, void *end,
				      struct ceph_mds_reply_info_parsed *info,
				      u64 features)
{
	u32 value_len;

	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
	ceph_decode_skip_32(p, end, bad); /* skip payload length */

	ceph_decode_32_safe(p, end, value_len, bad);

	if (value_len == end - *p) {
		info->xattr_info.xattr_value = *p;
		info->xattr_info.xattr_value_len = value_len;
		*p = end;
		return value_len;
	}
bad:
	return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_request *req,
				  u64 features, struct ceph_mds_session *s)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, req, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features, s);
	else if (op == CEPH_MDS_OP_GETVXATTR)
		return parse_reply_info_getvxattr(p, end, info, features);
	else
		return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
			    struct ceph_mds_request *req, u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, req, features, s);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "mds parse_reply err %d\n", err);
	ceph_msg_dump(msg);
	return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	int i;

	kfree(info->diri.fscrypt_auth);
	kfree(info->diri.fscrypt_file);
	kfree(info->targeti.fscrypt_auth);
	kfree(info->targeti.fscrypt_file);
	if (!info->dir_entries)
		return;

	for (i = 0; i < info->dir_nr; i++) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

		kfree(rde->inode.fscrypt_auth);
		kfree(rde->inode.fscrypt_file);
	}
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

/*
 * In the async unlink case the kclient won't wait for the first reply
 * from the MDS; it drops all the links, unhashes the dentry, and then
 * succeeds immediately.
 *
 * For any subsequent create/link/rename/etc. request that reuses the
 * same file name, we must wait for the first reply of the inflight
 * unlink request, or the MDS may fail these following requests with
 * -EEXIST if the inflight async unlink request was delayed for some
 * reason.
 *
 * The worst case is that a non-async open-and-create (openc) request
 * will successfully open the file if the CDentry hasn't been unlinked
 * yet, but later the previously delayed async unlink request will
 * remove the CDentry. That means the just-created file is possibly
 * deleted later by accident.
 *
 * We need to wait for the inflight async unlink requests to finish
 * when creating new files/directories by using the same file names.
 */
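/*
 * A sketch of the race described above (illustrative timeline):
 *
 *	t0: unlink("f") returns; the async UNLINK for "f" is still in flight
 *	t1: create("f") without waiting -> the MDS may still see the old
 *	    CDentry and either fail with -EEXIST or open the doomed file
 *	t2: the delayed UNLINK finally commits, removing "f" again
 *
 * ceph_wait_on_conflict_unlink() below blocks the caller at t1 until the
 * inflight unlink for the same name completes.
 */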
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
	struct ceph_client *cl = fsc->client;
	struct dentry *pdentry = dentry->d_parent;
	struct dentry *udentry, *found = NULL;
	struct ceph_dentry_info *di;
	struct qstr dname;
	u32 hash = dentry->d_name.hash;
	int err;

	dname.name = dentry->d_name.name;
	dname.len = dentry->d_name.len;

	rcu_read_lock();
	hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
				   hnode, hash) {
		udentry = di->dentry;

		spin_lock(&udentry->d_lock);
		if (udentry->d_name.hash != hash)
			goto next;
		if (unlikely(udentry->d_parent != pdentry))
			goto next;
		if (!hash_hashed(&di->hnode))
			goto next;

		if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
			pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n",
				       dentry, dentry);

		if (!d_same_name(udentry, pdentry, &dname))
			goto next;

		found = dget_dlock(udentry);
		spin_unlock(&udentry->d_lock);
		break;
next:
		spin_unlock(&udentry->d_lock);
	}
	rcu_read_unlock();

	if (likely(!found))
		return 0;

	doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry,
	      found, found);

	err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
			  TASK_KILLABLE);
	dput(found);
	return err;
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_CLOSED: return "closed";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref))
		return s;
	return NULL;
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
	if (IS_ERR_OR_NULL(s))
		return;

	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		WARN_ON(mutex_is_locked(&s->s_mutex));
		xa_destroy(&s->s_delegated_inos);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *s;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return ERR_PTR(-EIO);

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;

		doutc(cl, "realloc to %d\n", newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	doutc(cl, "mds%d\n", mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	atomic_set(&s->s_cap_gen, 1);
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	INIT_LIST_HEAD(&s->s_caps);
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	xa_init(&s->s_delegated_inos);
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_dirty);
	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}
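
/*
 * Worked example of the power-of-two growth above (illustrative): with
 * max_sessions == 4, registering mds5 computes
 * newmax = 1 << get_count_order(5 + 1) = 8; the four existing pointers
 * are copied into the new array before the old one is freed.
 */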

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *s)
{
	doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
				void (*cb)(struct ceph_mds_session *),
				bool check_state)
{
	int mds;

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
		struct ceph_mds_session *s;

		s = __ceph_lookup_mds_session(mdsc, mds);
		if (!s)
			continue;

		if (check_state && !check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}

		mutex_unlock(&mdsc->mutex);
		cb(s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_async(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		iput(req->r_parent);
	}
	iput(req->r_target_inode);
	iput(req->r_new_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_mnt_idmap)
		mnt_idmap_put(req->r_mnt_idmap);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	kfree(req->r_fscrypt_auth);
	kfree(req->r_altname);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err_client(cl, "%p failed to reserve caps: %d\n",
				      req, ret);
			/* set req->r_err to fail early from __do_request */
			req->r_err = ret;
			return;
		}
	}
	doutc(cl, "%p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_cred = get_current_cred();
	if (!req->r_mnt_idmap)
		req->r_mnt_idmap = &nop_mnt_idmap;

	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		struct ceph_inode_info *ci = ceph_inode(dir);

		ihold(dir);
		req->r_unsafe_dir = dir;
		spin_lock(&ci->i_unsafe_lock);
		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
		spin_unlock(&ci->i_unsafe_lock);
	}
}

static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid);

	/* Never leave an unregistered request on an unsafe list! */
	list_del_init(&req->r_unsafe_item);

	if (req->r_tid == mdsc->oldest_tid) {
		struct rb_node *p = rb_next(&req->r_node);
		mdsc->oldest_tid = 0;
		while (p) {
			struct ceph_mds_request *next_req =
				rb_entry(p, struct ceph_mds_request, r_node);
			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
				mdsc->oldest_tid = next_req->r_tid;
				break;
			}
			p = rb_next(p);
		}
	}

	erase_request(&mdsc->request_tree, req);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_target_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	if (req->r_unsafe_dir) {
		iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
	struct inode *inode = NULL;

	while (dentry && !IS_ROOT(dentry)) {
		inode = d_inode_rcu(dentry);
		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
			break;
		dentry = dentry->d_parent;
	}
	if (inode)
		inode = igrab(inode);
	return inode;
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
	struct ceph_client *cl = mdsc->fsc->client;

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			doutc(cl, "using snapdir's parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/*  not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			doutc(cl, "using nonsnap parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	if (!inode)
		goto random;

	doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode,
	      ceph_vinop(inode), (int)is_hash, hash, mode);
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n",
				      inode, ceph_vinop(inode), frag.frag,
				      mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n",
				      inode, ceph_vinop(inode), frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			mode = USE_AUTH_MDS;
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode,
	      ceph_vinop(inode), mds,
	      cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	doutc(cl, "chose random mds%d\n", mds);
	return mds;
}


/*
 * session messages
 */
struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("ENOMEM creating session %s msg\n",
		       ceph_session_op_name(op));
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}

static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
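/*
 * FEATURE_BYTES rounds the feature bitmap up to whole 64-bit words,
 * expressed in bytes, taking the last (highest, assuming the list is
 * ascending) entry of feature_bits.  E.g. if the highest supported bit
 * were 17: DIV_ROUND_UP(17 + 1, 64) * 8 == 8, i.e. one u64 of bitmap.
 */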
static int encode_supported_features(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(feature_bits);

	if (count > 0) {
		size_t i;
		size_t size = FEATURE_BYTES(count);
		unsigned long bit;

		if (WARN_ON_ONCE(*p + 4 + size > end))
			return -ERANGE;

		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++) {
			bit = feature_bits[i];
			((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
		}
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 > end))
			return -ERANGE;

		ceph_encode_32(p, 0);
	}

	return 0;
}

static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
static int encode_metric_spec(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(metric_bits);

	/* header */
	if (WARN_ON_ONCE(*p + 2 > end))
		return -ERANGE;

	ceph_encode_8(p, 1); /* version */
	ceph_encode_8(p, 1); /* compat */

	if (count > 0) {
		size_t i;
		size_t size = METRIC_BYTES(count);

		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4 + size);

		/* metric spec */
		ceph_encode_32(p, size);
		memset(*p, 0, size);
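		/*
		 * Note: this indexes the bitmap with i / 8 rather than
		 * metric_bits[i] / 8; the two agree only while metric_bits
		 * enumerates consecutive bits starting at 0.
		 */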
		for (i = 0; i < count; i++)
			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 + 4 > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4);
		/* metric spec */
		ceph_encode_32(p, 0);
	}

	return 0;
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *
create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	struct ceph_client *cl = mdsc->fsc->client;
	size_t size, count;
	void *p, *end;
	int ret;

	const char *metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	extra_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* supported feature */
	size = 0;
	count = ARRAY_SIZE(feature_bits);
	if (count > 0)
		size = FEATURE_BYTES(count);
	extra_bytes += 4 + size;

	/* metric spec */
	size = 0;
	count = ARRAY_SIZE(metric_bits);
	if (count > 0)
		size = METRIC_BYTES(count);
	extra_bytes += 2 + 4 + 4 + size;

	/* flags, mds auth caps and oldest_client_tid */
	extra_bytes += 4 + 4 + 8;

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err_client(cl, "ENOMEM creating session open msg\n");
		return ERR_PTR(-ENOMEM);
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v7
	 */
	msg->hdr.version = cpu_to_le16(7);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p += sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	ret = encode_supported_features(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_supported_features failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	ret = encode_metric_spec(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_metric_spec failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	/* version == 5, flags */
	ceph_encode_32(&p, 0);

	/* version == 6, mds auth caps */
	ceph_encode_32(&p, 0);

	/* version == 7, oldest_client_tid */
	ceph_encode_64(&p, mdsc->oldest_tid);

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}
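
/*
 * Sketch of the resulting front payload, matching the v7 ClientSession
 * encoding built above:
 *
 *	struct ceph_mds_session_head	op + seq
 *	u32 n				number of metadata map entries
 *	n x 2 x (u32 len + bytes)	key then value, per entry
 *	u32 len + bitmap		supported features
 *	u8 ver, u8 compat, u32 len,
 *	u32 len + bitmap		metric spec
 *	u32				flags (v5), encoded as 0
 *	u32				mds auth caps (v6), encoded as 0
 *	u64				oldest_client_tid (v7)
 */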
1655  
1656  /*
1657   * send session open request.
1658   *
1659   * called under mdsc->mutex
1660   */
__open_session(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)1661  static int __open_session(struct ceph_mds_client *mdsc,
1662  			  struct ceph_mds_session *session)
1663  {
1664  	struct ceph_msg *msg;
1665  	int mstate;
1666  	int mds = session->s_mds;
1667  
1668  	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
1669  		return -EIO;
1670  
1671  	/* wait for mds to go active? */
1672  	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1673  	doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds,
1674  	      ceph_mds_state_name(mstate));
1675  	session->s_state = CEPH_MDS_SESSION_OPENING;
1676  	session->s_renew_requested = jiffies;
1677  
1678  	/* send connect message */
1679  	msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN,
1680  				      session->s_seq);
1681  	if (IS_ERR(msg))
1682  		return PTR_ERR(msg);
1683  	ceph_con_send(&session->s_con, msg);
1684  	return 0;
1685  }
1686  
1687  /*
1688   * open sessions for any export targets for the given mds
1689   *
1690   * called under mdsc->mutex
1691   */
1692  static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client * mdsc,int target)1693  __open_export_target_session(struct ceph_mds_client *mdsc, int target)
1694  {
1695  	struct ceph_mds_session *session;
1696  	int ret;
1697  
1698  	session = __ceph_lookup_mds_session(mdsc, target);
1699  	if (!session) {
1700  		session = register_session(mdsc, target);
1701  		if (IS_ERR(session))
1702  			return session;
1703  	}
1704  	if (session->s_state == CEPH_MDS_SESSION_NEW ||
1705  	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
1706  		ret = __open_session(mdsc, session);
1707  		if (ret)
1708  			return ERR_PTR(ret);
1709  	}
1710  
1711  	return session;
1712  }
1713  
1714  struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client * mdsc,int target)1715  ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1716  {
1717  	struct ceph_mds_session *session;
1718  	struct ceph_client *cl = mdsc->fsc->client;
1719  
1720  	doutc(cl, "to mds%d\n", target);
1721  
1722  	mutex_lock(&mdsc->mutex);
1723  	session = __open_export_target_session(mdsc, target);
1724  	mutex_unlock(&mdsc->mutex);
1725  
1726  	return session;
1727  }
1728  
__open_export_target_sessions(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)1729  static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1730  					  struct ceph_mds_session *session)
1731  {
1732  	struct ceph_mds_info *mi;
1733  	struct ceph_mds_session *ts;
1734  	int i, mds = session->s_mds;
1735  	struct ceph_client *cl = mdsc->fsc->client;
1736  
1737  	if (mds >= mdsc->mdsmap->possible_max_rank)
1738  		return;
1739  
1740  	mi = &mdsc->mdsmap->m_info[mds];
1741  	doutc(cl, "for mds%d (%d targets)\n", session->s_mds,
1742  	      mi->num_export_targets);
1743  
1744  	for (i = 0; i < mi->num_export_targets; i++) {
1745  		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1746  		ceph_put_mds_session(ts);
1747  	}
1748  }
1749  
1750  void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1751  					   struct ceph_mds_session *session)
1752  {
1753  	mutex_lock(&mdsc->mutex);
1754  	__open_export_target_sessions(mdsc, session);
1755  	mutex_unlock(&mdsc->mutex);
1756  }
1757  
1758  /*
1759   * session caps
1760   */
1761  
1762  static void detach_cap_releases(struct ceph_mds_session *session,
1763  				struct list_head *target)
1764  {
1765  	struct ceph_client *cl = session->s_mdsc->fsc->client;
1766  
1767  	lockdep_assert_held(&session->s_cap_lock);
1768  
1769  	list_splice_init(&session->s_cap_releases, target);
1770  	session->s_num_cap_releases = 0;
1771  	doutc(cl, "mds%d\n", session->s_mds);
1772  }
1773  
1774  static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1775  				 struct list_head *dispose)
1776  {
1777  	while (!list_empty(dispose)) {
1778  		struct ceph_cap *cap;
1779  		/* take each cap off the list and drop its reference */
1780  		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1781  		list_del(&cap->session_caps);
1782  		ceph_put_cap(mdsc, cap);
1783  	}
1784  }
1785  
1786  static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1787  				     struct ceph_mds_session *session)
1788  {
1789  	struct ceph_client *cl = mdsc->fsc->client;
1790  	struct ceph_mds_request *req;
1791  	struct rb_node *p;
1792  
1793  	doutc(cl, "mds%d\n", session->s_mds);
1794  	mutex_lock(&mdsc->mutex);
1795  	while (!list_empty(&session->s_unsafe)) {
1796  		req = list_first_entry(&session->s_unsafe,
1797  				       struct ceph_mds_request, r_unsafe_item);
1798  		pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n",
1799  					   req->r_tid);
1800  		if (req->r_target_inode)
1801  			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
1802  		if (req->r_unsafe_dir)
1803  			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
1804  		__unregister_request(mdsc, req);
1805  	}
1806  	/* zero r_attempts, so kick_requests() will re-send requests */
1807  	p = rb_first(&mdsc->request_tree);
1808  	while (p) {
1809  		req = rb_entry(p, struct ceph_mds_request, r_node);
1810  		p = rb_next(p);
1811  		if (req->r_session &&
1812  		    req->r_session->s_mds == session->s_mds)
1813  			req->r_attempts = 0;
1814  	}
1815  	mutex_unlock(&mdsc->mutex);
1816  }
1817  
1818  /*
1819   * Helper to safely iterate over all caps associated with a session, with
1820   * special care taken to handle a racing __ceph_remove_cap().
1821   *
1822   * Caller must hold session s_mutex.
1823   */
1824  int ceph_iterate_session_caps(struct ceph_mds_session *session,
1825  			      int (*cb)(struct inode *, int mds, void *),
1826  			      void *arg)
1827  {
1828  	struct ceph_client *cl = session->s_mdsc->fsc->client;
1829  	struct list_head *p;
1830  	struct ceph_cap *cap;
1831  	struct inode *inode, *last_inode = NULL;
1832  	struct ceph_cap *old_cap = NULL;
1833  	int ret;
1834  
1835  	doutc(cl, "%p mds%d\n", session, session->s_mds);
1836  	spin_lock(&session->s_cap_lock);
1837  	p = session->s_caps.next;
1838  	while (p != &session->s_caps) {
1839  		int mds;
1840  
1841  		cap = list_entry(p, struct ceph_cap, session_caps);
1842  		inode = igrab(&cap->ci->netfs.inode);
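		/* skip caps whose inode is being evicted (igrab failed) */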
1843  		if (!inode) {
1844  			p = p->next;
1845  			continue;
1846  		}
1847  		session->s_cap_iterator = cap;
1848  		mds = cap->mds;
1849  		spin_unlock(&session->s_cap_lock);
1850  
1851  		if (last_inode) {
1852  			iput(last_inode);
1853  			last_inode = NULL;
1854  		}
1855  		if (old_cap) {
1856  			ceph_put_cap(session->s_mdsc, old_cap);
1857  			old_cap = NULL;
1858  		}
1859  
1860  		ret = cb(inode, mds, arg);
1861  		last_inode = inode;
1862  
1863  		spin_lock(&session->s_cap_lock);
1864  		p = p->next;
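		/* a racing __ceph_remove_cap() cleared cap->ci; finish detaching here */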
1865  		if (!cap->ci) {
1866  			doutc(cl, "finishing cap %p removal\n", cap);
1867  			BUG_ON(cap->session != session);
1868  			cap->session = NULL;
1869  			list_del_init(&cap->session_caps);
1870  			session->s_nr_caps--;
1871  			atomic64_dec(&session->s_mdsc->metric.total_caps);
1872  			if (cap->queue_release)
1873  				__ceph_queue_cap_release(session, cap);
1874  			else
1875  				old_cap = cap;  /* put_cap it w/o locks held */
1876  		}
1877  		if (ret < 0)
1878  			goto out;
1879  	}
1880  	ret = 0;
1881  out:
1882  	session->s_cap_iterator = NULL;
1883  	spin_unlock(&session->s_cap_lock);
1884  
1885  	iput(last_inode);
1886  	if (old_cap)
1887  		ceph_put_cap(session->s_mdsc, old_cap);
1888  
1889  	return ret;
1890  }
1891  
1892  static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
1893  {
1894  	struct ceph_inode_info *ci = ceph_inode(inode);
1895  	struct ceph_client *cl = ceph_inode_to_client(inode);
1896  	bool invalidate = false;
1897  	struct ceph_cap *cap;
1898  	int iputs = 0;
1899  
1900  	spin_lock(&ci->i_ceph_lock);
1901  	cap = __get_cap_for_mds(ci, mds);
1902  	if (cap) {
1903  		doutc(cl, " removing cap %p, ci is %p, inode is %p\n",
1904  		      cap, ci, &ci->netfs.inode);
1905  
1906  		iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
1907  	}
1908  	spin_unlock(&ci->i_ceph_lock);
1909  
1910  	if (cap)
1911  		wake_up_all(&ci->i_cap_wq);
1912  	if (invalidate)
1913  		ceph_queue_invalidate(inode);
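	/* drop the inode references that were pinned by the purged caps */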
1914  	while (iputs--)
1915  		iput(inode);
1916  	return 0;
1917  }
1918  
1919  /*
1920   * caller must hold session s_mutex
1921   */
1922  static void remove_session_caps(struct ceph_mds_session *session)
1923  {
1924  	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1925  	struct super_block *sb = fsc->sb;
1926  	LIST_HEAD(dispose);
1927  
1928  	doutc(fsc->client, "on %p\n", session);
1929  	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1930  
1931  	wake_up_all(&fsc->mdsc->cap_flushing_wq);
1932  
1933  	spin_lock(&session->s_cap_lock);
1934  	if (session->s_nr_caps > 0) {
1935  		struct inode *inode;
1936  		struct ceph_cap *cap, *prev = NULL;
1937  		struct ceph_vino vino;
1938  		/*
1939  		 * iterate_session_caps() skips inodes that are being
1940  		 * deleted, so we need to wait until those deletions are
1941  		 * complete.  __wait_on_freeing_inode() is designed for
1942  		 * the job, but it is not exported, so use the inode
1943  		 * lookup function to get the same effect.
1944  		 */
1945  		while (!list_empty(&session->s_caps)) {
1946  			cap = list_entry(session->s_caps.next,
1947  					 struct ceph_cap, session_caps);
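			/* seeing the same cap twice means no forward progress; bail out */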
1948  			if (cap == prev)
1949  				break;
1950  			prev = cap;
1951  			vino = cap->ci->i_vino;
1952  			spin_unlock(&session->s_cap_lock);
1953  
1954  			inode = ceph_find_inode(sb, vino);
1955  			iput(inode);
1956  
1957  			spin_lock(&session->s_cap_lock);
1958  		}
1959  	}
1960  
1961  	/* detach any queued cap releases; s_cap_lock stays held until below */
1962  	detach_cap_releases(session, &dispose);
1963  
1964  	BUG_ON(session->s_nr_caps > 0);
1965  	BUG_ON(!list_empty(&session->s_cap_flushing));
1966  	spin_unlock(&session->s_cap_lock);
1967  	dispose_cap_releases(session->s_mdsc, &dispose);
1968  }
1969  
1970  enum {
1971  	RECONNECT,
1972  	RENEWCAPS,
1973  	FORCE_RO,
1974  };
1975  
1976  /*
1977   * wake up any threads waiting on this session's caps.  if the cap is
1978   * old (didn't get renewed on the client reconnect), remove it now.
1979   *
1980   * caller must hold s_mutex.
1981   */
1982  static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
1983  {
1984  	struct ceph_inode_info *ci = ceph_inode(inode);
1985  	unsigned long ev = (unsigned long)arg;
1986  
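	/* on reconnect, forget wanted/requested max_size so it is re-requested */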
1987  	if (ev == RECONNECT) {
1988  		spin_lock(&ci->i_ceph_lock);
1989  		ci->i_wanted_max_size = 0;
1990  		ci->i_requested_max_size = 0;
1991  		spin_unlock(&ci->i_ceph_lock);
1992  	} else if (ev == RENEWCAPS) {
1993  		struct ceph_cap *cap;
1994  
1995  		spin_lock(&ci->i_ceph_lock);
1996  		cap = __get_cap_for_mds(ci, mds);
1997  		/* mds did not re-issue stale cap */
1998  		if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
1999  			cap->issued = cap->implemented = CEPH_CAP_PIN;
2000  		spin_unlock(&ci->i_ceph_lock);
2001  	} else if (ev == FORCE_RO) {
2002  	}
2003  	wake_up_all(&ci->i_cap_wq);
2004  	return 0;
2005  }
2006  
2007  static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
2008  {
2009  	struct ceph_client *cl = session->s_mdsc->fsc->client;
2010  
2011  	doutc(cl, "session %p mds%d\n", session, session->s_mds);
2012  	ceph_iterate_session_caps(session, wake_up_session_cb,
2013  				  (void *)(unsigned long)ev);
2014  }
2015  
2016  /*
2017   * Send periodic message to MDS renewing all currently held caps.  The
2018   * ack will reset the expiration for all caps from this session.
2019   *
2020   * caller holds s_mutex
2021   */
2022  static int send_renew_caps(struct ceph_mds_client *mdsc,
2023  			   struct ceph_mds_session *session)
2024  {
2025  	struct ceph_client *cl = mdsc->fsc->client;
2026  	struct ceph_msg *msg;
2027  	int state;
2028  
2029  	if (time_after_eq(jiffies, session->s_cap_ttl) &&
2030  	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
2031  		pr_info_client(cl, "mds%d caps stale\n", session->s_mds);
2032  	session->s_renew_requested = jiffies;
2033  
2034  	/* do not try to renew caps until a recovering mds has reconnected
2035  	 * with its clients. */
2036  	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
2037  	if (state < CEPH_MDS_STATE_RECONNECT) {
2038  		doutc(cl, "ignoring mds%d (%s)\n", session->s_mds,
2039  		      ceph_mds_state_name(state));
2040  		return 0;
2041  	}
2042  
2043  	doutc(cl, "to mds%d (%s)\n", session->s_mds,
2044  	      ceph_mds_state_name(state));
2045  	msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS,
2046  				      ++session->s_renew_seq);
2047  	if (IS_ERR(msg))
2048  		return PTR_ERR(msg);
2049  	ceph_con_send(&session->s_con, msg);
2050  	return 0;
2051  }
2052  
2053  static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
2054  			     struct ceph_mds_session *session, u64 seq)
2055  {
2056  	struct ceph_client *cl = mdsc->fsc->client;
2057  	struct ceph_msg *msg;
2058  
2059  	doutc(cl, "to mds%d (%s) seq %lld\n", session->s_mds,
2060  	      ceph_session_state_name(session->s_state), seq);
2061  	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
2062  	if (!msg)
2063  		return -ENOMEM;
2064  	ceph_con_send(&session->s_con, msg);
2065  	return 0;
2066  }
2067  
2068  
2069  /*
2070   * Note new cap ttl, and any transition from stale -> not stale (fresh?).
2071   *
2072   * Called under session->s_mutex
2073   */
2074  static void renewed_caps(struct ceph_mds_client *mdsc,
2075  			 struct ceph_mds_session *session, int is_renew)
2076  {
2077  	struct ceph_client *cl = mdsc->fsc->client;
2078  	int was_stale;
2079  	int wake = 0;
2080  
2081  	spin_lock(&session->s_cap_lock);
2082  	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
2083  
2084  	session->s_cap_ttl = session->s_renew_requested +
2085  		mdsc->mdsmap->m_session_timeout*HZ;
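	/* the new ttl is measured from when the renew was requested, not acked */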
2086  
2087  	if (was_stale) {
2088  		if (time_before(jiffies, session->s_cap_ttl)) {
2089  			pr_info_client(cl, "mds%d caps renewed\n",
2090  				       session->s_mds);
2091  			wake = 1;
2092  		} else {
2093  			pr_info_client(cl, "mds%d caps still stale\n",
2094  				       session->s_mds);
2095  		}
2096  	}
2097  	doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds,
2098  	      session->s_cap_ttl, was_stale ? "stale" : "fresh",
2099  	      time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
2100  	spin_unlock(&session->s_cap_lock);
2101  
2102  	if (wake)
2103  		wake_up_session_caps(session, RENEWCAPS);
2104  }
2105  
2106  /*
2107   * send a session close request
2108   */
2109  static int request_close_session(struct ceph_mds_session *session)
2110  {
2111  	struct ceph_client *cl = session->s_mdsc->fsc->client;
2112  	struct ceph_msg *msg;
2113  
2114  	doutc(cl, "mds%d state %s seq %lld\n", session->s_mds,
2115  	      ceph_session_state_name(session->s_state), session->s_seq);
2116  	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
2117  				      session->s_seq);
2118  	if (!msg)
2119  		return -ENOMEM;
2120  	ceph_con_send(&session->s_con, msg);
2121  	return 1;
2122  }
2123  
2124  /*
2125   * Called with s_mutex held.
2126   */
2127  static int __close_session(struct ceph_mds_client *mdsc,
2128  			 struct ceph_mds_session *session)
2129  {
2130  	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
2131  		return 0;
2132  	session->s_state = CEPH_MDS_SESSION_CLOSING;
2133  	return request_close_session(session);
2134  }
2135  
2136  static bool drop_negative_children(struct dentry *dentry)
2137  {
2138  	struct dentry *child;
2139  	bool all_negative = true;
2140  
2141  	if (!d_is_dir(dentry))
2142  		goto out;
2143  
2144  	spin_lock(&dentry->d_lock);
2145  	hlist_for_each_entry(child, &dentry->d_children, d_sib) {
2146  		if (d_really_is_positive(child)) {
2147  			all_negative = false;
2148  			break;
2149  		}
2150  	}
2151  	spin_unlock(&dentry->d_lock);
2152  
2153  	if (all_negative)
2154  		shrink_dcache_parent(dentry);
2155  out:
2156  	return all_negative;
2157  }
2158  
2159  /*
2160   * Trim old(er) caps.
2161   *
2162   * Because we can't cache an inode without one or more caps, we do
2163   * this indirectly: if a cap is unused, we prune its aliases, at which
2164   * point the inode will hopefully get dropped too.
2165   *
2166   * Yes, this is a bit sloppy.  Our only real goal here is to respond to
2167   * memory pressure from the MDS, though, so it needn't be perfect.
2168   */
2169  static int trim_caps_cb(struct inode *inode, int mds, void *arg)
2170  {
2171  	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
2172  	struct ceph_client *cl = mdsc->fsc->client;
2173  	int *remaining = arg;
2174  	struct ceph_inode_info *ci = ceph_inode(inode);
2175  	int used, wanted, oissued, mine;
2176  	struct ceph_cap *cap;
2177  
2178  	if (*remaining <= 0)
2179  		return -1;
2180  
2181  	spin_lock(&ci->i_ceph_lock);
2182  	cap = __get_cap_for_mds(ci, mds);
2183  	if (!cap) {
2184  		spin_unlock(&ci->i_ceph_lock);
2185  		return 0;
2186  	}
2187  	mine = cap->issued | cap->implemented;
2188  	used = __ceph_caps_used(ci);
2189  	wanted = __ceph_caps_file_wanted(ci);
2190  	oissued = __ceph_caps_issued_other(ci, cap);
2191  
2192  	doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n",
2193  	      inode, ceph_vinop(inode), cap, ceph_cap_string(mine),
2194  	      ceph_cap_string(oissued), ceph_cap_string(used),
2195  	      ceph_cap_string(wanted));
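	/* never trim an auth cap while dirty, flushing or snap state is pending */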
2196  	if (cap == ci->i_auth_cap) {
2197  		if (ci->i_dirty_caps || ci->i_flushing_caps ||
2198  		    !list_empty(&ci->i_cap_snaps))
2199  			goto out;
2200  		if ((used | wanted) & CEPH_CAP_ANY_WR)
2201  			goto out;
2202  		/* Note: it's possible that i_filelock_ref becomes non-zero
2203  		 * after dropping auth caps. It doesn't hurt because the
2204  		 * reply to the lock MDS request will re-add the auth caps. */
2205  		if (atomic_read(&ci->i_filelock_ref) > 0)
2206  			goto out;
2207  	}
2208  	/* The inode has cached pages, but it's no longer used.
2209  	 * We can safely drop it. */
2210  	if (S_ISREG(inode->i_mode) &&
2211  	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
2212  	    !(oissued & CEPH_CAP_FILE_CACHE)) {
2213  		used = 0;
2214  		oissued = 0;
2215  	}
2216  	if ((used | wanted) & ~oissued & mine)
2217  		goto out;   /* we need these caps */
2218  
2219  	if (oissued) {
2220  		/* we aren't the only cap.. just remove us */
2221  		ceph_remove_cap(mdsc, cap, true);
2222  		(*remaining)--;
2223  	} else {
2224  		struct dentry *dentry;
2225  		/* try dropping referring dentries */
2226  		spin_unlock(&ci->i_ceph_lock);
2227  		dentry = d_find_any_alias(inode);
2228  		if (dentry && drop_negative_children(dentry)) {
2229  			int count;
2230  			dput(dentry);
2231  			d_prune_aliases(inode);
2232  			count = atomic_read(&inode->i_count);
2233  			if (count == 1)
2234  				(*remaining)--;
2235  			doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n",
2236  			      inode, ceph_vinop(inode), cap, count);
2237  		} else {
2238  			dput(dentry);
2239  		}
2240  		return 0;
2241  	}
2242  
2243  out:
2244  	spin_unlock(&ci->i_ceph_lock);
2245  	return 0;
2246  }
2247  
2248  /*
2249   * Trim session cap count down to some max number.
2250   */
2251  int ceph_trim_caps(struct ceph_mds_client *mdsc,
2252  		   struct ceph_mds_session *session,
2253  		   int max_caps)
2254  {
2255  	struct ceph_client *cl = mdsc->fsc->client;
2256  	int trim_caps = session->s_nr_caps - max_caps;
2257  
2258  	doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds,
2259  	      session->s_nr_caps, max_caps, trim_caps);
2260  	if (trim_caps > 0) {
2261  		int remaining = trim_caps;
2262  
2263  		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2264  		doutc(cl, "mds%d done: %d / %d, trimmed %d\n",
2265  		      session->s_mds, session->s_nr_caps, max_caps,
2266  		      trim_caps - remaining);
2267  	}
2268  
2269  	ceph_flush_session_cap_releases(mdsc, session);
2270  	return 0;
2271  }
2272  
2273  static int check_caps_flush(struct ceph_mds_client *mdsc,
2274  			    u64 want_flush_tid)
2275  {
2276  	struct ceph_client *cl = mdsc->fsc->client;
2277  	int ret = 1;
2278  
2279  	spin_lock(&mdsc->cap_dirty_lock);
2280  	if (!list_empty(&mdsc->cap_flush_list)) {
2281  		struct ceph_cap_flush *cf =
2282  			list_first_entry(&mdsc->cap_flush_list,
2283  					 struct ceph_cap_flush, g_list);
2284  		if (cf->tid <= want_flush_tid) {
2285  			doutc(cl, "still flushing tid %llu <= %llu\n",
2286  			      cf->tid, want_flush_tid);
2287  			ret = 0;
2288  		}
2289  	}
2290  	spin_unlock(&mdsc->cap_dirty_lock);
2291  	return ret;
2292  }
2293  
2294  /*
2295   * wait for all dirty inode data to be flushed to disk.
2296   *
2297   * blocks until we've flushed through want_flush_tid
2298   */
2299  static void wait_caps_flush(struct ceph_mds_client *mdsc,
2300  			    u64 want_flush_tid)
2301  {
2302  	struct ceph_client *cl = mdsc->fsc->client;
2303  
2304  	doutc(cl, "want %llu\n", want_flush_tid);
2305  
2306  	wait_event(mdsc->cap_flushing_wq,
2307  		   check_caps_flush(mdsc, want_flush_tid));
2308  
2309  	doutc(cl, "ok, flushed thru %llu\n", want_flush_tid);
2310  }
2311  
2312  /*
2313   * called under s_mutex
2314   */
2315  static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2316  				   struct ceph_mds_session *session)
2317  {
2318  	struct ceph_client *cl = mdsc->fsc->client;
2319  	struct ceph_msg *msg = NULL;
2320  	struct ceph_mds_cap_release *head;
2321  	struct ceph_mds_cap_item *item;
2322  	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2323  	struct ceph_cap *cap;
2324  	LIST_HEAD(tmp_list);
2325  	int num_cap_releases;
2326  	__le32	barrier, *cap_barrier;
2327  
2328  	down_read(&osdc->lock);
2329  	barrier = cpu_to_le32(osdc->epoch_barrier);
2330  	up_read(&osdc->lock);
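	/* every release message is terminated with the current OSD epoch
	 * barrier, appended below as the cap_barrier field */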
2331  
2332  	spin_lock(&session->s_cap_lock);
2333  again:
2334  	list_splice_init(&session->s_cap_releases, &tmp_list);
2335  	num_cap_releases = session->s_num_cap_releases;
2336  	session->s_num_cap_releases = 0;
2337  	spin_unlock(&session->s_cap_lock);
2338  
2339  	while (!list_empty(&tmp_list)) {
2340  		if (!msg) {
2341  			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2342  					PAGE_SIZE, GFP_NOFS, false);
2343  			if (!msg)
2344  				goto out_err;
2345  			head = msg->front.iov_base;
2346  			head->num = cpu_to_le32(0);
2347  			msg->front.iov_len = sizeof(*head);
2348  
2349  			msg->hdr.version = cpu_to_le16(2);
2350  			msg->hdr.compat_version = cpu_to_le16(1);
2351  		}
2352  
2353  		cap = list_first_entry(&tmp_list, struct ceph_cap,
2354  					session_caps);
2355  		list_del(&cap->session_caps);
2356  		num_cap_releases--;
2357  
2358  		head = msg->front.iov_base;
2359  		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2360  				   &head->num);
2361  		item = msg->front.iov_base + msg->front.iov_len;
2362  		item->ino = cpu_to_le64(cap->cap_ino);
2363  		item->cap_id = cpu_to_le64(cap->cap_id);
2364  		item->migrate_seq = cpu_to_le32(cap->mseq);
2365  		item->seq = cpu_to_le32(cap->issue_seq);
2366  		msg->front.iov_len += sizeof(*item);
2367  
2368  		ceph_put_cap(mdsc, cap);
2369  
2370  		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2371  			// Append cap_barrier field
2372  			cap_barrier = msg->front.iov_base + msg->front.iov_len;
2373  			*cap_barrier = barrier;
2374  			msg->front.iov_len += sizeof(*cap_barrier);
2375  
2376  			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2377  			doutc(cl, "mds%d %p\n", session->s_mds, msg);
2378  			ceph_con_send(&session->s_con, msg);
2379  			msg = NULL;
2380  		}
2381  	}
2382  
2383  	BUG_ON(num_cap_releases != 0);
2384  
2385  	spin_lock(&session->s_cap_lock);
2386  	if (!list_empty(&session->s_cap_releases))
2387  		goto again;
2388  	spin_unlock(&session->s_cap_lock);
2389  
2390  	if (msg) {
2391  		// Append cap_barrier field
2392  		cap_barrier = msg->front.iov_base + msg->front.iov_len;
2393  		*cap_barrier = barrier;
2394  		msg->front.iov_len += sizeof(*cap_barrier);
2395  
2396  		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2397  		doutc(cl, "mds%d %p\n", session->s_mds, msg);
2398  		ceph_con_send(&session->s_con, msg);
2399  	}
2400  	return;
2401  out_err:
2402  	pr_err_client(cl, "mds%d, failed to allocate message\n",
2403  		      session->s_mds);
2404  	spin_lock(&session->s_cap_lock);
2405  	list_splice(&tmp_list, &session->s_cap_releases);
2406  	session->s_num_cap_releases += num_cap_releases;
2407  	spin_unlock(&session->s_cap_lock);
2408  }
2409  
2410  static void ceph_cap_release_work(struct work_struct *work)
2411  {
2412  	struct ceph_mds_session *session =
2413  		container_of(work, struct ceph_mds_session, s_cap_release_work);
2414  
2415  	mutex_lock(&session->s_mutex);
2416  	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2417  	    session->s_state == CEPH_MDS_SESSION_HUNG)
2418  		ceph_send_cap_releases(session->s_mdsc, session);
2419  	mutex_unlock(&session->s_mutex);
2420  	ceph_put_mds_session(session);
2421  }
2422  
2423  void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc,
2424  				     struct ceph_mds_session *session)
2425  {
2426  	struct ceph_client *cl = mdsc->fsc->client;
2427  	if (mdsc->stopping)
2428  		return;
2429  
2430  	ceph_get_mds_session(session);
2431  	if (queue_work(mdsc->fsc->cap_wq,
2432  		       &session->s_cap_release_work)) {
2433  		doutc(cl, "cap release work queued\n");
2434  	} else {
2435  		ceph_put_mds_session(session);
2436  		doutc(cl, "failed to queue cap release work\n");
2437  	}
2438  }
2439  
2440  /*
2441   * caller holds session->s_cap_lock
2442   */
2443  void __ceph_queue_cap_release(struct ceph_mds_session *session,
2444  			      struct ceph_cap *cap)
2445  {
2446  	list_add_tail(&cap->session_caps, &session->s_cap_releases);
2447  	session->s_num_cap_releases++;
2448  
2449  	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2450  		ceph_flush_session_cap_releases(session->s_mdsc, session);
2451  }
2452  
2453  static void ceph_cap_reclaim_work(struct work_struct *work)
2454  {
2455  	struct ceph_mds_client *mdsc =
2456  		container_of(work, struct ceph_mds_client, cap_reclaim_work);
2457  	int ret = ceph_trim_dentries(mdsc);
2458  	if (ret == -EAGAIN)
2459  		ceph_queue_cap_reclaim_work(mdsc);
2460  }
2461  
2462  void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2463  {
2464  	struct ceph_client *cl = mdsc->fsc->client;
2465  	if (mdsc->stopping)
2466  		return;
2467  
2468  	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2469  		doutc(cl, "caps reclaim work queued\n");
2470  	} else {
2471  		doutc(cl, "failed to queue caps reclaim work\n");
2472  	}
2473  }
2474  
2475  void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2476  {
2477  	int val;
2478  	if (!nr)
2479  		return;
2480  	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
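	/* queue the work roughly once per CEPH_CAPS_PER_RELEASE caps: the
	 * modulo test only passes when the counter just crossed a multiple */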
2481  	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2482  		atomic_set(&mdsc->cap_reclaim_pending, 0);
2483  		ceph_queue_cap_reclaim_work(mdsc);
2484  	}
2485  }
2486  
2487  void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc)
2488  {
2489  	struct ceph_client *cl = mdsc->fsc->client;
2490  	if (mdsc->stopping)
2491  		return;
2492  
2493  	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) {
2494  		doutc(cl, "caps unlink work queued\n");
2495  	} else {
2496  		doutc(cl, "failed to queue caps unlink work\n");
2497  	}
2498  }
2499  
2500  static void ceph_cap_unlink_work(struct work_struct *work)
2501  {
2502  	struct ceph_mds_client *mdsc =
2503  		container_of(work, struct ceph_mds_client, cap_unlink_work);
2504  	struct ceph_client *cl = mdsc->fsc->client;
2505  
2506  	doutc(cl, "begin\n");
2507  	spin_lock(&mdsc->cap_delay_lock);
2508  	while (!list_empty(&mdsc->cap_unlink_delay_list)) {
2509  		struct ceph_inode_info *ci;
2510  		struct inode *inode;
2511  
2512  		ci = list_first_entry(&mdsc->cap_unlink_delay_list,
2513  				      struct ceph_inode_info,
2514  				      i_cap_delay_list);
2515  		list_del_init(&ci->i_cap_delay_list);
2516  
2517  		inode = igrab(&ci->netfs.inode);
2518  		if (inode) {
2519  			spin_unlock(&mdsc->cap_delay_lock);
2520  			doutc(cl, "on %p %llx.%llx\n", inode,
2521  			      ceph_vinop(inode));
2522  			ceph_check_caps(ci, CHECK_CAPS_FLUSH);
2523  			iput(inode);
2524  			spin_lock(&mdsc->cap_delay_lock);
2525  		}
2526  	}
2527  	spin_unlock(&mdsc->cap_delay_lock);
2528  	doutc(cl, "done\n");
2529  }
2530  
2531  /*
2532   * requests
2533   */
2534  
2535  int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2536  				    struct inode *dir)
2537  {
2538  	struct ceph_inode_info *ci = ceph_inode(dir);
2539  	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2540  	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2541  	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2542  	unsigned int num_entries;
2543  	int order;
2544  
2545  	spin_lock(&ci->i_ceph_lock);
2546  	num_entries = ci->i_files + ci->i_subdirs;
2547  	spin_unlock(&ci->i_ceph_lock);
2548  	num_entries = max(num_entries, 1U);
2549  	num_entries = min(num_entries, opt->max_readdir);
2550  
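	/* try one large allocation first; fall back to smaller orders if memory is fragmented */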
2551  	order = get_order(size * num_entries);
2552  	while (order >= 0) {
2553  		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2554  							     __GFP_NOWARN |
2555  							     __GFP_ZERO,
2556  							     order);
2557  		if (rinfo->dir_entries)
2558  			break;
2559  		order--;
2560  	}
2561  	if (!rinfo->dir_entries)
2562  		return -ENOMEM;
2563  
2564  	num_entries = (PAGE_SIZE << order) / size;
2565  	num_entries = min(num_entries, opt->max_readdir);
2566  
2567  	rinfo->dir_buf_size = PAGE_SIZE << order;
2568  	req->r_num_caps = num_entries + 1;
2569  	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2570  	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2571  	return 0;
2572  }
2573  
2574  /*
2575   * Create an mds request.
2576   */
2577  struct ceph_mds_request *
2578  ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2579  {
2580  	struct ceph_mds_request *req;
2581  
2582  	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2583  	if (!req)
2584  		return ERR_PTR(-ENOMEM);
2585  
2586  	mutex_init(&req->r_fill_mutex);
2587  	req->r_mdsc = mdsc;
2588  	req->r_started = jiffies;
2589  	req->r_start_latency = ktime_get();
2590  	req->r_resend_mds = -1;
2591  	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2592  	INIT_LIST_HEAD(&req->r_unsafe_target_item);
2593  	req->r_fmode = -1;
2594  	req->r_feature_needed = -1;
2595  	kref_init(&req->r_kref);
2596  	RB_CLEAR_NODE(&req->r_node);
2597  	INIT_LIST_HEAD(&req->r_wait);
2598  	init_completion(&req->r_completion);
2599  	init_completion(&req->r_safe_completion);
2600  	INIT_LIST_HEAD(&req->r_unsafe_item);
2601  
2602  	ktime_get_coarse_real_ts64(&req->r_stamp);
2603  
2604  	req->r_op = op;
2605  	req->r_direct_mode = mode;
2606  	return req;
2607  }
2608  
2609  /*
2610   * return oldest (lowest) request, tid in request tree, 0 if none.
2611   *
2612   * called under mdsc->mutex.
2613   */
2614  static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2615  {
2616  	if (RB_EMPTY_ROOT(&mdsc->request_tree))
2617  		return NULL;
2618  	return rb_entry(rb_first(&mdsc->request_tree),
2619  			struct ceph_mds_request, r_node);
2620  }
2621  
2622  static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2623  {
2624  	return mdsc->oldest_tid;
2625  }
2626  
2627  #if IS_ENABLED(CONFIG_FS_ENCRYPTION)
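/*
 * Encrypted names too long to be sent inline (> CEPH_NOHASH_NAME_MAX) go
 * over the wire in hashed form, so the full ciphertext is carried
 * separately in the request as the "altname".
 */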
2628  static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
2629  {
2630  	struct inode *dir = req->r_parent;
2631  	struct dentry *dentry = req->r_dentry;
2632  	u8 *cryptbuf = NULL;
2633  	u32 len = 0;
2634  	int ret = 0;
2635  
2636  	/* only encode if we have parent and dentry */
2637  	if (!dir || !dentry)
2638  		goto success;
2639  
2640  	/* No-op unless this is encrypted */
2641  	if (!IS_ENCRYPTED(dir))
2642  		goto success;
2643  
2644  	ret = ceph_fscrypt_prepare_readdir(dir);
2645  	if (ret < 0)
2646  		return ERR_PTR(ret);
2647  
2648  	/* No key? Just ignore it. */
2649  	if (!fscrypt_has_encryption_key(dir))
2650  		goto success;
2651  
2652  	if (!fscrypt_fname_encrypted_size(dir, dentry->d_name.len, NAME_MAX,
2653  					  &len)) {
2654  		WARN_ON_ONCE(1);
2655  		return ERR_PTR(-ENAMETOOLONG);
2656  	}
2657  
2658  	/* No need to append altname if name is short enough */
2659  	if (len <= CEPH_NOHASH_NAME_MAX) {
2660  		len = 0;
2661  		goto success;
2662  	}
2663  
2664  	cryptbuf = kmalloc(len, GFP_KERNEL);
2665  	if (!cryptbuf)
2666  		return ERR_PTR(-ENOMEM);
2667  
2668  	ret = fscrypt_fname_encrypt(dir, &dentry->d_name, cryptbuf, len);
2669  	if (ret) {
2670  		kfree(cryptbuf);
2671  		return ERR_PTR(ret);
2672  	}
2673  success:
2674  	*plen = len;
2675  	return cryptbuf;
2676  }
2677  #else
2678  static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
2679  {
2680  	*plen = 0;
2681  	return NULL;
2682  }
2683  #endif
2684  
2685  /**
2686   * ceph_mdsc_build_path - build a path string to a given dentry
2687   * @mdsc: mds client
2688   * @dentry: dentry to which path should be built
2689   * @plen: returned length of string
2690   * @pbase: returned base inode number
2691   * @for_wire: is this path going to be sent to the MDS?
2692   *
2693   * Build a string that represents the path to the dentry. This is mostly called
2694   * for two different purposes:
2695   *
2696   * 1) we need to build a path string to send to the MDS (for_wire == true)
2697   * 2) we need a path string for local presentation (e.g. debugfs)
2698   *    (for_wire == false)
2699   *
2700   * The path is built in reverse, starting with the dentry. Walk back up toward
2701   * the root, building the path until the first non-snapped inode is reached
2702   * (for_wire) or the root inode is reached (!for_wire).
2703   *
2704   * Encode hidden .snap dirs as a double /, i.e.
2705   *   foo/.snap/bar -> foo//bar
2706   */
2707  char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
2708  			   int *plen, u64 *pbase, int for_wire)
2709  {
2710  	struct ceph_client *cl = mdsc->fsc->client;
2711  	struct dentry *cur;
2712  	struct inode *inode;
2713  	char *path;
2714  	int pos;
2715  	unsigned seq;
2716  	u64 base;
2717  
2718  	if (!dentry)
2719  		return ERR_PTR(-EINVAL);
2720  
2721  	path = __getname();
2722  	if (!path)
2723  		return ERR_PTR(-ENOMEM);
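	/*
	 * Walk up the dcache under rename_lock's seqlock; if a concurrent
	 * rename moves anything while we build the path, retry from scratch.
	 */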
2724  retry:
2725  	pos = PATH_MAX - 1;
2726  	path[pos] = '\0';
2727  
2728  	seq = read_seqbegin(&rename_lock);
2729  	cur = dget(dentry);
2730  	for (;;) {
2731  		struct dentry *parent;
2732  
2733  		spin_lock(&cur->d_lock);
2734  		inode = d_inode(cur);
2735  		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2736  			doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur);
2737  			spin_unlock(&cur->d_lock);
2738  			parent = dget_parent(cur);
2739  		} else if (for_wire && inode && dentry != cur &&
2740  			   ceph_snap(inode) == CEPH_NOSNAP) {
2741  			spin_unlock(&cur->d_lock);
2742  			pos++; /* get rid of any prepended '/' */
2743  			break;
2744  		} else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) {
2745  			pos -= cur->d_name.len;
2746  			if (pos < 0) {
2747  				spin_unlock(&cur->d_lock);
2748  				break;
2749  			}
2750  			memcpy(path + pos, cur->d_name.name, cur->d_name.len);
2751  			spin_unlock(&cur->d_lock);
2752  			parent = dget_parent(cur);
2753  		} else {
2754  			int len, ret;
2755  			char buf[NAME_MAX];
2756  
2757  			/*
2758  			 * Proactively copy name into buf, in case we need to
2759  			 * present it as-is.
2760  			 */
2761  			memcpy(buf, cur->d_name.name, cur->d_name.len);
2762  			len = cur->d_name.len;
2763  			spin_unlock(&cur->d_lock);
2764  			parent = dget_parent(cur);
2765  
2766  			ret = ceph_fscrypt_prepare_readdir(d_inode(parent));
2767  			if (ret < 0) {
2768  				dput(parent);
2769  				dput(cur);
2770  				return ERR_PTR(ret);
2771  			}
2772  
2773  			if (fscrypt_has_encryption_key(d_inode(parent))) {
2774  				len = ceph_encode_encrypted_fname(d_inode(parent),
2775  								  cur, buf);
2776  				if (len < 0) {
2777  					dput(parent);
2778  					dput(cur);
2779  					return ERR_PTR(len);
2780  				}
2781  			}
2782  			pos -= len;
2783  			if (pos < 0) {
2784  				dput(parent);
2785  				break;
2786  			}
2787  			memcpy(path + pos, buf, len);
2788  		}
2789  		dput(cur);
2790  		cur = parent;
2791  
2792  		/* Are we at the root? */
2793  		if (IS_ROOT(cur))
2794  			break;
2795  
2796  		/* Are we out of buffer? */
2797  		if (--pos < 0)
2798  			break;
2799  
2800  		path[pos] = '/';
2801  	}
2802  	inode = d_inode(cur);
2803  	base = inode ? ceph_ino(inode) : 0;
2804  	dput(cur);
2805  
2806  	if (read_seqretry(&rename_lock, seq))
2807  		goto retry;
2808  
2809  	if (pos < 0) {
2810  		/*
2811  		 * A rename didn't occur, but somehow we didn't end up where
2812  		 * we thought we would. Throw a warning and try again.
2813  		 */
2814  		pr_warn_client(cl, "did not end path lookup where expected (pos = %d)\n",
2815  			       pos);
2816  		goto retry;
2817  	}
2818  
2819  	*pbase = base;
2820  	*plen = PATH_MAX - 1 - pos;
2821  	doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry),
2822  	      base, *plen, path + pos);
2823  	return path + pos;
2824  }
2825  
2826  static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
2827  			     struct inode *dir, const char **ppath, int *ppathlen,
2828  			     u64 *pino, bool *pfreepath, bool parent_locked)
2829  {
2830  	char *path;
2831  
2832  	rcu_read_lock();
2833  	if (!dir)
2834  		dir = d_inode_rcu(dentry->d_parent);
2835  	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP &&
2836  	    !IS_ENCRYPTED(dir)) {
2837  		*pino = ceph_ino(dir);
2838  		rcu_read_unlock();
2839  		*ppath = dentry->d_name.name;
2840  		*ppathlen = dentry->d_name.len;
2841  		return 0;
2842  	}
2843  	rcu_read_unlock();
2844  	path = ceph_mdsc_build_path(mdsc, dentry, ppathlen, pino, 1);
2845  	if (IS_ERR(path))
2846  		return PTR_ERR(path);
2847  	*ppath = path;
2848  	*pfreepath = true;
2849  	return 0;
2850  }
2851  
2852  static int build_inode_path(struct inode *inode,
2853  			    const char **ppath, int *ppathlen, u64 *pino,
2854  			    bool *pfreepath)
2855  {
2856  	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
2857  	struct dentry *dentry;
2858  	char *path;
2859  
2860  	if (ceph_snap(inode) == CEPH_NOSNAP) {
2861  		*pino = ceph_ino(inode);
2862  		*ppathlen = 0;
2863  		return 0;
2864  	}
2865  	dentry = d_find_alias(inode);
2866  	path = ceph_mdsc_build_path(mdsc, dentry, ppathlen, pino, 1);
2867  	dput(dentry);
2868  	if (IS_ERR(path))
2869  		return PTR_ERR(path);
2870  	*ppath = path;
2871  	*pfreepath = true;
2872  	return 0;
2873  }
2874  
2875  /*
2876   * request arguments may be specified via an inode *, a dentry *, or
2877   * an explicit ino+path.
2878   */
2879  static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode,
2880  				 struct dentry *rdentry, struct inode *rdiri,
2881  				 const char *rpath, u64 rino, const char **ppath,
2882  				 int *pathlen, u64 *ino, bool *freepath,
2883  				 bool parent_locked)
2884  {
2885  	struct ceph_client *cl = mdsc->fsc->client;
2886  	int r = 0;
2887  
2888  	if (rinode) {
2889  		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2890  		doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2891  		      ceph_snap(rinode));
2892  	} else if (rdentry) {
2893  		r = build_dentry_path(mdsc, rdentry, rdiri, ppath, pathlen, ino,
2894  					freepath, parent_locked);
2895  		doutc(cl, " dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, *ppath);
2896  	} else if (rpath || rino) {
2897  		*ino = rino;
2898  		*ppath = rpath;
2899  		*pathlen = rpath ? strlen(rpath) : 0;
2900  		doutc(cl, " path %.*s\n", *pathlen, rpath);
2901  	}
2902  
2903  	return r;
2904  }
2905  
2906  static void encode_mclientrequest_tail(void **p,
2907  				       const struct ceph_mds_request *req)
2908  {
2909  	struct ceph_timespec ts;
2910  	int i;
2911  
2912  	ceph_encode_timespec64(&ts, &req->r_stamp);
2913  	ceph_encode_copy(p, &ts, sizeof(ts));
2914  
2915  	/* v4: gid_list */
2916  	ceph_encode_32(p, req->r_cred->group_info->ngroups);
2917  	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2918  		ceph_encode_64(p, from_kgid(&init_user_ns,
2919  					    req->r_cred->group_info->gid[i]));
2920  
2921  	/* v5: altname */
2922  	ceph_encode_32(p, req->r_altname_len);
2923  	ceph_encode_copy(p, req->r_altname, req->r_altname_len);
2924  
2925  	/* v6: fscrypt_auth and fscrypt_file */
2926  	if (req->r_fscrypt_auth) {
2927  		u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth);
2928  
2929  		ceph_encode_32(p, authlen);
2930  		ceph_encode_copy(p, req->r_fscrypt_auth, authlen);
2931  	} else {
2932  		ceph_encode_32(p, 0);
2933  	}
2934  	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) {
2935  		ceph_encode_32(p, sizeof(__le64));
2936  		ceph_encode_64(p, req->r_fscrypt_file);
2937  	} else {
2938  		ceph_encode_32(p, 0);
2939  	}
2940  }
2941  
2942  static inline u16 mds_supported_head_version(struct ceph_mds_session *session)
2943  {
2944  	if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features))
2945  		return 1;
2946  
2947  	if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features))
2948  		return 2;
2949  
2950  	return CEPH_MDS_REQUEST_HEAD_VERSION;
2951  }
2952  
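/*
 * The legacy head layout is embedded in the newer heads starting at the
 * oldest_client_tid field, so when the peer speaks a versioned head we
 * simply point past the leading version member.
 */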
2953  static struct ceph_mds_request_head_legacy *
2954  find_legacy_request_head(void *p, u64 features)
2955  {
2956  	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2957  	struct ceph_mds_request_head_old *ohead;
2958  
2959  	if (legacy)
2960  		return (struct ceph_mds_request_head_legacy *)p;
2961  	ohead = (struct ceph_mds_request_head_old *)p;
2962  	return (struct ceph_mds_request_head_legacy *)&ohead->oldest_client_tid;
2963  }
2964  
2965  /*
2966   * called under mdsc->mutex
2967   */
2968  static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2969  					       struct ceph_mds_request *req,
2970  					       bool drop_cap_releases)
2971  {
2972  	int mds = session->s_mds;
2973  	struct ceph_mds_client *mdsc = session->s_mdsc;
2974  	struct ceph_client *cl = mdsc->fsc->client;
2975  	struct ceph_msg *msg;
2976  	struct ceph_mds_request_head_legacy *lhead;
2977  	const char *path1 = NULL;
2978  	const char *path2 = NULL;
2979  	u64 ino1 = 0, ino2 = 0;
2980  	int pathlen1 = 0, pathlen2 = 0;
2981  	bool freepath1 = false, freepath2 = false;
2982  	struct dentry *old_dentry = NULL;
2983  	int len;
2984  	u16 releases;
2985  	void *p, *end;
2986  	int ret;
2987  	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
2988  	u16 request_head_version = mds_supported_head_version(session);
2989  	kuid_t caller_fsuid = req->r_cred->fsuid;
2990  	kgid_t caller_fsgid = req->r_cred->fsgid;
2991  
2992  	ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry,
2993  			      req->r_parent, req->r_path1, req->r_ino1.ino,
2994  			      &path1, &pathlen1, &ino1, &freepath1,
2995  			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
2996  					&req->r_req_flags));
2997  	if (ret < 0) {
2998  		msg = ERR_PTR(ret);
2999  		goto out;
3000  	}
3001  
3002  	/* If r_old_dentry is set, then assume that its parent is locked */
3003  	if (req->r_old_dentry &&
3004  	    !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
3005  		old_dentry = req->r_old_dentry;
3006  	ret = set_request_path_attr(mdsc, NULL, old_dentry,
3007  			      req->r_old_dentry_dir,
3008  			      req->r_path2, req->r_ino2.ino,
3009  			      &path2, &pathlen2, &ino2, &freepath2, true);
3010  	if (ret < 0) {
3011  		msg = ERR_PTR(ret);
3012  		goto out_free1;
3013  	}
3014  
3015  	req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
3016  	if (IS_ERR(req->r_altname)) {
3017  		msg = ERR_CAST(req->r_altname);
3018  		req->r_altname = NULL;
3019  		goto out_free2;
3020  	}
3021  
3022  	/*
3023  	 * Old ceph versions that lack the 32-bit retry/fwd feature
3024  	 * copy the raw memory directly when decoding requests, while
3025  	 * newer ones decode the head according to its version member,
3026  	 * so we need to make sure the encoding is compatible with both.
3027  	 */
3029  	if (legacy)
3030  		len = sizeof(struct ceph_mds_request_head_legacy);
3031  	else if (request_head_version == 1)
3032  		len = sizeof(struct ceph_mds_request_head_old);
3033  	else if (request_head_version == 2)
3034  		len = offsetofend(struct ceph_mds_request_head, ext_num_fwd);
3035  	else
3036  		len = sizeof(struct ceph_mds_request_head);
3037  
3038  	/* filepaths */
3039  	len += 2 * (1 + sizeof(u32) + sizeof(u64));
3040  	len += pathlen1 + pathlen2;
3041  
3042  	/* cap releases */
3043  	len += sizeof(struct ceph_mds_request_release) *
3044  		(!!req->r_inode_drop + !!req->r_dentry_drop +
3045  		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
3046  
3047  	if (req->r_dentry_drop)
3048  		len += pathlen1;
3049  	if (req->r_old_dentry_drop)
3050  		len += pathlen2;
3051  
3052  	/* MClientRequest tail */
3053  
3054  	/* req->r_stamp */
3055  	len += sizeof(struct ceph_timespec);
3056  
3057  	/* gid list */
3058  	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
3059  
3060  	/* alternate name */
3061  	len += sizeof(u32) + req->r_altname_len;
3062  
3063  	/* fscrypt_auth */
3064  	len += sizeof(u32); // fscrypt_auth
3065  	if (req->r_fscrypt_auth)
3066  		len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);
3067  
3068  	/* fscrypt_file */
3069  	len += sizeof(u32);
3070  	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
3071  		len += sizeof(__le64);
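	/* 'len' now covers the chosen head, both filepaths, cap releases and the tail */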
3072  
3073  	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
3074  	if (!msg) {
3075  		msg = ERR_PTR(-ENOMEM);
3076  		goto out_free2;
3077  	}
3078  
3079  	msg->hdr.tid = cpu_to_le64(req->r_tid);
3080  
3081  	lhead = find_legacy_request_head(msg->front.iov_base,
3082  					 session->s_con.peer_features);
3083  
3084  	if ((req->r_mnt_idmap != &nop_mnt_idmap) &&
3085  	    !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) {
3086  		WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op));
3087  
3088  		if (enable_unsafe_idmap) {
3089  			pr_warn_once_client(cl,
3090  				"idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
3091  				" is not supported by MDS. UID/GID-based restrictions may"
3092  				" not work properly.\n");
3093  
3094  			caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
3095  						   VFSUIDT_INIT(req->r_cred->fsuid));
3096  			caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
3097  						   VFSGIDT_INIT(req->r_cred->fsgid));
3098  		} else {
3099  			pr_err_ratelimited_client(cl,
3100  				"idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
3101  				" is not supported by MDS. Fail request with -EIO.\n");
3102  
3103  			ret = -EIO;
3104  			goto out_err;
3105  		}
3106  	}
3107  
3108  	/*
3109  	 * The ceph_mds_request_head_legacy didn't contain a version field, and
3110  	 * one was added when we moved the message version from 3->4.
3111  	 */
3112  	if (legacy) {
3113  		msg->hdr.version = cpu_to_le16(3);
3114  		p = msg->front.iov_base + sizeof(*lhead);
3115  	} else if (request_head_version == 1) {
3116  		struct ceph_mds_request_head_old *ohead = msg->front.iov_base;
3117  
3118  		msg->hdr.version = cpu_to_le16(4);
3119  		ohead->version = cpu_to_le16(1);
3120  		p = msg->front.iov_base + sizeof(*ohead);
3121  	} else if (request_head_version == 2) {
3122  		struct ceph_mds_request_head *nhead = msg->front.iov_base;
3123  
3124  		msg->hdr.version = cpu_to_le16(6);
3125  		nhead->version = cpu_to_le16(2);
3126  
3127  		p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd);
3128  	} else {
3129  		struct ceph_mds_request_head *nhead = msg->front.iov_base;
3130  		kuid_t owner_fsuid;
3131  		kgid_t owner_fsgid;
3132  
3133  		msg->hdr.version = cpu_to_le16(6);
3134  		nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
3135  		nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head));
3136  
3137  		if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) {
3138  			owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
3139  						VFSUIDT_INIT(req->r_cred->fsuid));
3140  			owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
3141  						VFSGIDT_INIT(req->r_cred->fsgid));
3142  			nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid));
3143  			nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid));
3144  		} else {
3145  			nhead->owner_uid = cpu_to_le32(-1);
3146  			nhead->owner_gid = cpu_to_le32(-1);
3147  		}
3148  
3149  		p = msg->front.iov_base + sizeof(*nhead);
3150  	}
3151  
3152  	end = msg->front.iov_base + msg->front.iov_len;
3153  
3154  	lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
3155  	lhead->op = cpu_to_le32(req->r_op);
3156  	lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
3157  						  caller_fsuid));
3158  	lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
3159  						  caller_fsgid));
3160  	lhead->ino = cpu_to_le64(req->r_deleg_ino);
3161  	lhead->args = req->r_args;
3162  
3163  	ceph_encode_filepath(&p, end, ino1, path1);
3164  	ceph_encode_filepath(&p, end, ino2, path2);
3165  
3166  	/* make note of release offset, in case we need to replay */
3167  	req->r_request_release_offset = p - msg->front.iov_base;
3168  
3169  	/* cap releases */
3170  	releases = 0;
3171  	if (req->r_inode_drop)
3172  		releases += ceph_encode_inode_release(&p,
3173  		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
3174  		      mds, req->r_inode_drop, req->r_inode_unless,
3175  		      req->r_op == CEPH_MDS_OP_READDIR);
3176  	if (req->r_dentry_drop) {
3177  		ret = ceph_encode_dentry_release(&p, req->r_dentry,
3178  				req->r_parent, mds, req->r_dentry_drop,
3179  				req->r_dentry_unless);
3180  		if (ret < 0)
3181  			goto out_err;
3182  		releases += ret;
3183  	}
3184  	if (req->r_old_dentry_drop) {
3185  		ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
3186  				req->r_old_dentry_dir, mds,
3187  				req->r_old_dentry_drop,
3188  				req->r_old_dentry_unless);
3189  		if (ret < 0)
3190  			goto out_err;
3191  		releases += ret;
3192  	}
3193  	if (req->r_old_inode_drop)
3194  		releases += ceph_encode_inode_release(&p,
3195  		      d_inode(req->r_old_dentry),
3196  		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
3197  
3198  	if (drop_cap_releases) {
3199  		releases = 0;
3200  		p = msg->front.iov_base + req->r_request_release_offset;
3201  	}
3202  
3203  	lhead->num_releases = cpu_to_le16(releases);
3204  
3205  	encode_mclientrequest_tail(&p, req);
3206  
3207  	if (WARN_ON_ONCE(p > end)) {
3208  		ceph_msg_put(msg);
3209  		msg = ERR_PTR(-ERANGE);
3210  		goto out_free2;
3211  	}
3212  
3213  	msg->front.iov_len = p - msg->front.iov_base;
3214  	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3215  
3216  	if (req->r_pagelist) {
3217  		struct ceph_pagelist *pagelist = req->r_pagelist;
3218  		ceph_msg_data_add_pagelist(msg, pagelist);
3219  		msg->hdr.data_len = cpu_to_le32(pagelist->length);
3220  	} else {
3221  		msg->hdr.data_len = 0;
3222  	}
3223  
3224  	msg->hdr.data_off = cpu_to_le16(0);
3225  
3226  out_free2:
3227  	if (freepath2)
3228  		ceph_mdsc_free_path((char *)path2, pathlen2);
3229  out_free1:
3230  	if (freepath1)
3231  		ceph_mdsc_free_path((char *)path1, pathlen1);
3232  out:
3233  	return msg;
3234  out_err:
3235  	ceph_msg_put(msg);
3236  	msg = ERR_PTR(ret);
3237  	goto out_free2;
3238  }
3239  
3240  /*
3241   * called under mdsc->mutex if error, under no mutex if
3242   * success.
3243   */
3244  static void complete_request(struct ceph_mds_client *mdsc,
3245  			     struct ceph_mds_request *req)
3246  {
3247  	req->r_end_latency = ktime_get();
3248  
3249  	if (req->r_callback)
3250  		req->r_callback(mdsc, req);
3251  	complete_all(&req->r_completion);
3252  }
3253  
3254  /*
3255   * called under mdsc->mutex
3256   */
3257  static int __prepare_send_request(struct ceph_mds_session *session,
3258  				  struct ceph_mds_request *req,
3259  				  bool drop_cap_releases)
3260  {
3261  	int mds = session->s_mds;
3262  	struct ceph_mds_client *mdsc = session->s_mdsc;
3263  	struct ceph_client *cl = mdsc->fsc->client;
3264  	struct ceph_mds_request_head_legacy *lhead;
3265  	struct ceph_mds_request_head *nhead;
3266  	struct ceph_msg *msg;
3267  	int flags = 0, old_max_retry;
3268  	bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
3269  				     &session->s_features);
3270  
3271  	/*
3272  	 * Avoid infinite retrying after overflow. The client keeps
3273  	 * increasing the retry count, and old-version MDSes only have
3274  	 * an 8-bit counter, so we limit retries to at most 256 times.
3275  	 */
3276  	if (req->r_attempts) {
3277  	       old_max_retry = sizeof_field(struct ceph_mds_request_head_old,
3278  					    num_retry);
3279  	       old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE);
3280  	       if ((old_version && req->r_attempts >= old_max_retry) ||
3281  		   ((uint32_t)req->r_attempts >= U32_MAX)) {
3282  			pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n",
3283  						   req->r_tid);
3284  			return -EMULTIHOP;
3285  	       }
3286  	}
3287  
3288  	req->r_attempts++;
3289  	if (req->r_inode) {
3290  		struct ceph_cap *cap =
3291  			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
3292  
3293  		if (cap)
3294  			req->r_sent_on_mseq = cap->mseq;
3295  		else
3296  			req->r_sent_on_mseq = -1;
3297  	}
3298  	doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid,
3299  	      ceph_mds_op_name(req->r_op), req->r_attempts);
3300  
3301  	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3302  		void *p;
3303  
3304  		/*
3305  		 * Replay.  Do not regenerate message (and rebuild
3306  		 * paths, etc.); just use the original message.
3307  		 * Rebuilding paths will break for renames because
3308  		 * d_move mangles the src name.
3309  		 */
3310  		msg = req->r_request;
3311  		lhead = find_legacy_request_head(msg->front.iov_base,
3312  						 session->s_con.peer_features);
3313  
3314  		flags = le32_to_cpu(lhead->flags);
3315  		flags |= CEPH_MDS_FLAG_REPLAY;
3316  		lhead->flags = cpu_to_le32(flags);
3317  
3318  		if (req->r_target_inode)
3319  			lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
3320  
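		/*
		 * The old head has only an 8-bit retry counter; MDSes with
		 * the 32-bit retry/fwd feature read ext_num_retry instead.
		 */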
3321  		lhead->num_retry = req->r_attempts - 1;
3322  		if (!old_version) {
3323  			nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
3324  			nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
3325  		}
3326  
3327  		/* remove cap/dentry releases from message */
3328  		lhead->num_releases = 0;
3329  
3330  		p = msg->front.iov_base + req->r_request_release_offset;
3331  		encode_mclientrequest_tail(&p, req);
3332  
3333  		msg->front.iov_len = p - msg->front.iov_base;
3334  		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3335  		return 0;
3336  	}
3337  
3338  	if (req->r_request) {
3339  		ceph_msg_put(req->r_request);
3340  		req->r_request = NULL;
3341  	}
3342  	msg = create_request_message(session, req, drop_cap_releases);
3343  	if (IS_ERR(msg)) {
3344  		req->r_err = PTR_ERR(msg);
3345  		return PTR_ERR(msg);
3346  	}
3347  	req->r_request = msg;
3348  
3349  	lhead = find_legacy_request_head(msg->front.iov_base,
3350  					 session->s_con.peer_features);
3351  	lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
3352  	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3353  		flags |= CEPH_MDS_FLAG_REPLAY;
3354  	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
3355  		flags |= CEPH_MDS_FLAG_ASYNC;
3356  	if (req->r_parent)
3357  		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
3358  	lhead->flags = cpu_to_le32(flags);
3359  	lhead->num_fwd = req->r_num_fwd;
3360  	lhead->num_retry = req->r_attempts - 1;
3361  	if (!old_version) {
3362  		nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
3363  		nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd);
3364  		nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
3365  	}
3366  
3367  	doutc(cl, " r_parent = %p\n", req->r_parent);
3368  	return 0;
3369  }
3370  
3371  /*
3372   * called under mdsc->mutex
3373   */
3374  static int __send_request(struct ceph_mds_session *session,
3375  			  struct ceph_mds_request *req,
3376  			  bool drop_cap_releases)
3377  {
3378  	int err;
3379  
3380  	err = __prepare_send_request(session, req, drop_cap_releases);
3381  	if (!err) {
3382  		ceph_msg_get(req->r_request);
3383  		ceph_con_send(&session->s_con, req->r_request);
3384  	}
3385  
3386  	return err;
3387  }
3388  
3389  /*
3390   * send request, or put it on the appropriate wait list.
3391   */
3392  static void __do_request(struct ceph_mds_client *mdsc,
3393  			struct ceph_mds_request *req)
3394  {
3395  	struct ceph_client *cl = mdsc->fsc->client;
3396  	struct ceph_mds_session *session = NULL;
3397  	int mds = -1;
3398  	int err = 0;
3399  	bool random;
3400  
3401  	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3402  		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
3403  			__unregister_request(mdsc, req);
3404  		return;
3405  	}
3406  
3407  	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) {
3408  		doutc(cl, "metadata corrupted\n");
3409  		err = -EIO;
3410  		goto finish;
3411  	}
3412  	if (req->r_timeout &&
3413  	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
3414  		doutc(cl, "timed out\n");
3415  		err = -ETIMEDOUT;
3416  		goto finish;
3417  	}
3418  	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
3419  		doutc(cl, "forced umount\n");
3420  		err = -EIO;
3421  		goto finish;
3422  	}
3423  	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
3424  		if (mdsc->mdsmap_err) {
3425  			err = mdsc->mdsmap_err;
3426  			doutc(cl, "mdsmap err %d\n", err);
3427  			goto finish;
3428  		}
3429  		if (mdsc->mdsmap->m_epoch == 0) {
3430  			doutc(cl, "no mdsmap, waiting for map\n");
3431  			list_add(&req->r_wait, &mdsc->waiting_for_map);
3432  			return;
3433  		}
3434  		if (!(mdsc->fsc->mount_options->flags &
3435  		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
3436  		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
3437  			err = -EHOSTUNREACH;
3438  			goto finish;
3439  		}
3440  	}
3441  
3442  	put_request_session(req);
3443  
3444  	mds = __choose_mds(mdsc, req, &random);
3445  	if (mds < 0 ||
3446  	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
3447  		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3448  			err = -EJUKEBOX;
3449  			goto finish;
3450  		}
3451  		doutc(cl, "no mds or not active, waiting for map\n");
3452  		list_add(&req->r_wait, &mdsc->waiting_for_map);
3453  		return;
3454  	}
3455  
3456  	/* get, open session */
3457  	session = __ceph_lookup_mds_session(mdsc, mds);
3458  	if (!session) {
3459  		session = register_session(mdsc, mds);
3460  		if (IS_ERR(session)) {
3461  			err = PTR_ERR(session);
3462  			goto finish;
3463  		}
3464  	}
3465  	req->r_session = ceph_get_mds_session(session);
3466  
3467  	doutc(cl, "mds%d session %p state %s\n", mds, session,
3468  	      ceph_session_state_name(session->s_state));
3469  
3470  	/*
3471  	 * Old ceph versions will crash the MDSs when they see unknown ops
3472  	 */
3473  	if (req->r_feature_needed > 0 &&
3474  	    !test_bit(req->r_feature_needed, &session->s_features)) {
3475  		err = -EOPNOTSUPP;
3476  		goto out_session;
3477  	}
3478  
3479  	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
3480  	    session->s_state != CEPH_MDS_SESSION_HUNG) {
3481  		/*
3482  		 * We cannot queue async requests since the caps and delegated
3483  		 * inodes are bound to the session. Just return -EJUKEBOX and
3484  		 * let the caller retry a sync request in that case.
3485  		 */
3486  		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3487  			err = -EJUKEBOX;
3488  			goto out_session;
3489  		}
3490  
3491  		/*
3492  		 * If the session has been REJECTED, then return a hard error,
3493  		 * unless it's a CLEANRECOVER mount, in which case we'll queue
3494  		 * it to the mdsc queue.
3495  		 */
3496  		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
3497  			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
3498  				list_add(&req->r_wait, &mdsc->waiting_for_map);
3499  			else
3500  				err = -EACCES;
3501  			goto out_session;
3502  		}
3503  
3504  		if (session->s_state == CEPH_MDS_SESSION_NEW ||
3505  		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
3506  			err = __open_session(mdsc, session);
3507  			if (err)
3508  				goto out_session;
3509  			/* retry the same mds later */
3510  			if (random)
3511  				req->r_resend_mds = mds;
3512  		}
3513  		list_add(&req->r_wait, &session->s_waiting);
3514  		goto out_session;
3515  	}
3516  
3517  	/* send request */
3518  	req->r_resend_mds = -1;   /* forget any previous mds hint */
3519  
3520  	if (req->r_request_started == 0)   /* note request start time */
3521  		req->r_request_started = jiffies;
3522  
3523  	/*
3524  	 * For an async create we will choose the auth MDS of the frag in the
3525  	 * parent directory to send the request, and usually this works fine.
3526  	 * But if the MDS migrated the directory to another MDS before it
3527  	 * could handle the request, the request will be forwarded.
3528  	 *
3529  	 * And then the auth cap will be changed.
3530  	 */
3531  	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
3532  		struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
3533  		struct ceph_inode_info *ci;
3534  		struct ceph_cap *cap;
3535  
3536  		/*
3537  		 * The request may be handled very quickly and the new inode
3538  		 * may not have been linked to the dentry yet. We need to wait
3539  		 * for ceph_finish_async_create(), which in theory shouldn't
3540  		 * get stuck for long or fail, to finish before forwarding
3541  		 * the request.
3542  		 */
3543  		if (!d_inode(req->r_dentry)) {
3544  			err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
3545  					  TASK_KILLABLE);
3546  			if (err) {
3547  				mutex_lock(&req->r_fill_mutex);
3548  				set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3549  				mutex_unlock(&req->r_fill_mutex);
3550  				goto out_session;
3551  			}
3552  		}
3553  
3554  		ci = ceph_inode(d_inode(req->r_dentry));
3555  
3556  		spin_lock(&ci->i_ceph_lock);
3557  		cap = ci->i_auth_cap;
3558  		if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
3559  			doutc(cl, "session changed for auth cap %d -> %d\n",
3560  			      cap->session->s_mds, session->s_mds);
3561  
3562  			/* Remove the auth cap from old session */
3563  			spin_lock(&cap->session->s_cap_lock);
3564  			cap->session->s_nr_caps--;
3565  			list_del_init(&cap->session_caps);
3566  			spin_unlock(&cap->session->s_cap_lock);
3567  
3568  			/* Add the auth cap to the new session */
3569  			cap->mds = mds;
3570  			cap->session = session;
3571  			spin_lock(&session->s_cap_lock);
3572  			session->s_nr_caps++;
3573  			list_add_tail(&cap->session_caps, &session->s_caps);
3574  			spin_unlock(&session->s_cap_lock);
3575  
3576  			change_auth_cap_ses(ci, session);
3577  		}
3578  		spin_unlock(&ci->i_ceph_lock);
3579  	}
3580  
3581  	err = __send_request(session, req, false);
3582  
3583  out_session:
3584  	ceph_put_mds_session(session);
3585  finish:
3586  	if (err) {
3587  		doutc(cl, "early error %d\n", err);
3588  		req->r_err = err;
3589  		complete_request(mdsc, req);
3590  		__unregister_request(mdsc, req);
3591  	}
3592  	return;
3593  }
3594  
3595  /*
3596   * called under mdsc->mutex
3597   */
3598  static void __wake_requests(struct ceph_mds_client *mdsc,
3599  			    struct list_head *head)
3600  {
3601  	struct ceph_client *cl = mdsc->fsc->client;
3602  	struct ceph_mds_request *req;
3603  	LIST_HEAD(tmp_list);
3604  
3605  	list_splice_init(head, &tmp_list);
3606  
3607  	while (!list_empty(&tmp_list)) {
3608  		req = list_entry(tmp_list.next,
3609  				 struct ceph_mds_request, r_wait);
3610  		list_del_init(&req->r_wait);
3611  		doutc(cl, " wake request %p tid %llu\n", req,
3612  		      req->r_tid);
3613  		__do_request(mdsc, req);
3614  	}
3615  }
3616  
3617  /*
3618   * Wake up threads with requests pending for @mds, so that they can
3619   * resubmit their requests to a possibly different mds.
3620   */
3621  static void kick_requests(struct ceph_mds_client *mdsc, int mds)
3622  {
3623  	struct ceph_client *cl = mdsc->fsc->client;
3624  	struct ceph_mds_request *req;
3625  	struct rb_node *p = rb_first(&mdsc->request_tree);
3626  
3627  	doutc(cl, "kick_requests mds%d\n", mds);
3628  	while (p) {
3629  		req = rb_entry(p, struct ceph_mds_request, r_node);
3630  		p = rb_next(p);
3631  		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3632  			continue;
3633  		if (req->r_attempts > 0)
3634  			continue; /* only new requests */
3635  		if (req->r_session &&
3636  		    req->r_session->s_mds == mds) {
3637  			doutc(cl, " kicking tid %llu\n", req->r_tid);
3638  			list_del_init(&req->r_wait);
3639  			__do_request(mdsc, req);
3640  		}
3641  	}
3642  }
3643  
3644  int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
3645  			      struct ceph_mds_request *req)
3646  {
3647  	struct ceph_client *cl = mdsc->fsc->client;
3648  	int err = 0;
3649  
3650  	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
3651  	if (req->r_inode)
3652  		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
3653  	if (req->r_parent) {
3654  		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
3655  		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
3656  			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
3657  		spin_lock(&ci->i_ceph_lock);
3658  		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
3659  		__ceph_touch_fmode(ci, mdsc, fmode);
3660  		spin_unlock(&ci->i_ceph_lock);
3661  	}
3662  	if (req->r_old_dentry_dir)
3663  		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
3664  				  CEPH_CAP_PIN);
3665  
3666  	if (req->r_inode) {
3667  		err = ceph_wait_on_async_create(req->r_inode);
3668  		if (err) {
3669  			doutc(cl, "wait for async create returned: %d\n", err);
3670  			return err;
3671  		}
3672  	}
3673  
3674  	if (!err && req->r_old_inode) {
3675  		err = ceph_wait_on_async_create(req->r_old_inode);
3676  		if (err) {
3677  			doutc(cl, "wait for async create returned: %d\n", err);
3678  			return err;
3679  		}
3680  	}
3681  
3682  	doutc(cl, "submit_request on %p for inode %p\n", req, dir);
3683  	mutex_lock(&mdsc->mutex);
3684  	__register_request(mdsc, req, dir);
3685  	__do_request(mdsc, req);
3686  	err = req->r_err;
3687  	mutex_unlock(&mdsc->mutex);
3688  	return err;
3689  }
3690  
3691  int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3692  			   struct ceph_mds_request *req,
3693  			   ceph_mds_request_wait_callback_t wait_func)
3694  {
3695  	struct ceph_client *cl = mdsc->fsc->client;
3696  	int err;
3697  
3698  	/* wait */
3699  	doutc(cl, "do_request waiting\n");
3700  	if (wait_func) {
3701  		err = wait_func(mdsc, req);
3702  	} else {
3703  		long timeleft = wait_for_completion_killable_timeout(
3704  					&req->r_completion,
3705  					ceph_timeout_jiffies(req->r_timeout));
3706  		if (timeleft > 0)
3707  			err = 0;
3708  		else if (!timeleft)
3709  			err = -ETIMEDOUT;  /* timed out */
3710  		else
3711  			err = timeleft;  /* killed */
3712  	}
3713  	doutc(cl, "do_request waited, got %d\n", err);
3714  	mutex_lock(&mdsc->mutex);
3715  
3716  	/* only abort if we didn't race with a real reply */
3717  	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3718  		err = le32_to_cpu(req->r_reply_info.head->result);
3719  	} else if (err < 0) {
3720  		doutc(cl, "aborted request %lld with %d\n", req->r_tid, err);
3721  
3722  		/*
3723  		 * ensure we aren't running concurrently with
3724  		 * ceph_fill_trace or ceph_readdir_prepopulate, which
3725  		 * rely on locks (dir mutex) held by our caller.
3726  		 */
3727  		mutex_lock(&req->r_fill_mutex);
3728  		req->r_err = err;
3729  		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3730  		mutex_unlock(&req->r_fill_mutex);
3731  
3732  		if (req->r_parent &&
3733  		    (req->r_op & CEPH_MDS_OP_WRITE))
3734  			ceph_invalidate_dir_request(req);
3735  	} else {
3736  		err = req->r_err;
3737  	}
3738  
3739  	mutex_unlock(&mdsc->mutex);
3740  	return err;
3741  }
3742  
3743  /*
3744   * Synchronously perform an mds request.  Take care of all of the
3745   * session setup, forwarding, and retry details.
3746   */
3747  int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3748  			 struct inode *dir,
3749  			 struct ceph_mds_request *req)
3750  {
3751  	struct ceph_client *cl = mdsc->fsc->client;
3752  	int err;
3753  
3754  	doutc(cl, "do_request on %p\n", req);
3755  
3756  	/* issue */
3757  	err = ceph_mdsc_submit_request(mdsc, dir, req);
3758  	if (!err)
3759  		err = ceph_mdsc_wait_request(mdsc, req, NULL);
3760  	doutc(cl, "do_request %p done, result %d\n", req, err);
3761  	return err;
3762  }
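
/*
 * Illustrative caller sketch (hypothetical, for documentation only; the
 * helpers named here exist elsewhere in the ceph client code, but this
 * exact snippet is not part of the original flow):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_inode = inode;
 *	ihold(inode);
 *	req->r_num_caps = 1;
 *	err = ceph_mdsc_do_request(mdsc, NULL, req);
 *	ceph_mdsc_put_request(req);
 */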
3763  
3764  /*
3765   * Invalidate dir's completeness, dentry lease state on an aborted MDS
3766   * namespace request.
3767   */
3768  void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3769  {
3770  	struct inode *dir = req->r_parent;
3771  	struct inode *old_dir = req->r_old_dentry_dir;
3772  	struct ceph_client *cl = req->r_mdsc->fsc->client;
3773  
3774  	doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n",
3775  	      dir, old_dir);
3776  
3777  	ceph_dir_clear_complete(dir);
3778  	if (old_dir)
3779  		ceph_dir_clear_complete(old_dir);
3780  	if (req->r_dentry)
3781  		ceph_invalidate_dentry_lease(req->r_dentry);
3782  	if (req->r_old_dentry)
3783  		ceph_invalidate_dentry_lease(req->r_old_dentry);
3784  }
3785  
3786  /*
3787   * Handle mds reply.
3788   *
3789   * We take the session mutex and parse and process the reply immediately.
3790   * This preserves the logical ordering of replies, capabilities, etc., sent
3791   * by the MDS as they are applied to our local cache.
3792   */
3793  static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3794  {
3795  	struct ceph_mds_client *mdsc = session->s_mdsc;
3796  	struct ceph_client *cl = mdsc->fsc->client;
3797  	struct ceph_mds_request *req;
3798  	struct ceph_mds_reply_head *head = msg->front.iov_base;
3799  	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3800  	struct ceph_snap_realm *realm;
3801  	u64 tid;
3802  	int err, result;
3803  	int mds = session->s_mds;
3804  	bool close_sessions = false;
3805  
3806  	if (msg->front.iov_len < sizeof(*head)) {
3807  		pr_err_client(cl, "got corrupt (short) reply\n");
3808  		ceph_msg_dump(msg);
3809  		return;
3810  	}
3811  
3812  	/* get request, session */
3813  	tid = le64_to_cpu(msg->hdr.tid);
3814  	mutex_lock(&mdsc->mutex);
3815  	req = lookup_get_request(mdsc, tid);
3816  	if (!req) {
3817  		doutc(cl, "on unknown tid %llu\n", tid);
3818  		mutex_unlock(&mdsc->mutex);
3819  		return;
3820  	}
3821  	doutc(cl, "handle_reply %p\n", req);
3822  
3823  	/* correct session? */
3824  	if (req->r_session != session) {
3825  		pr_err_client(cl, "got %llu on session mds%d not mds%d\n",
3826  			      tid, session->s_mds,
3827  			      req->r_session ? req->r_session->s_mds : -1);
3828  		mutex_unlock(&mdsc->mutex);
3829  		goto out;
3830  	}
3831  
3832  	/* dup? */
3833  	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3834  	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3835  		pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n",
3836  			       head->safe ? "safe" : "unsafe", tid, mds);
3837  		mutex_unlock(&mdsc->mutex);
3838  		goto out;
3839  	}
3840  	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3841  		pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n",
3842  			       tid, mds);
3843  		mutex_unlock(&mdsc->mutex);
3844  		goto out;
3845  	}
3846  
3847  	result = le32_to_cpu(head->result);
3848  
3849  	if (head->safe) {
3850  		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3851  		__unregister_request(mdsc, req);
3852  
3853  		/* last request during umount? */
3854  		if (mdsc->stopping && !__get_oldest_req(mdsc))
3855  			complete_all(&mdsc->safe_umount_waiters);
3856  
3857  		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3858  			/*
3859  			 * We already handled the unsafe response, now do the
3860  			 * cleanup.  No need to examine the response; the MDS
3861  			 * doesn't include any result info in the safe
3862  			 * response.  And even if it did, there is nothing
3863  			 * useful we could do with a revised return value.
3864  			 */
3865  			doutc(cl, "got safe reply %llu, mds%d\n", tid, mds);
3866  
3867  			mutex_unlock(&mdsc->mutex);
3868  			goto out;
3869  		}
3870  	} else {
3871  		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3872  		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3873  	}
3874  
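	/*
	 * Background: an "unsafe" reply means the MDS has applied the change
	 * but not yet journaled it durably; the later "safe" reply confirms
	 * the commit. Unsafe requests are kept on s_unsafe above so they can
	 * be replayed if the MDS restarts before the safe reply arrives.
	 */
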
3875  	doutc(cl, "tid %lld result %d\n", tid, result);
3876  	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3877  		err = parse_reply_info(session, msg, req, (u64)-1);
3878  	else
3879  		err = parse_reply_info(session, msg, req,
3880  				       session->s_con.peer_features);
3881  	mutex_unlock(&mdsc->mutex);
3882  
3883  	/* Must find target inode outside of mutexes to avoid deadlocks */
3884  	rinfo = &req->r_reply_info;
3885  	if ((err >= 0) && rinfo->head->is_target) {
3886  		struct inode *in = xchg(&req->r_new_inode, NULL);
3887  		struct ceph_vino tvino = {
3888  			.ino  = le64_to_cpu(rinfo->targeti.in->ino),
3889  			.snap = le64_to_cpu(rinfo->targeti.in->snapid)
3890  		};
3891  
3892  		/*
3893  		 * If we ended up opening an existing inode, discard
3894  		 * r_new_inode
3895  		 */
3896  		if (req->r_op == CEPH_MDS_OP_CREATE &&
3897  		    !req->r_reply_info.has_create_ino) {
3898  			/* This should never happen on an async create */
3899  			WARN_ON_ONCE(req->r_deleg_ino);
3900  			iput(in);
3901  			in = NULL;
3902  		}
3903  
3904  		in = ceph_get_inode(mdsc->fsc->sb, tvino, in);
3905  		if (IS_ERR(in)) {
3906  			err = PTR_ERR(in);
3907  			mutex_lock(&session->s_mutex);
3908  			goto out_err;
3909  		}
3910  		req->r_target_inode = in;
3911  	}
3912  
3913  	mutex_lock(&session->s_mutex);
3914  	if (err < 0) {
3915  		pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n",
3916  			      mds, tid);
3917  		ceph_msg_dump(msg);
3918  		goto out_err;
3919  	}
3920  
3921  	/* snap trace */
3922  	realm = NULL;
3923  	if (rinfo->snapblob_len) {
3924  		down_write(&mdsc->snap_rwsem);
3925  		err = ceph_update_snap_trace(mdsc, rinfo->snapblob,
3926  				rinfo->snapblob + rinfo->snapblob_len,
3927  				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3928  				&realm);
3929  		if (err) {
3930  			up_write(&mdsc->snap_rwsem);
3931  			close_sessions = true;
3932  			if (err == -EIO)
3933  				ceph_msg_dump(msg);
3934  			goto out_err;
3935  		}
3936  		downgrade_write(&mdsc->snap_rwsem);
3937  	} else {
3938  		down_read(&mdsc->snap_rwsem);
3939  	}
3940  
3941  	/* insert trace into our cache */
3942  	mutex_lock(&req->r_fill_mutex);
3943  	current->journal_info = req;
3944  	err = ceph_fill_trace(mdsc->fsc->sb, req);
3945  	if (err == 0) {
3946  		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3947  				    req->r_op == CEPH_MDS_OP_LSSNAP))
3948  			err = ceph_readdir_prepopulate(req, req->r_session);
3949  	}
3950  	current->journal_info = NULL;
3951  	mutex_unlock(&req->r_fill_mutex);
3952  
3953  	up_read(&mdsc->snap_rwsem);
3954  	if (realm)
3955  		ceph_put_snap_realm(mdsc, realm);
3956  
3957  	if (err == 0) {
3958  		if (req->r_target_inode &&
3959  		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3960  			struct ceph_inode_info *ci =
3961  				ceph_inode(req->r_target_inode);
3962  			spin_lock(&ci->i_unsafe_lock);
3963  			list_add_tail(&req->r_unsafe_target_item,
3964  				      &ci->i_unsafe_iops);
3965  			spin_unlock(&ci->i_unsafe_lock);
3966  		}
3967  
3968  		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3969  	}
3970  out_err:
3971  	mutex_lock(&mdsc->mutex);
3972  	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3973  		if (err) {
3974  			req->r_err = err;
3975  		} else {
3976  			req->r_reply =  ceph_msg_get(msg);
3977  			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3978  		}
3979  	} else {
3980  		doutc(cl, "reply arrived after request %lld was aborted\n", tid);
3981  	}
3982  	mutex_unlock(&mdsc->mutex);
3983  
3984  	mutex_unlock(&session->s_mutex);
3985  
3986  	/* kick calling process */
3987  	complete_request(mdsc, req);
3988  
3989  	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
3990  				     req->r_end_latency, err);
3991  out:
3992  	ceph_mdsc_put_request(req);
3993  
3994  	/* Defer closing the sessions until after the s_mutex has been released */
3995  	if (close_sessions)
3996  		ceph_mdsc_close_sessions(mdsc);
3997  	return;
3998  }
3999  
4000  
4001  
4002  /*
4003   * handle mds notification that our request has been forwarded.
4004   */
4005  static void handle_forward(struct ceph_mds_client *mdsc,
4006  			   struct ceph_mds_session *session,
4007  			   struct ceph_msg *msg)
4008  {
4009  	struct ceph_client *cl = mdsc->fsc->client;
4010  	struct ceph_mds_request *req;
4011  	u64 tid = le64_to_cpu(msg->hdr.tid);
4012  	u32 next_mds;
4013  	u32 fwd_seq;
4014  	int err = -EINVAL;
4015  	void *p = msg->front.iov_base;
4016  	void *end = p + msg->front.iov_len;
4017  	bool aborted = false;
4018  
4019  	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4020  	next_mds = ceph_decode_32(&p);
4021  	fwd_seq = ceph_decode_32(&p);
4022  
4023  	mutex_lock(&mdsc->mutex);
4024  	req = lookup_get_request(mdsc, tid);
4025  	if (!req) {
4026  		mutex_unlock(&mdsc->mutex);
4027  		doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds);
4028  		return;  /* dup reply? */
4029  	}
4030  
4031  	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
4032  		doutc(cl, "forward tid %llu aborted, unregistering\n", tid);
4033  		__unregister_request(mdsc, req);
4034  	} else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
4035  		/*
4036  		 * Avoid infinite retrying after overflow.
4037  		 *
4038  		 * The MDS will increase the fwd count; on the client side,
4039  		 * if the num_fwd is less than the one saved in the request,
4040  		 * that means the MDS is an old version and its 8-bit counter
4041  		 * has overflowed.
4042  		 */
4043  		mutex_lock(&req->r_fill_mutex);
4044  		req->r_err = -EMULTIHOP;
4045  		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
4046  		mutex_unlock(&req->r_fill_mutex);
4047  		aborted = true;
4048  		pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n",
4049  					   tid);
4050  	} else {
4051  		/* resend. forward race not possible; mds would drop */
4052  		doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds);
4053  		BUG_ON(req->r_err);
4054  		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
4055  		req->r_attempts = 0;
4056  		req->r_num_fwd = fwd_seq;
4057  		req->r_resend_mds = next_mds;
4058  		put_request_session(req);
4059  		__do_request(mdsc, req);
4060  	}
4061  	mutex_unlock(&mdsc->mutex);
4062  
4063  	/* kick calling process */
4064  	if (aborted)
4065  		complete_request(mdsc, req);
4066  	ceph_mdsc_put_request(req);
4067  	return;
4068  
4069  bad:
4070  	pr_err_client(cl, "decode error err=%d\n", err);
4071  	ceph_msg_dump(msg);
4072  }
4073  
4074  static int __decode_session_metadata(void **p, void *end,
4075  				     bool *blocklisted)
4076  {
4077  	/* map<string,string> */
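	/*
	 * Wire format, as decoded below: a u32 entry count, then for each
	 * entry a length-prefixed key and a length-prefixed value (e.g. the
	 * key "error_string" whose value may mention "blacklisted").
	 */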
4078  	u32 n;
4079  	bool err_str;
4080  	ceph_decode_32_safe(p, end, n, bad);
4081  	while (n-- > 0) {
4082  		u32 len;
4083  		ceph_decode_32_safe(p, end, len, bad);
4084  		ceph_decode_need(p, end, len, bad);
4085  		err_str = !strncmp(*p, "error_string", len);
4086  		*p += len;
4087  		ceph_decode_32_safe(p, end, len, bad);
4088  		ceph_decode_need(p, end, len, bad);
4089  		/*
4090  		 * Match "blocklisted (blacklisted)" from newer MDSes,
4091  		 * or "blacklisted" from older MDSes.
4092  		 */
4093  		if (err_str && strnstr(*p, "blacklisted", len))
4094  			*blocklisted = true;
4095  		*p += len;
4096  	}
4097  	return 0;
4098  bad:
4099  	return -1;
4100  }
4101  
4102  /*
4103   * handle an mds session control message
4104   */
4105  static void handle_session(struct ceph_mds_session *session,
4106  			   struct ceph_msg *msg)
4107  {
4108  	struct ceph_mds_client *mdsc = session->s_mdsc;
4109  	struct ceph_client *cl = mdsc->fsc->client;
4110  	int mds = session->s_mds;
4111  	int msg_version = le16_to_cpu(msg->hdr.version);
4112  	void *p = msg->front.iov_base;
4113  	void *end = p + msg->front.iov_len;
4114  	struct ceph_mds_session_head *h;
4115  	struct ceph_mds_cap_auth *cap_auths = NULL;
4116  	u32 op, cap_auths_num = 0;
4117  	u64 seq, features = 0;
4118  	int wake = 0;
4119  	bool blocklisted = false;
4120  	u32 i;
4121  
4122  
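	/*
	 * Rough layout of the message front handled below, keyed on
	 * msg->hdr.version (a summary of this decoder, not a spec):
	 *   always: ceph_mds_session_head (op, seq, ...)
	 *   v3+:    metadata map<string,string>, then feature bits
	 *   v5+:    metric_spec, then session flags (e.g. BLOCKLISTED)
	 *   v6+:    cap_auths array
	 */
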
4123  	/* decode */
4124  	ceph_decode_need(&p, end, sizeof(*h), bad);
4125  	h = p;
4126  	p += sizeof(*h);
4127  
4128  	op = le32_to_cpu(h->op);
4129  	seq = le64_to_cpu(h->seq);
4130  
4131  	if (msg_version >= 3) {
4132  		u32 len;
4133  		/* version >= 2 and < 5, decode metadata, skip otherwise
4134  		 * as it's handled via flags.
4135  		 */
4136  		if (msg_version >= 5)
4137  			ceph_decode_skip_map(&p, end, string, string, bad);
4138  		else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
4139  			goto bad;
4140  
4141  		/* version >= 3, feature bits */
4142  		ceph_decode_32_safe(&p, end, len, bad);
4143  		if (len) {
4144  			ceph_decode_64_safe(&p, end, features, bad);
4145  			p += len - sizeof(features);
4146  		}
4147  	}
4148  
4149  	if (msg_version >= 5) {
4150  		u32 flags, len;
4151  
4152  		/* version >= 4 */
4153  		ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
4154  		ceph_decode_32_safe(&p, end, len, bad); /* len */
4155  		ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */
4156  
4157  		/* version >= 5, flags   */
4158  		ceph_decode_32_safe(&p, end, flags, bad);
4159  		if (flags & CEPH_SESSION_BLOCKLISTED) {
4160  			pr_warn_client(cl, "mds%d session blocklisted\n",
4161  				       session->s_mds);
4162  			blocklisted = true;
4163  		}
4164  	}
4165  
4166  	if (msg_version >= 6) {
4167  		ceph_decode_32_safe(&p, end, cap_auths_num, bad);
4168  		doutc(cl, "cap_auths_num %d\n", cap_auths_num);
4169  
4170  		if (cap_auths_num && op != CEPH_SESSION_OPEN) {
4171  			WARN_ON_ONCE(op != CEPH_SESSION_OPEN);
4172  			goto skip_cap_auths;
4173  		}
4174  
4175  		cap_auths = kcalloc(cap_auths_num,
4176  				    sizeof(struct ceph_mds_cap_auth),
4177  				    GFP_KERNEL);
4178  		if (!cap_auths) {
4179  			pr_err_client(cl, "No memory for cap_auths\n");
4180  			return;
4181  		}
4182  
4183  		for (i = 0; i < cap_auths_num; i++) {
4184  			u32 _len, j;
4185  
4186  			/* struct_v, struct_compat, and struct_len in MDSCapAuth */
4187  			ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);
4188  
4189  			/* struct_v, struct_compat, and struct_len in MDSCapMatch */
4190  			ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);
4191  			ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad);
4192  			ceph_decode_32_safe(&p, end, _len, bad);
4193  			if (_len) {
4194  				cap_auths[i].match.gids = kcalloc(_len, sizeof(u32),
4195  								  GFP_KERNEL);
4196  				if (!cap_auths[i].match.gids) {
4197  					pr_err_client(cl, "No memory for gids\n");
4198  					goto fail;
4199  				}
4200  
4201  				cap_auths[i].match.num_gids = _len;
4202  				for (j = 0; j < _len; j++)
4203  					ceph_decode_32_safe(&p, end,
4204  							    cap_auths[i].match.gids[j],
4205  							    bad);
4206  			}
4207  
4208  			ceph_decode_32_safe(&p, end, _len, bad);
4209  			if (_len) {
4210  				cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char),
4211  								  GFP_KERNEL);
4212  				if (!cap_auths[i].match.path) {
4213  					pr_err_client(cl, "No memory for path\n");
4214  					goto fail;
4215  				}
4216  				ceph_decode_copy(&p, cap_auths[i].match.path, _len);
4217  
4218  				/* Remove the trailing '/' */
4219  				while (_len && cap_auths[i].match.path[_len - 1] == '/') {
4220  					cap_auths[i].match.path[_len - 1] = '\0';
4221  					_len -= 1;
4222  				}
4223  			}
4224  
4225  			ceph_decode_32_safe(&p, end, _len, bad);
4226  			if (_len) {
4227  				cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char),
4228  								     GFP_KERNEL);
4229  				if (!cap_auths[i].match.fs_name) {
4230  					pr_err_client(cl, "No memory for fs_name\n");
4231  					goto fail;
4232  				}
4233  				ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len);
4234  			}
4235  
4236  			ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad);
4237  			ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad);
4238  			ceph_decode_8_safe(&p, end, cap_auths[i].writeable, bad);
4239  			doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n",
4240  			      cap_auths[i].match.uid, cap_auths[i].match.num_gids,
4241  			      cap_auths[i].match.path, cap_auths[i].match.fs_name,
4242  			      cap_auths[i].match.root_squash,
4243  			      cap_auths[i].readable, cap_auths[i].writeable);
4244  		}
4245  	}
4246  
4247  skip_cap_auths:
4248  	mutex_lock(&mdsc->mutex);
4249  	if (op == CEPH_SESSION_OPEN) {
4250  		if (mdsc->s_cap_auths) {
4251  			for (i = 0; i < mdsc->s_cap_auths_num; i++) {
4252  				kfree(mdsc->s_cap_auths[i].match.gids);
4253  				kfree(mdsc->s_cap_auths[i].match.path);
4254  				kfree(mdsc->s_cap_auths[i].match.fs_name);
4255  			}
4256  			kfree(mdsc->s_cap_auths);
4257  		}
4258  		mdsc->s_cap_auths_num = cap_auths_num;
4259  		mdsc->s_cap_auths = cap_auths;
4260  	}
4261  	if (op == CEPH_SESSION_CLOSE) {
4262  		ceph_get_mds_session(session);
4263  		__unregister_session(mdsc, session);
4264  	}
4265  	/* FIXME: this ttl calculation is generous */
4266  	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
4267  	mutex_unlock(&mdsc->mutex);
4268  
4269  	mutex_lock(&session->s_mutex);
4270  
4271  	doutc(cl, "mds%d %s %p state %s seq %llu\n", mds,
4272  	      ceph_session_op_name(op), session,
4273  	      ceph_session_state_name(session->s_state), seq);
4274  
4275  	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
4276  		session->s_state = CEPH_MDS_SESSION_OPEN;
4277  		pr_info_client(cl, "mds%d came back\n", session->s_mds);
4278  	}
4279  
4280  	switch (op) {
4281  	case CEPH_SESSION_OPEN:
4282  		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
4283  			pr_info_client(cl, "mds%d reconnect success\n",
4284  				       session->s_mds);
4285  
4286  		session->s_features = features;
4287  		if (session->s_state == CEPH_MDS_SESSION_OPEN) {
4288  			pr_notice_client(cl, "mds%d is already opened\n",
4289  					 session->s_mds);
4290  		} else {
4291  			session->s_state = CEPH_MDS_SESSION_OPEN;
4292  			renewed_caps(mdsc, session, 0);
4293  			if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
4294  				     &session->s_features))
4295  				metric_schedule_delayed(&mdsc->metric);
4296  		}
4297  
4298  		/*
4299  		 * The connection may have been broken and the session on the
4300  		 * client side reinitialized, so we need to update the seq
4301  		 * anyway.
4302  		 */
4303  		if (!session->s_seq && seq)
4304  			session->s_seq = seq;
4305  
4306  		wake = 1;
4307  		if (mdsc->stopping)
4308  			__close_session(mdsc, session);
4309  		break;
4310  
4311  	case CEPH_SESSION_RENEWCAPS:
4312  		if (session->s_renew_seq == seq)
4313  			renewed_caps(mdsc, session, 1);
4314  		break;
4315  
4316  	case CEPH_SESSION_CLOSE:
4317  		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
4318  			pr_info_client(cl, "mds%d reconnect denied\n",
4319  				       session->s_mds);
4320  		session->s_state = CEPH_MDS_SESSION_CLOSED;
4321  		cleanup_session_requests(mdsc, session);
4322  		remove_session_caps(session);
4323  		wake = 2; /* for good measure */
4324  		wake_up_all(&mdsc->session_close_wq);
4325  		break;
4326  
4327  	case CEPH_SESSION_STALE:
4328  		pr_info_client(cl, "mds%d caps went stale, renewing\n",
4329  			       session->s_mds);
4330  		atomic_inc(&session->s_cap_gen);
4331  		session->s_cap_ttl = jiffies - 1;
4332  		send_renew_caps(mdsc, session);
4333  		break;
4334  
4335  	case CEPH_SESSION_RECALL_STATE:
4336  		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
4337  		break;
4338  
4339  	case CEPH_SESSION_FLUSHMSG:
4340  		/* flush cap releases */
4341  		spin_lock(&session->s_cap_lock);
4342  		if (session->s_num_cap_releases)
4343  			ceph_flush_session_cap_releases(mdsc, session);
4344  		spin_unlock(&session->s_cap_lock);
4345  
4346  		send_flushmsg_ack(mdsc, session, seq);
4347  		break;
4348  
4349  	case CEPH_SESSION_FORCE_RO:
4350  		doutc(cl, "force_session_readonly %p\n", session);
4351  		spin_lock(&session->s_cap_lock);
4352  		session->s_readonly = true;
4353  		spin_unlock(&session->s_cap_lock);
4354  		wake_up_session_caps(session, FORCE_RO);
4355  		break;
4356  
4357  	case CEPH_SESSION_REJECT:
4358  		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
4359  		pr_info_client(cl, "mds%d rejected session\n",
4360  			       session->s_mds);
4361  		session->s_state = CEPH_MDS_SESSION_REJECTED;
4362  		cleanup_session_requests(mdsc, session);
4363  		remove_session_caps(session);
4364  		if (blocklisted)
4365  			mdsc->fsc->blocklisted = true;
4366  		wake = 2; /* for good measure */
4367  		break;
4368  
4369  	default:
4370  		pr_err_client(cl, "bad op %d mds%d\n", op, mds);
4371  		WARN_ON(1);
4372  	}
4373  
4374  	mutex_unlock(&session->s_mutex);
4375  	if (wake) {
4376  		mutex_lock(&mdsc->mutex);
4377  		__wake_requests(mdsc, &session->s_waiting);
4378  		if (wake == 2)
4379  			kick_requests(mdsc, mds);
4380  		mutex_unlock(&mdsc->mutex);
4381  	}
4382  	if (op == CEPH_SESSION_CLOSE)
4383  		ceph_put_mds_session(session);
4384  	return;
4385  
4386  bad:
4387  	pr_err_client(cl, "corrupt message mds%d len %d\n", mds,
4388  		      (int)msg->front.iov_len);
4389  	ceph_msg_dump(msg);
4390  fail:
4391  	for (i = 0; i < cap_auths_num; i++) {
4392  		kfree(cap_auths[i].match.gids);
4393  		kfree(cap_auths[i].match.path);
4394  		kfree(cap_auths[i].match.fs_name);
4395  	}
4396  	kfree(cap_auths);
4397  	return;
4398  }
4399  
4400  void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
4401  {
4402  	struct ceph_client *cl = req->r_mdsc->fsc->client;
4403  	int dcaps;
4404  
4405  	dcaps = xchg(&req->r_dir_caps, 0);
4406  	if (dcaps) {
4407  		doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4408  		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
4409  	}
4410  }
4411  
4412  void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req)
4413  {
4414  	struct ceph_client *cl = req->r_mdsc->fsc->client;
4415  	int dcaps;
4416  
4417  	dcaps = xchg(&req->r_dir_caps, 0);
4418  	if (dcaps) {
4419  		doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4420  		ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps);
4421  	}
4422  }
4423  
4424  /*
4425   * called under session->mutex.
4426   */
4427  static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
4428  				   struct ceph_mds_session *session)
4429  {
4430  	struct ceph_mds_request *req, *nreq;
4431  	struct rb_node *p;
4432  
4433  	doutc(mdsc->fsc->client, "mds%d\n", session->s_mds);
4434  
4435  	mutex_lock(&mdsc->mutex);
4436  	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
4437  		__send_request(session, req, true);
4438  
4439  	/*
4440  	 * Also re-send old requests when the MDS enters the reconnect stage,
4441  	 * so that the MDS can process completed requests in the clientreplay stage.
4442  	 */
4443  	p = rb_first(&mdsc->request_tree);
4444  	while (p) {
4445  		req = rb_entry(p, struct ceph_mds_request, r_node);
4446  		p = rb_next(p);
4447  		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
4448  			continue;
4449  		if (req->r_attempts == 0)
4450  			continue; /* only old requests */
4451  		if (!req->r_session)
4452  			continue;
4453  		if (req->r_session->s_mds != session->s_mds)
4454  			continue;
4455  
4456  		ceph_mdsc_release_dir_caps_async(req);
4457  
4458  		__send_request(session, req, true);
4459  	}
4460  	mutex_unlock(&mdsc->mutex);
4461  }
4462  
4463  static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
4464  {
4465  	struct ceph_msg *reply;
4466  	struct ceph_pagelist *_pagelist;
4467  	struct page *page;
4468  	__le32 *addr;
4469  	int err = -ENOMEM;
4470  
4471  	if (!recon_state->allow_multi)
4472  		return -ENOSPC;
4473  
4474  	/* can't handle message that contains both caps and realm */
4475  	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
4476  
4477  	/* pre-allocate new pagelist */
4478  	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
4479  	if (!_pagelist)
4480  		return -ENOMEM;
4481  
4482  	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4483  	if (!reply)
4484  		goto fail_msg;
4485  
4486  	/* placeholder for nr_caps */
4487  	err = ceph_pagelist_encode_32(_pagelist, 0);
4488  	if (err < 0)
4489  		goto fail;
4490  
4491  	if (recon_state->nr_caps) {
4492  		/* currently encoding caps */
4493  		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
4494  		if (err)
4495  			goto fail;
4496  	} else {
4497  		/* placeholder for nr_realms (currently encoding realms) */
4498  		err = ceph_pagelist_encode_32(_pagelist, 0);
4499  		if (err < 0)
4500  			goto fail;
4501  	}
4502  
4503  	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
4504  	if (err)
4505  		goto fail;
4506  
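	/*
	 * The u32 counts at the head of the pagelist were encoded as zero
	 * placeholders; now that the totals are known, map the first page
	 * and patch the appropriate slot in place before sending.
	 */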
4507  	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
4508  	addr = kmap_atomic(page);
4509  	if (recon_state->nr_caps) {
4510  		/* currently encoding caps */
4511  		*addr = cpu_to_le32(recon_state->nr_caps);
4512  	} else {
4513  		/* currently encoding realms */
4514  		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
4515  	}
4516  	kunmap_atomic(addr);
4517  
4518  	reply->hdr.version = cpu_to_le16(5);
4519  	reply->hdr.compat_version = cpu_to_le16(4);
4520  
4521  	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
4522  	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
4523  
4524  	ceph_con_send(&recon_state->session->s_con, reply);
4525  	ceph_pagelist_release(recon_state->pagelist);
4526  
4527  	recon_state->pagelist = _pagelist;
4528  	recon_state->nr_caps = 0;
4529  	recon_state->nr_realms = 0;
4530  	recon_state->msg_version = 5;
4531  	return 0;
4532  fail:
4533  	ceph_msg_put(reply);
4534  fail_msg:
4535  	ceph_pagelist_release(_pagelist);
4536  	return err;
4537  }
4538  
4539  static struct dentry* d_find_primary(struct inode *inode)
4540  {
4541  	struct dentry *alias, *dn = NULL;
4542  
4543  	if (hlist_empty(&inode->i_dentry))
4544  		return NULL;
4545  
4546  	spin_lock(&inode->i_lock);
4547  	if (hlist_empty(&inode->i_dentry))
4548  		goto out_unlock;
4549  
4550  	if (S_ISDIR(inode->i_mode)) {
4551  		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
4552  		if (!IS_ROOT(alias))
4553  			dn = dget(alias);
4554  		goto out_unlock;
4555  	}
4556  
4557  	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
4558  		spin_lock(&alias->d_lock);
4559  		if (!d_unhashed(alias) &&
4560  		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
4561  			dn = dget_dlock(alias);
4562  		}
4563  		spin_unlock(&alias->d_lock);
4564  		if (dn)
4565  			break;
4566  	}
4567  out_unlock:
4568  	spin_unlock(&inode->i_lock);
4569  	return dn;
4570  }
4571  
4572  /*
4573   * Encode information about a cap for a reconnect with the MDS.
4574   */
4575  static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
4576  {
4577  	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
4578  	struct ceph_client *cl = ceph_inode_to_client(inode);
4579  	union {
4580  		struct ceph_mds_cap_reconnect v2;
4581  		struct ceph_mds_cap_reconnect_v1 v1;
4582  	} rec;
4583  	struct ceph_inode_info *ci = ceph_inode(inode);
4584  	struct ceph_reconnect_state *recon_state = arg;
4585  	struct ceph_pagelist *pagelist = recon_state->pagelist;
4586  	struct dentry *dentry;
4587  	struct ceph_cap *cap;
4588  	char *path;
4589  	int pathlen = 0, err;
4590  	u64 pathbase;
4591  	u64 snap_follows;
4592  
4593  	dentry = d_find_primary(inode);
4594  	if (dentry) {
4595  		/* set pathbase to parent dir when msg_version >= 2 */
4596  		path = ceph_mdsc_build_path(mdsc, dentry, &pathlen, &pathbase,
4597  					    recon_state->msg_version >= 2);
4598  		dput(dentry);
4599  		if (IS_ERR(path)) {
4600  			err = PTR_ERR(path);
4601  			goto out_err;
4602  		}
4603  	} else {
4604  		path = NULL;
4605  		pathbase = 0;
4606  	}
4607  
4608  	spin_lock(&ci->i_ceph_lock);
4609  	cap = __get_cap_for_mds(ci, mds);
4610  	if (!cap) {
4611  		spin_unlock(&ci->i_ceph_lock);
4612  		err = 0;
4613  		goto out_err;
4614  	}
4615  	doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode,
4616  	      ceph_vinop(inode), cap, cap->cap_id,
4617  	      ceph_cap_string(cap->issued));
4618  
4619  	cap->seq = 0;        /* reset cap seq */
4620  	cap->issue_seq = 0;  /* and issue_seq */
4621  	cap->mseq = 0;       /* and migrate_seq */
4622  	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
4623  
4624  	/* These are lost when the session goes away */
4625  	if (S_ISDIR(inode->i_mode)) {
4626  		if (cap->issued & CEPH_CAP_DIR_CREATE) {
4627  			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
4628  			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
4629  		}
4630  		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
4631  	}
4632  
4633  	if (recon_state->msg_version >= 2) {
4634  		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
4635  		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4636  		rec.v2.issued = cpu_to_le32(cap->issued);
4637  		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4638  		rec.v2.pathbase = cpu_to_le64(pathbase);
4639  		rec.v2.flock_len = (__force __le32)
4640  			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
4641  	} else {
4642  		struct timespec64 ts;
4643  
4644  		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
4645  		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4646  		rec.v1.issued = cpu_to_le32(cap->issued);
4647  		rec.v1.size = cpu_to_le64(i_size_read(inode));
4648  		ts = inode_get_mtime(inode);
4649  		ceph_encode_timespec64(&rec.v1.mtime, &ts);
4650  		ts = inode_get_atime(inode);
4651  		ceph_encode_timespec64(&rec.v1.atime, &ts);
4652  		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4653  		rec.v1.pathbase = cpu_to_le64(pathbase);
4654  	}
4655  
4656  	if (list_empty(&ci->i_cap_snaps)) {
4657  		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
4658  	} else {
4659  		struct ceph_cap_snap *capsnap =
4660  			list_first_entry(&ci->i_cap_snaps,
4661  					 struct ceph_cap_snap, ci_item);
4662  		snap_follows = capsnap->follows;
4663  	}
4664  	spin_unlock(&ci->i_ceph_lock);
4665  
4666  	if (recon_state->msg_version >= 2) {
4667  		int num_fcntl_locks, num_flock_locks;
4668  		struct ceph_filelock *flocks = NULL;
4669  		size_t struct_len, total_len = sizeof(u64);
4670  		u8 struct_v = 0;
4671  
4672  encode_again:
4673  		if (rec.v2.flock_len) {
4674  			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
4675  		} else {
4676  			num_fcntl_locks = 0;
4677  			num_flock_locks = 0;
4678  		}
4679  		if (num_fcntl_locks + num_flock_locks > 0) {
4680  			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
4681  					       sizeof(struct ceph_filelock),
4682  					       GFP_NOFS);
4683  			if (!flocks) {
4684  				err = -ENOMEM;
4685  				goto out_err;
4686  			}
4687  			err = ceph_encode_locks_to_buffer(inode, flocks,
4688  							  num_fcntl_locks,
4689  							  num_flock_locks);
4690  			if (err) {
4691  				kfree(flocks);
4692  				flocks = NULL;
4693  				if (err == -ENOSPC)
4694  					goto encode_again;
4695  				goto out_err;
4696  			}
4697  		} else {
4698  			kfree(flocks);
4699  			flocks = NULL;
4700  		}
4701  
4702  		if (recon_state->msg_version >= 3) {
4703  			/* version, compat_version and struct_len */
4704  			total_len += 2 * sizeof(u8) + sizeof(u32);
4705  			struct_v = 2;
4706  		}
4707  		/*
4708  		 * number of encoded locks is stable, so copy to pagelist
4709  		 */
4710  		struct_len = 2 * sizeof(u32) +
4711  			    (num_fcntl_locks + num_flock_locks) *
4712  			    sizeof(struct ceph_filelock);
4713  		rec.v2.flock_len = cpu_to_le32(struct_len);
4714  
4715  		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
4716  
4717  		if (struct_v >= 2)
4718  			struct_len += sizeof(u64); /* snap_follows */
4719  
4720  		total_len += struct_len;
4721  
4722  		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
4723  			err = send_reconnect_partial(recon_state);
4724  			if (err)
4725  				goto out_freeflocks;
4726  			pagelist = recon_state->pagelist;
4727  		}
4728  
4729  		err = ceph_pagelist_reserve(pagelist, total_len);
4730  		if (err)
4731  			goto out_freeflocks;
4732  
4733  		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4734  		if (recon_state->msg_version >= 3) {
4735  			ceph_pagelist_encode_8(pagelist, struct_v);
4736  			ceph_pagelist_encode_8(pagelist, 1);
4737  			ceph_pagelist_encode_32(pagelist, struct_len);
4738  		}
4739  		ceph_pagelist_encode_string(pagelist, path, pathlen);
4740  		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
4741  		ceph_locks_to_pagelist(flocks, pagelist,
4742  				       num_fcntl_locks, num_flock_locks);
4743  		if (struct_v >= 2)
4744  			ceph_pagelist_encode_64(pagelist, snap_follows);
4745  out_freeflocks:
4746  		kfree(flocks);
4747  	} else {
4748  		err = ceph_pagelist_reserve(pagelist,
4749  					    sizeof(u64) + sizeof(u32) +
4750  					    pathlen + sizeof(rec.v1));
4751  		if (err)
4752  			goto out_err;
4753  
4754  		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4755  		ceph_pagelist_encode_string(pagelist, path, pathlen);
4756  		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
4757  	}
4758  
4759  out_err:
4760  	ceph_mdsc_free_path(path, pathlen);
4761  	if (!err)
4762  		recon_state->nr_caps++;
4763  	return err;
4764  }
4765  
4766  static int encode_snap_realms(struct ceph_mds_client *mdsc,
4767  			      struct ceph_reconnect_state *recon_state)
4768  {
4769  	struct rb_node *p;
4770  	struct ceph_pagelist *pagelist = recon_state->pagelist;
4771  	struct ceph_client *cl = mdsc->fsc->client;
4772  	int err = 0;
4773  
4774  	if (recon_state->msg_version >= 4) {
4775  		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
4776  		if (err < 0)
4777  			goto fail;
4778  	}
4779  
4780  	/*
4781  	 * snaprealms.  we provide mds with the ino, seq (version), and
4782  	 * parent for all of our realms.  If the mds has any newer info,
4783  	 * it will tell us.
4784  	 */
4785  	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
4786  		struct ceph_snap_realm *realm =
4787  		       rb_entry(p, struct ceph_snap_realm, node);
4788  		struct ceph_mds_snaprealm_reconnect sr_rec;
4789  
4790  		if (recon_state->msg_version >= 4) {
4791  			size_t need = sizeof(u8) * 2 + sizeof(u32) +
4792  				      sizeof(sr_rec);
4793  
4794  			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
4795  				err = send_reconnect_partial(recon_state);
4796  				if (err)
4797  					goto fail;
4798  				pagelist = recon_state->pagelist;
4799  			}
4800  
4801  			err = ceph_pagelist_reserve(pagelist, need);
4802  			if (err)
4803  				goto fail;
4804  
4805  			ceph_pagelist_encode_8(pagelist, 1);
4806  			ceph_pagelist_encode_8(pagelist, 1);
4807  			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
4808  		}
4809  
4810  		doutc(cl, " adding snap realm %llx seq %lld parent %llx\n",
4811  		      realm->ino, realm->seq, realm->parent_ino);
4812  		sr_rec.ino = cpu_to_le64(realm->ino);
4813  		sr_rec.seq = cpu_to_le64(realm->seq);
4814  		sr_rec.parent = cpu_to_le64(realm->parent_ino);
4815  
4816  		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
4817  		if (err)
4818  			goto fail;
4819  
4820  		recon_state->nr_realms++;
4821  	}
4822  fail:
4823  	return err;
4824  }
4825  
4826  
4827  /*
4828   * If an MDS fails and recovers, clients need to reconnect in order to
4829   * reestablish shared state.  This includes all caps issued through
4830   * this session _and_ the snap_realm hierarchy.  Because it's not
4831   * clear which snap realms the mds cares about, we send everything we
4832   * know about; that ensures we'll then get any new info the
4833   * recovering MDS might have.
4834   *
4835   * This is a relatively heavyweight operation, but it's rare.
4836   */
4837  static void send_mds_reconnect(struct ceph_mds_client *mdsc,
4838  			       struct ceph_mds_session *session)
4839  {
4840  	struct ceph_client *cl = mdsc->fsc->client;
4841  	struct ceph_msg *reply;
4842  	int mds = session->s_mds;
4843  	int err = -ENOMEM;
4844  	struct ceph_reconnect_state recon_state = {
4845  		.session = session,
4846  	};
4847  	LIST_HEAD(dispose);
4848  
4849  	pr_info_client(cl, "mds%d reconnect start\n", mds);
4850  
4851  	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
4852  	if (!recon_state.pagelist)
4853  		goto fail_nopagelist;
4854  
4855  	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4856  	if (!reply)
4857  		goto fail_nomsg;
4858  
4859  	xa_destroy(&session->s_delegated_inos);
4860  
4861  	mutex_lock(&session->s_mutex);
4862  	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4863  	session->s_seq = 0;
4864  
4865  	doutc(cl, "session %p state %s\n", session,
4866  	      ceph_session_state_name(session->s_state));
4867  
4868  	atomic_inc(&session->s_cap_gen);
4869  
4870  	spin_lock(&session->s_cap_lock);
4871  	/* don't know if session is readonly */
4872  	session->s_readonly = 0;
4873  	/*
4874  	 * notify __ceph_remove_cap() that we are composing a cap reconnect.
4875  	 * If a cap gets released before being added to the cap reconnect,
4876  	 * __ceph_remove_cap() should skip queuing the cap release.
4877  	 */
4878  	session->s_cap_reconnect = 1;
4879  	/* drop old cap expires; we're about to reestablish that state */
4880  	detach_cap_releases(session, &dispose);
4881  	spin_unlock(&session->s_cap_lock);
4882  	dispose_cap_releases(mdsc, &dispose);
4883  
4884  	/* trim unused caps to reduce MDS's cache rejoin time */
4885  	if (mdsc->fsc->sb->s_root)
4886  		shrink_dcache_parent(mdsc->fsc->sb->s_root);
4887  
4888  	ceph_con_close(&session->s_con);
4889  	ceph_con_open(&session->s_con,
4890  		      CEPH_ENTITY_TYPE_MDS, mds,
4891  		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4892  
4893  	/* replay unsafe requests */
4894  	replay_unsafe_requests(mdsc, session);
4895  
4896  	ceph_early_kick_flushing_caps(mdsc, session);
4897  
4898  	down_read(&mdsc->snap_rwsem);
4899  
4900  	/* placeholder for nr_caps */
4901  	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4902  	if (err)
4903  		goto fail;
4904  
4905  	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4906  		recon_state.msg_version = 3;
4907  		recon_state.allow_multi = true;
4908  	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4909  		recon_state.msg_version = 3;
4910  	} else {
4911  		recon_state.msg_version = 2;
4912  	}
4913  	/* traverse this session's caps */
4914  	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4915  
4916  	spin_lock(&session->s_cap_lock);
4917  	session->s_cap_reconnect = 0;
4918  	spin_unlock(&session->s_cap_lock);
4919  
4920  	if (err < 0)
4921  		goto fail;
4922  
4923  	/* check if all realms can be encoded into current message */
4924  	if (mdsc->num_snap_realms) {
4925  		size_t total_len =
4926  			recon_state.pagelist->length +
4927  			mdsc->num_snap_realms *
4928  			sizeof(struct ceph_mds_snaprealm_reconnect);
4929  		if (recon_state.msg_version >= 4) {
4930  			/* number of realms */
4931  			total_len += sizeof(u32);
4932  			/* version, compat_version and struct_len */
4933  			total_len += mdsc->num_snap_realms *
4934  				     (2 * sizeof(u8) + sizeof(u32));
4935  		}
4936  		if (total_len > RECONNECT_MAX_SIZE) {
4937  			if (!recon_state.allow_multi) {
4938  				err = -ENOSPC;
4939  				goto fail;
4940  			}
4941  			if (recon_state.nr_caps) {
4942  				err = send_reconnect_partial(&recon_state);
4943  				if (err)
4944  					goto fail;
4945  			}
4946  			recon_state.msg_version = 5;
4947  		}
4948  	}
4949  
4950  	err = encode_snap_realms(mdsc, &recon_state);
4951  	if (err < 0)
4952  		goto fail;
4953  
4954  	if (recon_state.msg_version >= 5) {
4955  		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4956  		if (err < 0)
4957  			goto fail;
4958  	}
4959  
	if (recon_state.nr_caps || recon_state.nr_realms) {
		struct page *page =
			list_first_entry(&recon_state.pagelist->head,
					struct page, lru);
		__le32 *addr = kmap_atomic(page);
		if (recon_state.nr_caps) {
			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
			*addr = cpu_to_le32(recon_state.nr_caps);
		} else if (recon_state.msg_version >= 4) {
			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
		}
		kunmap_atomic(addr);
	}

	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
	if (recon_state.msg_version >= 4)
		reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);

	ceph_con_send(&session->s_con, reply);

	mutex_unlock(&session->s_mutex);

	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);

	up_read(&mdsc->snap_rwsem);
	ceph_pagelist_release(recon_state.pagelist);
	return;

fail:
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
fail_nomsg:
	ceph_pagelist_release(recon_state.pagelist);
fail_nopagelist:
	pr_err_client(cl, "error %d preparing reconnect for mds%d\n",
		      err, mds);
	return;
}


/*
 * compare old and new mdsmaps, kicking requests
 * and closing out old connections as necessary
 *
 * called under mdsc->mutex.
 */
static void check_new_map(struct ceph_mds_client *mdsc,
			  struct ceph_mdsmap *newmap,
			  struct ceph_mdsmap *oldmap)
{
	int i, j, err;
	int oldstate, newstate;
	struct ceph_mds_session *s;
	unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch);

	if (newmap->m_info) {
		for (i = 0; i < newmap->possible_max_rank; i++) {
			for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
				set_bit(newmap->m_info[i].export_targets[j], targets);
		}
	}

	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		if (!mdsc->sessions[i])
			continue;
		s = mdsc->sessions[i];
		oldstate = ceph_mdsmap_get_state(oldmap, i);
		newstate = ceph_mdsmap_get_state(newmap, i);

		doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n",
		      i, ceph_mds_state_name(oldstate),
		      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
		      ceph_mds_state_name(newstate),
		      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
		      ceph_session_state_name(s->s_state));

		if (i >= newmap->possible_max_rank) {
			/* force close session for stopped mds */
			ceph_get_mds_session(s);
			__unregister_session(mdsc, s);
			__wake_requests(mdsc, &s->s_waiting);
			mutex_unlock(&mdsc->mutex);

			mutex_lock(&s->s_mutex);
			cleanup_session_requests(mdsc, s);
			remove_session_caps(s);
			mutex_unlock(&s->s_mutex);

			ceph_put_mds_session(s);

			mutex_lock(&mdsc->mutex);
			kick_requests(mdsc, i);
			continue;
		}

		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
			   ceph_mdsmap_get_addr(newmap, i),
			   sizeof(struct ceph_entity_addr))) {
			/* just close it */
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_con_close(&s->s_con);
			mutex_unlock(&s->s_mutex);
			s->s_state = CEPH_MDS_SESSION_RESTARTING;
		} else if (oldstate == newstate) {
			continue;  /* nothing new with this mds */
		}

		/*
		 * send reconnect?
		 */
		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
		    newstate >= CEPH_MDS_STATE_RECONNECT) {
			mutex_unlock(&mdsc->mutex);
			clear_bit(i, targets);
			send_mds_reconnect(mdsc, s);
			mutex_lock(&mdsc->mutex);
		}

		/*
		 * kick requests on any mds that has gone active.
		 */
		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
		    newstate >= CEPH_MDS_STATE_ACTIVE) {
			if (oldstate != CEPH_MDS_STATE_CREATING &&
			    oldstate != CEPH_MDS_STATE_STARTING)
				pr_info_client(cl, "mds%d recovery completed\n",
					       s->s_mds);
			kick_requests(mdsc, i);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_kick_flushing_caps(mdsc, s);
			mutex_unlock(&s->s_mutex);
			wake_up_session_caps(s, RECONNECT);
		}
	}

	/*
	 * Only open and reconnect sessions that don't exist yet.
	 */
	for (i = 0; i < newmap->possible_max_rank; i++) {
		/*
		 * If the importing MDS crashed just after the
		 * EImportStart journal was flushed, then when a
		 * standby MDS takes over and replays the
		 * EImportStart journal, the new MDS daemon will
		 * wait for the client to reconnect, but the client
		 * may never have registered/opened the session.
		 *
		 * Try to reconnect to that MDS daemon if its rank
		 * number is in the export targets array and it is
		 * in the up:reconnect state.
		 */
		newstate = ceph_mdsmap_get_state(newmap, i);
		if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
			continue;

		/*
		 * In rare cases the session may already have been
		 * registered and opened by requests that picked a
		 * random MDS during the mdsc->mutex unlock/lock gap
		 * below. But the related MDS daemon will just queue
		 * those requests and keep waiting for the client's
		 * reconnect request in the up:reconnect state.
		 */
		s = __ceph_lookup_mds_session(mdsc, i);
		if (likely(!s)) {
			s = __open_export_target_session(mdsc, i);
			if (IS_ERR(s)) {
				err = PTR_ERR(s);
				pr_err_client(cl,
					      "failed to open export target session, err %d\n",
					      err);
				continue;
			}
		}
		doutc(cl, "send reconnect to export target mds.%d\n", i);
		mutex_unlock(&mdsc->mutex);
		send_mds_reconnect(mdsc, s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}

	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		s = mdsc->sessions[i];
		if (!s)
			continue;
		if (!ceph_mdsmap_is_laggy(newmap, i))
			continue;
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG ||
		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
			doutc(cl, " connecting to export targets of laggy mds%d\n", i);
			__open_export_target_sessions(mdsc, s);
		}
	}
}


/*
 * leases
 */

/*
 * caller must hold session s_mutex, dentry->d_lock
 */
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
{
	struct ceph_dentry_info *di = ceph_dentry(dentry);

	ceph_put_mds_session(di->lease_session);
	di->lease_session = NULL;
}

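/*
 * handle a CEPH_MSG_CLIENT_LEASE message from an MDS: revoke or renew
 * the lease on a single dentry.  A revoke is acked by resending the
 * same message with the action changed to CEPH_MDS_LEASE_REVOKE_ACK.
 */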
static void handle_lease(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session,
			 struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct super_block *sb = mdsc->fsc->sb;
	struct inode *inode;
	struct dentry *parent, *dentry;
	struct ceph_dentry_info *di;
	int mds = session->s_mds;
	struct ceph_mds_lease *h = msg->front.iov_base;
	u32 seq;
	struct ceph_vino vino;
	struct qstr dname;
	int release = 0;

	doutc(cl, "from mds%d\n", mds);

	if (!ceph_inc_mds_stopping_blocker(mdsc, session))
		return;

	/* decode */
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
		goto bad;
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	dname.len = get_unaligned_le32(h + 1);
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
		goto bad;
	dname.name = (void *)(h + 1) + sizeof(u32);

	/* lookup inode */
	inode = ceph_find_inode(sb, vino);
	doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action),
	      vino.ino, inode, dname.len, dname.name);

	mutex_lock(&session->s_mutex);
	if (!inode) {
		doutc(cl, "no inode %llx\n", vino.ino);
		goto release;
	}

	/* dentry */
	parent = d_find_alias(inode);
	if (!parent) {
		doutc(cl, "no parent dentry on inode %p\n", inode);
		WARN_ON(1);
		goto release;  /* hrm... */
	}
	dname.hash = full_name_hash(parent, dname.name, dname.len);
	dentry = d_lookup(parent, &dname);
	dput(parent);
	if (!dentry)
		goto release;

	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	switch (h->action) {
	case CEPH_MDS_LEASE_REVOKE:
		if (di->lease_session == session) {
			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
				h->seq = cpu_to_le32(di->lease_seq);
			__ceph_mdsc_drop_dentry_lease(dentry);
		}
		release = 1;
		break;

	case CEPH_MDS_LEASE_RENEW:
		if (di->lease_session == session &&
		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
		    di->lease_renew_from &&
		    di->lease_renew_after == 0) {
			unsigned long duration =
				msecs_to_jiffies(le32_to_cpu(h->duration_ms));

			di->lease_seq = seq;
			di->time = di->lease_renew_from + duration;
			di->lease_renew_after = di->lease_renew_from +
				(duration >> 1);
			di->lease_renew_from = 0;
		}
		break;
	}
	spin_unlock(&dentry->d_lock);
	dput(dentry);

	if (!release)
		goto out;

release:
	/* let's just reuse the same message */
	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
	ceph_msg_get(msg);
	ceph_con_send(&session->s_con, msg);

out:
	mutex_unlock(&session->s_mutex);
	iput(inode);

	ceph_dec_mds_stopping_blocker(mdsc);
	return;

bad:
	ceph_dec_mds_stopping_blocker(mdsc);

	pr_err_client(cl, "corrupt lease message\n");
	ceph_msg_dump(msg);
}

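/*
 * send a lease message for the given dentry to its MDS session.  The
 * dentry's d_lock is held while the parent ino, snap and name are
 * copied into the message.
 */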
void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
			      struct dentry *dentry, char action,
			      u32 seq)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct ceph_msg *msg;
	struct ceph_mds_lease *lease;
	struct inode *dir;
	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;

	doutc(cl, "dentry %p %s to mds%d\n", dentry, ceph_lease_op_name(action),
	      session->s_mds);

	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
	if (!msg)
		return;
	lease = msg->front.iov_base;
	lease->action = action;
	lease->seq = cpu_to_le32(seq);

	spin_lock(&dentry->d_lock);
	dir = d_inode(dentry->d_parent);
	lease->ino = cpu_to_le64(ceph_ino(dir));
	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));

	put_unaligned_le32(dentry->d_name.len, lease + 1);
	memcpy((void *)(lease + 1) + 4,
	       dentry->d_name.name, dentry->d_name.len);
	spin_unlock(&dentry->d_lock);

	ceph_con_send(&session->s_con, msg);
}

/*
 * lock and unlock the session, to wait for any ongoing session activity
 */
static void lock_unlock_session(struct ceph_mds_session *s)
{
	mutex_lock(&s->s_mutex);
	mutex_unlock(&s->s_mutex);
}

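/*
 * If the mount is still active but the client has been blocklisted by
 * the cluster, force a reconnect -- provided the user opted in with
 * the CLEANRECOVER ("recover_session=clean") mount option.
 */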
static void maybe_recover_session(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_fs_client *fsc = mdsc->fsc;

	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
		return;

	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
		return;

	if (!READ_ONCE(fsc->blocklisted))
		return;

	pr_info_client(cl, "auto reconnect after blocklisted\n");
	ceph_force_reconnect(fsc->sb);
}

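/*
 * Check whether a session is still usable for periodic work: mark an
 * OPEN session HUNG once its ttl has expired, and return false for
 * sessions that are closing, restarting, or otherwise not established.
 */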
bool check_session_state(struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;

	switch (s->s_state) {
	case CEPH_MDS_SESSION_OPEN:
		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
			s->s_state = CEPH_MDS_SESSION_HUNG;
			pr_info_client(cl, "mds%d hung\n", s->s_mds);
		}
		break;
	case CEPH_MDS_SESSION_CLOSING:
	case CEPH_MDS_SESSION_NEW:
	case CEPH_MDS_SESSION_RESTARTING:
	case CEPH_MDS_SESSION_CLOSED:
	case CEPH_MDS_SESSION_REJECTED:
		return false;
	}

	return true;
}

/*
 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
 * then we need to retransmit that request.
 */
void inc_session_sequence(struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;

	lockdep_assert_held(&s->s_mutex);

	s->s_seq++;

	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
		int ret;

		doutc(cl, "resending session close request for mds%d\n", s->s_mds);
		ret = request_close_session(s);
		if (ret < 0)
			pr_err_client(cl, "unable to close session to mds%d: %d\n",
				      s->s_mds, ret);
	}
}

/*
 * delayed work -- periodically trim expired leases, renew caps with mds.  If
 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
 * workqueue delay value of 5 secs will be used.
 */
static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
{
	unsigned long max_delay = HZ * 5;

	/* 5 secs default delay */
	if (!delay || (delay > max_delay))
		delay = max_delay;
	schedule_delayed_work(&mdsc->delayed_work,
			      round_jiffies_relative(delay));
}

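/*
 * The delayed work body: on every tick, flush pending cap releases for
 * each live session, renew caps once a quarter of the session timeout
 * has elapsed (sending a keepalive otherwise), then check delayed caps,
 * queue cap reclaim, trim the snapid map, possibly recover a
 * blocklisted session, and re-arm itself.
 */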
static void delayed_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, delayed_work.work);
	unsigned long delay;
	int renew_interval;
	int renew_caps;
	int i;

	doutc(mdsc->fsc->client, "mdsc delayed_work\n");

	if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
		return;

	mutex_lock(&mdsc->mutex);
	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
				   mdsc->last_renew_caps);
	if (renew_caps)
		mdsc->last_renew_caps = jiffies;

	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (!s)
			continue;

		if (!check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}
		mutex_unlock(&mdsc->mutex);

		ceph_flush_session_cap_releases(mdsc, s);

		mutex_lock(&s->s_mutex);
		if (renew_caps)
			send_renew_caps(mdsc, s);
		else
			ceph_con_keepalive(&s->s_con);
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG)
			ceph_send_cap_releases(mdsc, s);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);

		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	delay = ceph_check_delayed_caps(mdsc);

	ceph_queue_cap_reclaim_work(mdsc);

	ceph_trim_snapid_map(mdsc);

	maybe_recover_session(mdsc);

	schedule_delayed(mdsc, delay);
}

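/*
 * Allocate and initialize the mds client state at mount time.  An
 * empty mdsmap is allocated here; the real map is swapped in later,
 * when the first mdsmap message arrives from the monitors.
 */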
int ceph_mdsc_init(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc;
	int err;

	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
	if (!mdsc)
		return -ENOMEM;
	mdsc->fsc = fsc;
	mutex_init(&mdsc->mutex);
	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
	if (!mdsc->mdsmap) {
		err = -ENOMEM;
		goto err_mdsc;
	}

	init_completion(&mdsc->safe_umount_waiters);
	spin_lock_init(&mdsc->stopping_lock);
	atomic_set(&mdsc->stopping_blockers, 0);
	init_completion(&mdsc->stopping_waiter);
	init_waitqueue_head(&mdsc->session_close_wq);
	INIT_LIST_HEAD(&mdsc->waiting_for_map);
	mdsc->quotarealms_inodes = RB_ROOT;
	mutex_init(&mdsc->quotarealms_inodes_mutex);
	init_rwsem(&mdsc->snap_rwsem);
	mdsc->snap_realms = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snap_empty);
	spin_lock_init(&mdsc->snap_empty_lock);
	mdsc->request_tree = RB_ROOT;
	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
	mdsc->last_renew_caps = jiffies;
	INIT_LIST_HEAD(&mdsc->cap_delay_list);
#ifdef CONFIG_DEBUG_FS
	INIT_LIST_HEAD(&mdsc->cap_wait_list);
#endif
	spin_lock_init(&mdsc->cap_delay_lock);
	INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list);
	INIT_LIST_HEAD(&mdsc->snap_flush_list);
	spin_lock_init(&mdsc->snap_flush_lock);
	mdsc->last_cap_flush_tid = 1;
	INIT_LIST_HEAD(&mdsc->cap_flush_list);
	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
	spin_lock_init(&mdsc->cap_dirty_lock);
	init_waitqueue_head(&mdsc->cap_flushing_wq);
	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
	INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work);
	err = ceph_metric_init(&mdsc->metric);
	if (err)
		goto err_mdsmap;

	spin_lock_init(&mdsc->dentry_list_lock);
	INIT_LIST_HEAD(&mdsc->dentry_leases);
	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);

	ceph_caps_init(mdsc);
	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);

	spin_lock_init(&mdsc->snapid_map_lock);
	mdsc->snapid_map_tree = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snapid_map_lru);

	init_rwsem(&mdsc->pool_perm_rwsem);
	mdsc->pool_perm_tree = RB_ROOT;

	strscpy(mdsc->nodename, utsname()->nodename,
		sizeof(mdsc->nodename));

	fsc->mdsc = mdsc;
	return 0;

err_mdsmap:
	kfree(mdsc->mdsmap);
err_mdsc:
	kfree(mdsc);
	return err;
}

/*
 * Wait for safe replies on open mds requests.  If we time out, drop
 * all requests from the tree to avoid dangling dentry refs.
 */
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_request *req;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		mutex_unlock(&mdsc->mutex);

		doutc(cl, "waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    ceph_timeout_jiffies(opts->mount_timeout));

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			doutc(cl, "timed out on tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	doutc(cl, "done\n");
}

void send_flush_mdlog(struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	struct ceph_msg *msg;

	/*
	 * Pre-luminous MDS crashes when it sees an unknown session request
	 */
	if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
		return;

	mutex_lock(&s->s_mutex);
	doutc(cl, "request mdlog flush to mds%d (%s) seq %lld\n",
	      s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
				      s->s_seq);
	if (!msg) {
		pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n",
			      s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
	} else {
		ceph_con_send(&s->s_con, msg);
	}
	mutex_unlock(&s->s_mutex);
}

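/*
 * Match the calling process and target path against a single MDS auth
 * cap entry.  Returns 1 on a match, 0 on a mismatch, and -ENOMEM if
 * the temporary path buffer cannot be allocated.
 */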
static int ceph_mds_auth_match(struct ceph_mds_client *mdsc,
			       struct ceph_mds_cap_auth *auth,
			       char *tpath)
{
	const struct cred *cred = get_current_cred();
	u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
	u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
	struct ceph_client *cl = mdsc->fsc->client;
	const char *spath = mdsc->fsc->mount_options->server_path;
	bool gid_matched = false;
	u32 gid, tlen, len;
	int i, j;

	doutc(cl, "match.uid %lld\n", auth->match.uid);
	if (auth->match.uid != MDS_AUTH_UID_ANY) {
		if (auth->match.uid != caller_uid)
			return 0;
		if (auth->match.num_gids) {
			for (i = 0; i < auth->match.num_gids; i++) {
				if (caller_gid == auth->match.gids[i])
					gid_matched = true;
			}
			if (!gid_matched && cred->group_info->ngroups) {
				for (i = 0; i < cred->group_info->ngroups; i++) {
					gid = from_kgid(&init_user_ns,
							cred->group_info->gid[i]);
					for (j = 0; j < auth->match.num_gids; j++) {
						if (gid == auth->match.gids[j]) {
							gid_matched = true;
							break;
						}
					}
					if (gid_matched)
						break;
				}
			}
			if (!gid_matched)
				return 0;
		}
	}

	/* path match */
	if (auth->match.path) {
		if (!tpath)
			return 0;

		tlen = strlen(tpath);
		len = strlen(auth->match.path);
		if (len) {
			char *_tpath = tpath;
			bool free_tpath = false;
			int m, n;

			doutc(cl, "server path %s, tpath %s, match.path %s\n",
			      spath, tpath, auth->match.path);
			if (spath && (m = strlen(spath)) != 1) {
				/* mount path + '/' + tpath + terminating NUL */
				n = m + 1 + tlen + 1;
				_tpath = kmalloc(n, GFP_NOFS);
				if (!_tpath)
					return -ENOMEM;
				/* remove the leading '/' */
				snprintf(_tpath, n, "%s/%s", spath + 1, tpath);
				free_tpath = true;
				tlen = strlen(_tpath);
			}

			/*
			 * Please note the trailing '/' for match.path has already
			 * been removed when parsing.
			 *
			 * Remove the trailing '/' for the target path.
			 */
			while (tlen && _tpath[tlen - 1] == '/') {
				_tpath[tlen - 1] = '\0';
				tlen -= 1;
			}
			doutc(cl, "_tpath %s\n", _tpath);

			/*
			 * In case first == _tpath && tlen == len:
			 *  match.path=/foo  --> /foo _path=/foo     --> match
			 *  match.path=/foo/ --> /foo _path=/foo     --> match
			 *
			 * In case first == _tpath && tlen > len:
			 *  match.path=/foo/ --> /foo _path=/foo/    --> match
			 *  match.path=/foo  --> /foo _path=/foo/    --> match
			 *  match.path=/foo/ --> /foo _path=/foo/d   --> match
			 *  match.path=/foo  --> /foo _path=/food    --> mismatch
			 *
			 * All the other cases                       --> mismatch
			 */
			char *first = strstr(_tpath, auth->match.path);
			if (first != _tpath) {
				if (free_tpath)
					kfree(_tpath);
				return 0;
			}

			if (tlen > len && _tpath[len] != '/') {
				if (free_tpath)
					kfree(_tpath);
				return 0;
			}
		}
	}

	doutc(cl, "matched\n");
	return 1;
}

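/*
 * Check the requested access mask (MAY_READ/MAY_WRITE) for tpath
 * against every entry in mdsc->s_cap_auths.  Returns 0 when access
 * is allowed and -EACCES otherwise; the last matching entry wins.
 */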
int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask)
{
	const struct cred *cred = get_current_cred();
	u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
	u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
	struct ceph_mds_cap_auth *rw_perms_s = NULL;
	struct ceph_client *cl = mdsc->fsc->client;
	bool root_squash_perms = true;
	int i, err;

	doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n",
	      tpath, mask, caller_uid, caller_gid);

	for (i = 0; i < mdsc->s_cap_auths_num; i++) {
		struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i];

		err = ceph_mds_auth_match(mdsc, s, tpath);
		if (err < 0) {
			return err;
		} else if (err > 0) {
			/* always follow the last auth caps' permission */
			root_squash_perms = true;
			rw_perms_s = NULL;
			if ((mask & MAY_WRITE) && s->writeable &&
			    s->match.root_squash && (!caller_uid || !caller_gid))
				root_squash_perms = false;

			if (((mask & MAY_WRITE) && !s->writeable) ||
			    ((mask & MAY_READ) && !s->readable))
				rw_perms_s = s;
		}
	}

	doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms,
	      rw_perms_s);
	if (root_squash_perms && rw_perms_s == NULL) {
		doutc(cl, "access allowed\n");
		return 0;
	}

	if (!root_squash_perms) {
		doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write\n",
		      caller_uid, caller_gid);
	}
	if (rw_perms_s) {
		doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d\n",
		      rw_perms_s->readable, rw_perms_s->writeable,
		      !!(mask & MAY_READ), !!(mask & MAY_WRITE));
	}
	doutc(cl, "access denied\n");
	return -EACCES;
}

/*
 * called before mount is ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "begin\n");
	mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;

	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();

	ceph_cleanup_quotarealms_inodes(mdsc);
	doutc(mdsc->fsc->client, "done\n");
}

/*
 * flush the mdlog and wait for all write mds requests to flush.
 */
static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
						 u64 want_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req = NULL, *nextreq;
	struct ceph_mds_session *last_session = NULL;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	doutc(cl, "want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			struct ceph_mds_session *s = req->r_session;

			if (!s) {
				req = nextreq;
				continue;
			}

			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			s = ceph_get_mds_session(s);
			mutex_unlock(&mdsc->mutex);

			/* send flush mdlog request to MDS */
			if (last_session != s) {
				send_flush_mdlog(s);
				ceph_put_mds_session(last_session);
				last_session = s;
			} else {
				ceph_put_mds_session(s);
			}
			doutc(cl, "wait on %llu (want %llu)\n",
			      req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);

			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break;  /* there was no next request, so we're done */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq);  /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	ceph_put_mds_session(last_session);
	doutc(cl, "done\n");
}

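/*
 * Called for sync(2)-style flushes: write back dirty caps and cap
 * releases, ask the MDSes to flush their logs, then wait for all
 * write requests and cap flushes that were pending when we sampled
 * last_tid and last_cap_flush_tid.
 */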
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u64 want_tid, want_flush;

	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
		return;

	doutc(cl, "sync\n");
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);

	ceph_flush_dirty_caps(mdsc);
	ceph_flush_cap_releases(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
		cf->wake = true;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush);

	flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
}

/*
 * true if all sessions are closed, or we force unmount
 */
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
		return true;
	return atomic_read(&mdsc->num_sessions) <= skipped;
}

/*
 * called after the sb is r/o, or when metadata is corrupted.
 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *session;
	int i;
	int skipped = 0;

	doutc(cl, "begin\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		if (__close_session(mdsc, session) <= 0)
			skipped++;
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	doutc(cl, "waiting for sessions to close\n");
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = ceph_get_mds_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_snapid_map(mdsc);
	ceph_cleanup_global_and_empty_realms(mdsc);

	cancel_work_sync(&mdsc->cap_reclaim_work);
	cancel_work_sync(&mdsc->cap_unlink_work);
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	doutc(cl, "done\n");
}

void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int mds;

	doutc(mdsc->fsc->client, "force umount\n");

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (!session)
			continue;

		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
			__unregister_session(mdsc, session);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		mutex_lock(&mdsc->mutex);
		kick_requests(mdsc, mds);
	}
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "stop\n");
	/*
	 * Make sure the delayed work has stopped before releasing
	 * the resources.
	 *
	 * cancel_delayed_work_sync() only guarantees that the work
	 * finishes executing, but the delayed work will re-arm
	 * itself again after that, so flush it instead.
	 */
	flush_delayed_work(&mdsc->delayed_work);

	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);

	if (mdsc->s_cap_auths) {
		int i;

		for (i = 0; i < mdsc->s_cap_auths_num; i++) {
			kfree(mdsc->s_cap_auths[i].match.gids);
			kfree(mdsc->s_cap_auths[i].match.path);
			kfree(mdsc->s_cap_auths[i].match.fs_name);
		}
		kfree(mdsc->s_cap_auths);
	}

	ceph_pool_perm_destroy(mdsc);
}

void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;

	doutc(fsc->client, "%p\n", mdsc);

	if (!mdsc)
		return;

	/* flush out any connection work with references to us */
	ceph_msgr_flush();

	ceph_mdsc_stop(mdsc);

	ceph_metric_destroy(&mdsc->metric);

	fsc->mdsc = NULL;
	kfree(mdsc);
	doutc(fsc->client, "%p done\n", mdsc);
}

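/*
 * Handle an FSMAP_USER message: scan the map for the file system named
 * by the mds_namespace mount option and, if found, record its fscid
 * and subscribe to the corresponding mdsmap.
 */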
void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	struct ceph_client *cl = fsc->client;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	doutc(cl, "epoch %u\n", epoch);

	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);

	ceph_decode_32_safe(&p, end, num_fs, bad);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		p += 2;		/* info_v, info_cv */
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		err = -ENOENT;
		goto err_out;
	}
	return;

bad:
	pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	doutc(cl, "epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client));
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;  /* first mds map */
	}
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
					MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc, 0);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
	return;
}

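/*
 * Messenger reference counting glue: a connection's lifetime is tied
 * to its owning MDS session, so get/put on the connection simply take
 * and drop session references.
 */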
static struct ceph_connection *mds_get_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	if (ceph_get_mds_session(s))
		return con;
	return NULL;
}

static void mds_put_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	ceph_put_mds_session(s);
}

/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void mds_peer_reset(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;

	pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n",
		       s->s_mds);
	if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO &&
	    ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) >= CEPH_MDS_STATE_RECONNECT)
		send_mds_reconnect(mdsc, s);
}

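/*
 * Dispatch an incoming message to the appropriate handler, after
 * verifying that the session is still registered with the mdsc.
 */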
static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	int type = le16_to_cpu(msg->hdr.type);

	mutex_lock(&mdsc->mutex);
	if (__verify_registered_session(mdsc, s) < 0) {
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	mutex_unlock(&mdsc->mutex);

	switch (type) {
	case CEPH_MSG_MDS_MAP:
		ceph_mdsc_handle_mdsmap(mdsc, msg);
		break;
	case CEPH_MSG_FS_MAP_USER:
		ceph_mdsc_handle_fsmap(mdsc, msg);
		break;
	case CEPH_MSG_CLIENT_SESSION:
		handle_session(s, msg);
		break;
	case CEPH_MSG_CLIENT_REPLY:
		handle_reply(s, msg);
		break;
	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
		handle_forward(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_CAPS:
		ceph_handle_caps(s, msg);
		break;
	case CEPH_MSG_CLIENT_SNAP:
		ceph_handle_snap(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_LEASE:
		handle_lease(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_QUOTA:
		ceph_handle_quota(mdsc, s, msg);
		break;

	default:
		pr_err_client(cl, "received unknown message type %d %s\n",
			      type, ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *
mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
					 force_new, proto, NULL, NULL);
	if (ret)
		return ERR_PTR(ret);

	return auth;
}

static int mds_add_authorizer_challenge(struct ceph_connection *con,
				    void *challenge_buf, int challenge_buf_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
					    challenge_buf, challenge_buf_len);
}

static int mds_verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
		NULL, NULL, NULL, NULL);
}

static int mds_invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

static int mds_get_auth_request(struct ceph_connection *con,
				void *buf, int *buf_len,
				void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
				       buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_reply_more(struct ceph_connection *con,
				      void *reply, int reply_len,
				      void *buf, int *buf_len,
				      void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
					      buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_done(struct ceph_connection *con,
				u64 global_id, void *reply, int reply_len,
				u8 *session_key, int *session_key_len,
				u8 *con_secret, int *con_secret_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
					       session_key, session_key_len,
					       con_secret, con_secret_len);
}

static int mds_handle_auth_bad_method(struct ceph_connection *con,
				      int used_proto, int result,
				      const int *allowed_protos, int proto_cnt,
				      const int *allowed_modes, int mode_cnt)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
	int ret;

	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
					    used_proto, result,
					    allowed_protos, proto_cnt,
					    allowed_modes, mode_cnt)) {
		ret = ceph_monc_validate_auth(monc);
		if (ret)
			return ret;
	}

	return -EACCES;
}

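/*
 * Allocate a buffer for an incoming message.  If a partially read
 * message is already attached to the connection, keep using it;
 * otherwise allocate a new message sized for the front section.
 */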
static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
				struct ceph_msg_header *hdr, int *skip)
{
	struct ceph_msg *msg;
	int type = (int) le16_to_cpu(hdr->type);
	int front_len = (int) le32_to_cpu(hdr->front_len);

	if (con->in_msg)
		return con->in_msg;

	*skip = 0;
	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
	if (!msg) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return NULL;
	}

	return msg;
}

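/*
 * Message signing hooks: sign outgoing messages and verify incoming
 * ones using the session's auth handshake.
 */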
static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}

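/*
 * connection callbacks: wire the messenger up to the MDS session
 * handlers defined above.
 */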
static const struct ceph_connection_operations mds_con_ops = {
	.get = mds_get_con,
	.put = mds_put_con,
	.alloc_msg = mds_alloc_msg,
	.dispatch = mds_dispatch,
	.peer_reset = mds_peer_reset,
	.get_authorizer = mds_get_authorizer,
	.add_authorizer_challenge = mds_add_authorizer_challenge,
	.verify_authorizer_reply = mds_verify_authorizer_reply,
	.invalidate_authorizer = mds_invalidate_authorizer,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
	.get_auth_request = mds_get_auth_request,
	.handle_auth_reply_more = mds_handle_auth_reply_more,
	.handle_auth_done = mds_handle_auth_done,
	.handle_auth_bad_method = mds_handle_auth_bad_method,
};

/* eof */