1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include "dlm_internal.h"
13 #include "lockspace.h"
14 #include "member.h"
15 #include "dir.h"
16 #include "ast.h"
17 #include "recover.h"
18 #include "lowcomms.h"
19 #include "lock.h"
20 #include "requestqueue.h"
21 #include "recoverd.h"
22 
dlm_create_masters_list(struct dlm_ls * ls)23 static int dlm_create_masters_list(struct dlm_ls *ls)
24 {
25 	struct dlm_rsb *r;
26 	int error = 0;
27 
28 	write_lock_bh(&ls->ls_masters_lock);
29 	if (!list_empty(&ls->ls_masters_list)) {
30 		log_error(ls, "root list not empty");
31 		error = -EINVAL;
32 		goto out;
33 	}
34 
35 	read_lock_bh(&ls->ls_rsbtbl_lock);
36 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
37 		if (r->res_nodeid)
38 			continue;
39 
40 		list_add(&r->res_masters_list, &ls->ls_masters_list);
41 		dlm_hold_rsb(r);
42 	}
43 	read_unlock_bh(&ls->ls_rsbtbl_lock);
44  out:
45 	write_unlock_bh(&ls->ls_masters_lock);
46 	return error;
47 }
48 
dlm_release_masters_list(struct dlm_ls * ls)49 static void dlm_release_masters_list(struct dlm_ls *ls)
50 {
51 	struct dlm_rsb *r, *safe;
52 
53 	write_lock_bh(&ls->ls_masters_lock);
54 	list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) {
55 		list_del_init(&r->res_masters_list);
56 		dlm_put_rsb(r);
57 	}
58 	write_unlock_bh(&ls->ls_masters_lock);
59 }
60 
dlm_create_root_list(struct dlm_ls * ls,struct list_head * root_list)61 static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
62 {
63 	struct dlm_rsb *r;
64 
65 	read_lock_bh(&ls->ls_rsbtbl_lock);
66 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
67 		list_add(&r->res_root_list, root_list);
68 		dlm_hold_rsb(r);
69 	}
70 
71 	WARN_ON_ONCE(!list_empty(&ls->ls_slow_inactive));
72 	read_unlock_bh(&ls->ls_rsbtbl_lock);
73 }
74 
dlm_release_root_list(struct list_head * root_list)75 static void dlm_release_root_list(struct list_head *root_list)
76 {
77 	struct dlm_rsb *r, *safe;
78 
79 	list_for_each_entry_safe(r, safe, root_list, res_root_list) {
80 		list_del_init(&r->res_root_list);
81 		dlm_put_rsb(r);
82 	}
83 }
84 
85 /* If the start for which we're re-enabling locking (seq) has been superseded
86    by a newer stop (ls_recover_seq), we need to leave locking disabled.
87 
88    We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
89    locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
90    enables locking and clears the requestqueue between a and b. */
91 
enable_locking(struct dlm_ls * ls,uint64_t seq)92 static int enable_locking(struct dlm_ls *ls, uint64_t seq)
93 {
94 	int error = -EINTR;
95 
96 	write_lock_bh(&ls->ls_recv_active);
97 
98 	spin_lock_bh(&ls->ls_recover_lock);
99 	if (ls->ls_recover_seq == seq) {
100 		set_bit(LSFL_RUNNING, &ls->ls_flags);
101 		/* Schedule next timer if recovery put something on inactive.
102 		 *
103 		 * The rsbs that was queued while recovery on toss hasn't
104 		 * started yet because LSFL_RUNNING was set everything
105 		 * else recovery hasn't started as well because ls_in_recovery
106 		 * is still hold. So we should not run into the case that
107 		 * resume_scan_timer() queues a timer that can occur in
108 		 * a no op.
109 		 */
110 		resume_scan_timer(ls);
111 		/* unblocks processes waiting to enter the dlm */
112 		up_write(&ls->ls_in_recovery);
113 		clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
114 		error = 0;
115 	}
116 	spin_unlock_bh(&ls->ls_recover_lock);
117 
118 	write_unlock_bh(&ls->ls_recv_active);
119 	return error;
120 }
121 
ls_recover(struct dlm_ls * ls,struct dlm_recover * rv)122 static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
123 {
124 	LIST_HEAD(root_list);
125 	unsigned long start;
126 	int error, neg = 0;
127 
128 	log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq);
129 
130 	mutex_lock(&ls->ls_recoverd_active);
131 
132 	dlm_callback_suspend(ls);
133 
134 	dlm_clear_inactive(ls);
135 
136 	/*
137 	 * This list of root rsb's will be the basis of most of the recovery
138 	 * routines.
139 	 */
140 
141 	dlm_create_root_list(ls, &root_list);
142 
143 	/*
144 	 * Add or remove nodes from the lockspace's ls_nodes list.
145 	 *
146 	 * Due to the fact that we must report all membership changes to lsops
147 	 * or midcomms layer, it is not permitted to abort ls_recover() until
148 	 * this is done.
149 	 */
150 
151 	error = dlm_recover_members(ls, rv, &neg);
152 	if (error) {
153 		log_rinfo(ls, "dlm_recover_members error %d", error);
154 		goto fail;
155 	}
156 
157 	dlm_recover_dir_nodeid(ls, &root_list);
158 
159 	/* Create a snapshot of all active rsbs were we are the master of.
160 	 * During the barrier between dlm_recover_members_wait() and
161 	 * dlm_recover_directory() other nodes can dump their necessary
162 	 * directory dlm_rsb (r->res_dir_nodeid == nodeid) in rcom
163 	 * communication dlm_copy_master_names() handling.
164 	 *
165 	 * TODO We should create a per lockspace list that contains rsbs
166 	 * that we are the master of. Instead of creating this list while
167 	 * recovery we keep track of those rsbs while locking handling and
168 	 * recovery can use it when necessary.
169 	 */
170 	error = dlm_create_masters_list(ls);
171 	if (error) {
172 		log_rinfo(ls, "dlm_create_masters_list error %d", error);
173 		goto fail_root_list;
174 	}
175 
176 	ls->ls_recover_locks_in = 0;
177 
178 	dlm_set_recover_status(ls, DLM_RS_NODES);
179 
180 	error = dlm_recover_members_wait(ls, rv->seq);
181 	if (error) {
182 		log_rinfo(ls, "dlm_recover_members_wait error %d", error);
183 		dlm_release_masters_list(ls);
184 		goto fail_root_list;
185 	}
186 
187 	start = jiffies;
188 
189 	/*
190 	 * Rebuild our own share of the directory by collecting from all other
191 	 * nodes their master rsb names that hash to us.
192 	 */
193 
194 	error = dlm_recover_directory(ls, rv->seq);
195 	if (error) {
196 		log_rinfo(ls, "dlm_recover_directory error %d", error);
197 		dlm_release_masters_list(ls);
198 		goto fail_root_list;
199 	}
200 
201 	dlm_set_recover_status(ls, DLM_RS_DIR);
202 
203 	error = dlm_recover_directory_wait(ls, rv->seq);
204 	if (error) {
205 		log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
206 		dlm_release_masters_list(ls);
207 		goto fail_root_list;
208 	}
209 
210 	dlm_release_masters_list(ls);
211 
212 	/*
213 	 * We may have outstanding operations that are waiting for a reply from
214 	 * a failed node.  Mark these to be resent after recovery.  Unlock and
215 	 * cancel ops can just be completed.
216 	 */
217 
218 	dlm_recover_waiters_pre(ls);
219 
220 	if (dlm_recovery_stopped(ls)) {
221 		error = -EINTR;
222 		goto fail_root_list;
223 	}
224 
225 	if (neg || dlm_no_directory(ls)) {
226 		/*
227 		 * Clear lkb's for departed nodes.
228 		 */
229 
230 		dlm_recover_purge(ls, &root_list);
231 
232 		/*
233 		 * Get new master nodeid's for rsb's that were mastered on
234 		 * departed nodes.
235 		 */
236 
237 		error = dlm_recover_masters(ls, rv->seq, &root_list);
238 		if (error) {
239 			log_rinfo(ls, "dlm_recover_masters error %d", error);
240 			goto fail_root_list;
241 		}
242 
243 		/*
244 		 * Send our locks on remastered rsb's to the new masters.
245 		 */
246 
247 		error = dlm_recover_locks(ls, rv->seq, &root_list);
248 		if (error) {
249 			log_rinfo(ls, "dlm_recover_locks error %d", error);
250 			goto fail_root_list;
251 		}
252 
253 		dlm_set_recover_status(ls, DLM_RS_LOCKS);
254 
255 		error = dlm_recover_locks_wait(ls, rv->seq);
256 		if (error) {
257 			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
258 			goto fail_root_list;
259 		}
260 
261 		log_rinfo(ls, "dlm_recover_locks %u in",
262 			  ls->ls_recover_locks_in);
263 
264 		/*
265 		 * Finalize state in master rsb's now that all locks can be
266 		 * checked.  This includes conversion resolution and lvb
267 		 * settings.
268 		 */
269 
270 		dlm_recover_rsbs(ls, &root_list);
271 	} else {
272 		/*
273 		 * Other lockspace members may be going through the "neg" steps
274 		 * while also adding us to the lockspace, in which case they'll
275 		 * be doing the recover_locks (RS_LOCKS) barrier.
276 		 */
277 		dlm_set_recover_status(ls, DLM_RS_LOCKS);
278 
279 		error = dlm_recover_locks_wait(ls, rv->seq);
280 		if (error) {
281 			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
282 			goto fail_root_list;
283 		}
284 	}
285 
286 	dlm_release_root_list(&root_list);
287 
288 	/*
289 	 * Purge directory-related requests that are saved in requestqueue.
290 	 * All dir requests from before recovery are invalid now due to the dir
291 	 * rebuild and will be resent by the requesting nodes.
292 	 */
293 
294 	dlm_purge_requestqueue(ls);
295 
296 	dlm_set_recover_status(ls, DLM_RS_DONE);
297 
298 	error = dlm_recover_done_wait(ls, rv->seq);
299 	if (error) {
300 		log_rinfo(ls, "dlm_recover_done_wait error %d", error);
301 		goto fail;
302 	}
303 
304 	dlm_clear_members_gone(ls);
305 
306 	dlm_callback_resume(ls);
307 
308 	error = enable_locking(ls, rv->seq);
309 	if (error) {
310 		log_rinfo(ls, "enable_locking error %d", error);
311 		goto fail;
312 	}
313 
314 	error = dlm_process_requestqueue(ls);
315 	if (error) {
316 		log_rinfo(ls, "dlm_process_requestqueue error %d", error);
317 		goto fail;
318 	}
319 
320 	error = dlm_recover_waiters_post(ls);
321 	if (error) {
322 		log_rinfo(ls, "dlm_recover_waiters_post error %d", error);
323 		goto fail;
324 	}
325 
326 	dlm_recover_grant(ls);
327 
328 	log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms",
329 		  (unsigned long long)rv->seq, ls->ls_generation,
330 		  jiffies_to_msecs(jiffies - start));
331 	mutex_unlock(&ls->ls_recoverd_active);
332 
333 	return 0;
334 
335  fail_root_list:
336 	dlm_release_root_list(&root_list);
337  fail:
338 	mutex_unlock(&ls->ls_recoverd_active);
339 
340 	return error;
341 }
342 
343 /* The dlm_ls_start() that created the rv we take here may already have been
344    stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
345    flag set. */
346 
do_ls_recovery(struct dlm_ls * ls)347 static void do_ls_recovery(struct dlm_ls *ls)
348 {
349 	struct dlm_recover *rv = NULL;
350 	int error;
351 
352 	spin_lock_bh(&ls->ls_recover_lock);
353 	rv = ls->ls_recover_args;
354 	ls->ls_recover_args = NULL;
355 	if (rv && ls->ls_recover_seq == rv->seq)
356 		clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
357 	spin_unlock_bh(&ls->ls_recover_lock);
358 
359 	if (rv) {
360 		error = ls_recover(ls, rv);
361 		switch (error) {
362 		case 0:
363 			ls->ls_recovery_result = 0;
364 			complete(&ls->ls_recovery_done);
365 
366 			dlm_lsop_recover_done(ls);
367 			break;
368 		case -EINTR:
369 			/* if recovery was interrupted -EINTR we wait for the next
370 			 * ls_recover() iteration until it hopefully succeeds.
371 			 */
372 			log_rinfo(ls, "%s %llu interrupted and should be queued to run again",
373 				  __func__, (unsigned long long)rv->seq);
374 			break;
375 		default:
376 			log_rinfo(ls, "%s %llu error %d", __func__,
377 				  (unsigned long long)rv->seq, error);
378 
379 			/* let new_lockspace() get aware of critical error */
380 			ls->ls_recovery_result = error;
381 			complete(&ls->ls_recovery_done);
382 			break;
383 		}
384 
385 		kfree(rv->nodes);
386 		kfree(rv);
387 	}
388 }
389 
dlm_recoverd(void * arg)390 static int dlm_recoverd(void *arg)
391 {
392 	struct dlm_ls *ls;
393 
394 	ls = dlm_find_lockspace_local(arg);
395 	if (!ls) {
396 		log_print("dlm_recoverd: no lockspace %p", arg);
397 		return -1;
398 	}
399 
400 	down_write(&ls->ls_in_recovery);
401 	set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
402 	wake_up(&ls->ls_recover_lock_wait);
403 
404 	while (1) {
405 		/*
406 		 * We call kthread_should_stop() after set_current_state().
407 		 * This is because it works correctly if kthread_stop() is
408 		 * called just before set_current_state().
409 		 */
410 		set_current_state(TASK_INTERRUPTIBLE);
411 		if (kthread_should_stop()) {
412 			set_current_state(TASK_RUNNING);
413 			break;
414 		}
415 		if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
416 		    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
417 			if (kthread_should_stop())
418 				break;
419 			schedule();
420 		}
421 		set_current_state(TASK_RUNNING);
422 
423 		if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
424 			down_write(&ls->ls_in_recovery);
425 			set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
426 			wake_up(&ls->ls_recover_lock_wait);
427 		}
428 
429 		if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
430 			do_ls_recovery(ls);
431 	}
432 
433 	if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
434 		up_write(&ls->ls_in_recovery);
435 
436 	dlm_put_lockspace(ls);
437 	return 0;
438 }
439 
dlm_recoverd_start(struct dlm_ls * ls)440 int dlm_recoverd_start(struct dlm_ls *ls)
441 {
442 	struct task_struct *p;
443 	int error = 0;
444 
445 	p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
446 	if (IS_ERR(p))
447 		error = PTR_ERR(p);
448 	else
449                 ls->ls_recoverd_task = p;
450 	return error;
451 }
452 
dlm_recoverd_stop(struct dlm_ls * ls)453 void dlm_recoverd_stop(struct dlm_ls *ls)
454 {
455 	kthread_stop(ls->ls_recoverd_task);
456 }
457 
dlm_recoverd_suspend(struct dlm_ls * ls)458 void dlm_recoverd_suspend(struct dlm_ls *ls)
459 {
460 	wake_up(&ls->ls_wait_general);
461 	mutex_lock(&ls->ls_recoverd_active);
462 }
463 
dlm_recoverd_resume(struct dlm_ls * ls)464 void dlm_recoverd_resume(struct dlm_ls *ls)
465 {
466 	mutex_unlock(&ls->ls_recoverd_active);
467 }
468 
469