1  // SPDX-License-Identifier: GPL-2.0-only
2  /*
3   * Copyright 2023 Red Hat
4   */
5  
6  /*
7   * This file contains the main entry points for normal operations on a vdo as well as functions for
8   * constructing and destroying vdo instances (in memory).
9   */
10  
11  /**
12   * DOC:
13   *
14   * A read_only_notifier has a single completion which is used to perform read-only notifications;
15   * however, vdo_enter_read_only_mode() may be called from any thread. A pair of fields, protected
16   * by a spinlock, are used to control the read-only mode entry process. The first field holds the
17   * read-only error. The second is the state field, which may hold any of the four special values
18   * enumerated here.
19   *
20   * When vdo_enter_read_only_mode() is called from some vdo thread, if the read_only_error field
21   * already contains an error (i.e. its value is not VDO_SUCCESS), then some other error has already
22   * initiated the read-only process, and nothing more is done. Otherwise, the new error is stored in
23   * the read_only_error field, and the state field is consulted. If the state is MAY_NOTIFY, it is
24   * set to NOTIFYING, and the notification process begins. If the state is MAY_NOT_NOTIFY, then
25   * notifications are currently disallowed, generally due to the vdo being suspended. In this case,
26   * nothing more will be done until the vdo is resumed, at which point the notification will be
27   * performed. In any other case, the vdo is already read-only, and there is nothing more to do.
28   */
29  
30  #include "vdo.h"
31  
32  #include <linux/completion.h>
33  #include <linux/device-mapper.h>
34  #include <linux/kernel.h>
35  #include <linux/lz4.h>
36  #include <linux/module.h>
37  #include <linux/mutex.h>
38  #include <linux/spinlock.h>
39  #include <linux/types.h>
40  
41  #include "logger.h"
42  #include "memory-alloc.h"
43  #include "permassert.h"
44  #include "string-utils.h"
45  
46  #include "block-map.h"
47  #include "completion.h"
48  #include "data-vio.h"
49  #include "dedupe.h"
50  #include "encodings.h"
51  #include "funnel-workqueue.h"
52  #include "io-submitter.h"
53  #include "logical-zone.h"
54  #include "packer.h"
55  #include "physical-zone.h"
56  #include "recovery-journal.h"
57  #include "slab-depot.h"
58  #include "statistics.h"
59  #include "status-codes.h"
60  #include "vio.h"
61  
62  #define PARANOID_THREAD_CONSISTENCY_CHECKS 0
63  
64  struct sync_completion {
65  	struct vdo_completion vdo_completion;
66  	struct completion completion;
67  };
68  
69  /* A linked list is adequate for the small number of entries we expect. */
70  struct device_registry {
71  	struct list_head links;
72  	/* TODO: Convert to rcu per kernel recommendation. */
73  	rwlock_t lock;
74  };
75  
76  static struct device_registry registry;
77  
78  /**
79   * vdo_initialize_device_registry_once() - Initialize the necessary structures for the device
80   *                                         registry.
81   */
82  void vdo_initialize_device_registry_once(void)
83  {
84  	INIT_LIST_HEAD(&registry.links);
85  	rwlock_init(&registry.lock);
86  }
87  
88  /** vdo_is_equal() - Implements vdo_filter_fn. */
89  static bool vdo_is_equal(struct vdo *vdo, const void *context)
90  {
91  	return (vdo == context);
92  }
93  
94  /**
95   * filter_vdos_locked() - Find a vdo in the registry if it exists there.
96   * @filter: The filter function to apply to devices.
97   * @context: A bit of context to provide the filter.
98   *
99   * Context: Must be called holding the lock.
100   *
101   * Return: the vdo object found, if any.
102   */
103  static struct vdo * __must_check filter_vdos_locked(vdo_filter_fn filter,
104  						    const void *context)
105  {
106  	struct vdo *vdo;
107  
108  	list_for_each_entry(vdo, &registry.links, registration) {
109  		if (filter(vdo, context))
110  			return vdo;
111  	}
112  
113  	return NULL;
114  }
115  
116  /**
117   * vdo_find_matching() - Find and return the first (if any) vdo matching a given filter function.
118   * @filter: The filter function to apply to vdos.
119   * @context: A bit of context to provide the filter.
120   */
121  struct vdo *vdo_find_matching(vdo_filter_fn filter, const void *context)
122  {
123  	struct vdo *vdo;
124  
125  	read_lock(&registry.lock);
126  	vdo = filter_vdos_locked(filter, context);
127  	read_unlock(&registry.lock);
128  
129  	return vdo;
130  }
131  
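/*
 * Start hook for the default (request) queue type: register the new thread for instrumented
 * memory allocation, gated by the vdo's allocations_allowed flag.
 */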
132  static void start_vdo_request_queue(void *ptr)
133  {
134  	struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());
135  
136  	vdo_register_allocating_thread(&thread->allocating_thread,
137  				       &thread->vdo->allocations_allowed);
138  }
139  
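/* Finish hook for the default (request) queue type: unregister the allocating thread. */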
140  static void finish_vdo_request_queue(void *ptr)
141  {
142  	vdo_unregister_allocating_thread();
143  }
144  
145  #ifdef MODULE
146  #define MODULE_NAME THIS_MODULE->name
147  #else
148  #define MODULE_NAME "dm-vdo"
149  #endif  /* MODULE */
150  
151  static const struct vdo_work_queue_type default_queue_type = {
152  	.start = start_vdo_request_queue,
153  	.finish = finish_vdo_request_queue,
154  	.max_priority = VDO_DEFAULT_Q_MAX_PRIORITY,
155  	.default_priority = VDO_DEFAULT_Q_COMPLETION_PRIORITY,
156  };
157  
158  static const struct vdo_work_queue_type bio_ack_q_type = {
159  	.start = NULL,
160  	.finish = NULL,
161  	.max_priority = BIO_ACK_Q_MAX_PRIORITY,
162  	.default_priority = BIO_ACK_Q_ACK_PRIORITY,
163  };
164  
165  static const struct vdo_work_queue_type cpu_q_type = {
166  	.start = NULL,
167  	.finish = NULL,
168  	.max_priority = CPU_Q_MAX_PRIORITY,
169  	.default_priority = CPU_Q_MAX_PRIORITY,
170  };
171  
172  static void uninitialize_thread_config(struct thread_config *config)
173  {
174  	vdo_free(vdo_forget(config->logical_threads));
175  	vdo_free(vdo_forget(config->physical_threads));
176  	vdo_free(vdo_forget(config->hash_zone_threads));
177  	vdo_free(vdo_forget(config->bio_threads));
178  	memset(config, 0, sizeof(struct thread_config));
179  }
180  
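/* Assign a consecutive run of new thread ids, one per zone, to the given array. */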
181  static void assign_thread_ids(struct thread_config *config,
182  			      thread_id_t thread_ids[], zone_count_t count)
183  {
184  	zone_count_t zone;
185  
186  	for (zone = 0; zone < count; zone++)
187  		thread_ids[zone] = config->thread_count++;
188  }
189  
190  /**
191   * initialize_thread_config() - Initialize the thread mapping
192   *
193   * If the logical, physical, and hash zone counts are all 0, a single thread will be shared by all
194   * three plus the packer and recovery journal. Otherwise, there must be at least one of each type,
195   * and each will have its own thread, as will the packer and recovery journal.
196   *
197   * Return: VDO_SUCCESS or an error.
198   */
199  static int __must_check initialize_thread_config(struct thread_count_config counts,
200  						 struct thread_config *config)
201  {
202  	int result;
203  	bool single = ((counts.logical_zones + counts.physical_zones + counts.hash_zones) == 0);
204  
205  	config->bio_thread_count = counts.bio_threads;
206  	if (single) {
207  		config->logical_zone_count = 1;
208  		config->physical_zone_count = 1;
209  		config->hash_zone_count = 1;
210  	} else {
211  		config->logical_zone_count = counts.logical_zones;
212  		config->physical_zone_count = counts.physical_zones;
213  		config->hash_zone_count = counts.hash_zones;
214  	}
215  
216  	result = vdo_allocate(config->logical_zone_count, thread_id_t,
217  			      "logical thread array", &config->logical_threads);
218  	if (result != VDO_SUCCESS) {
219  		uninitialize_thread_config(config);
220  		return result;
221  	}
222  
223  	result = vdo_allocate(config->physical_zone_count, thread_id_t,
224  			      "physical thread array", &config->physical_threads);
225  	if (result != VDO_SUCCESS) {
226  		uninitialize_thread_config(config);
227  		return result;
228  	}
229  
230  	result = vdo_allocate(config->hash_zone_count, thread_id_t,
231  			      "hash thread array", &config->hash_zone_threads);
232  	if (result != VDO_SUCCESS) {
233  		uninitialize_thread_config(config);
234  		return result;
235  	}
236  
237  	result = vdo_allocate(config->bio_thread_count, thread_id_t,
238  			      "bio thread array", &config->bio_threads);
239  	if (result != VDO_SUCCESS) {
240  		uninitialize_thread_config(config);
241  		return result;
242  	}
243  
244  	if (single) {
245  		config->logical_threads[0] = config->thread_count;
246  		config->physical_threads[0] = config->thread_count;
247  		config->hash_zone_threads[0] = config->thread_count++;
248  	} else {
249  		config->admin_thread = config->thread_count;
250  		config->journal_thread = config->thread_count++;
251  		config->packer_thread = config->thread_count++;
252  		assign_thread_ids(config, config->logical_threads, counts.logical_zones);
253  		assign_thread_ids(config, config->physical_threads, counts.physical_zones);
254  		assign_thread_ids(config, config->hash_zone_threads, counts.hash_zones);
255  	}
256  
257  	config->dedupe_thread = config->thread_count++;
258  	config->bio_ack_thread =
259  		((counts.bio_ack_threads > 0) ? config->thread_count++ : VDO_INVALID_THREAD_ID);
260  	config->cpu_thread = config->thread_count++;
261  	assign_thread_ids(config, config->bio_threads, counts.bio_threads);
262  	return VDO_SUCCESS;
263  }
264  
265  /**
266   * read_geometry_block() - Synchronously read the geometry block from a vdo's underlying block
267   *                         device.
268   * @vdo: The vdo whose geometry is to be read.
269   *
270   * Return: VDO_SUCCESS or an error code.
271   */
272  static int __must_check read_geometry_block(struct vdo *vdo)
273  {
274  	struct vio *vio;
275  	char *block;
276  	int result;
277  
278  	result = vdo_allocate(VDO_BLOCK_SIZE, u8, __func__, &block);
279  	if (result != VDO_SUCCESS)
280  		return result;
281  
282  	result = create_metadata_vio(vdo, VIO_TYPE_GEOMETRY, VIO_PRIORITY_HIGH, NULL,
283  				     block, &vio);
284  	if (result != VDO_SUCCESS) {
285  		vdo_free(block);
286  		return result;
287  	}
288  
289  	/*
290  	 * This is only safe because, having not already loaded the geometry, the vdo's geometry's
291  	 * bio_offset field is 0, so the fact that vio_reset_bio() will subtract that offset from
292  	 * the supplied pbn is not a problem.
293  	 */
294  	result = vio_reset_bio(vio, block, NULL, REQ_OP_READ,
295  			       VDO_GEOMETRY_BLOCK_LOCATION);
296  	if (result != VDO_SUCCESS) {
297  		free_vio(vdo_forget(vio));
298  		vdo_free(block);
299  		return result;
300  	}
301  
302  	bio_set_dev(vio->bio, vdo_get_backing_device(vdo));
303  	submit_bio_wait(vio->bio);
304  	result = blk_status_to_errno(vio->bio->bi_status);
305  	free_vio(vdo_forget(vio));
306  	if (result != 0) {
307  		vdo_log_error_strerror(result, "synchronous read failed");
308  		vdo_free(block);
309  		return -EIO;
310  	}
311  
312  	result = vdo_parse_geometry_block((u8 *) block, &vdo->geometry);
313  	vdo_free(block);
314  	return result;
315  }
316  
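/*
 * If the given thread id falls within the range covered by a zone thread array, format
 * "<prefix><index>" into the buffer and return true; otherwise return false.
 */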
317  static bool get_zone_thread_name(const thread_id_t thread_ids[], zone_count_t count,
318  				 thread_id_t id, const char *prefix,
319  				 char *buffer, size_t buffer_length)
320  {
321  	if (id >= thread_ids[0]) {
322  		thread_id_t index = id - thread_ids[0];
323  
324  		if (index < count) {
325  			snprintf(buffer, buffer_length, "%s%d", prefix, index);
326  			return true;
327  		}
328  	}
329  
330  	return false;
331  }
332  
333  /**
334   * get_thread_name() - Format the name of the worker thread desired to support a given work queue.
335   * @thread_config: The thread configuration.
336   * @thread_id: The thread id.
337   * @buffer: Where to put the formatted name.
338   * @buffer_length: Size of the output buffer.
339   *
340   * The physical layer may add a prefix identifying the product; the output from this function
341   * should just identify the thread.
342   */
343  static void get_thread_name(const struct thread_config *thread_config,
344  			    thread_id_t thread_id, char *buffer, size_t buffer_length)
345  {
346  	if (thread_id == thread_config->journal_thread) {
347  		if (thread_config->packer_thread == thread_id) {
348  			/*
349  			 * This is the "single thread" config where one thread is used for the
350  			 * journal, packer, logical, physical, and hash zones. In that case, it is
351  			 * known as the "request queue."
352  			 */
353  			snprintf(buffer, buffer_length, "reqQ");
354  			return;
355  		}
356  
357  		snprintf(buffer, buffer_length, "journalQ");
358  		return;
359  	} else if (thread_id == thread_config->admin_thread) {
360  		/* Theoretically this could be different from the journal thread. */
361  		snprintf(buffer, buffer_length, "adminQ");
362  		return;
363  	} else if (thread_id == thread_config->packer_thread) {
364  		snprintf(buffer, buffer_length, "packerQ");
365  		return;
366  	} else if (thread_id == thread_config->dedupe_thread) {
367  		snprintf(buffer, buffer_length, "dedupeQ");
368  		return;
369  	} else if (thread_id == thread_config->bio_ack_thread) {
370  		snprintf(buffer, buffer_length, "ackQ");
371  		return;
372  	} else if (thread_id == thread_config->cpu_thread) {
373  		snprintf(buffer, buffer_length, "cpuQ");
374  		return;
375  	}
376  
377  	if (get_zone_thread_name(thread_config->logical_threads,
378  				 thread_config->logical_zone_count,
379  				 thread_id, "logQ", buffer, buffer_length))
380  		return;
381  
382  	if (get_zone_thread_name(thread_config->physical_threads,
383  				 thread_config->physical_zone_count,
384  				 thread_id, "physQ", buffer, buffer_length))
385  		return;
386  
387  	if (get_zone_thread_name(thread_config->hash_zone_threads,
388  				 thread_config->hash_zone_count,
389  				 thread_id, "hashQ", buffer, buffer_length))
390  		return;
391  
392  	if (get_zone_thread_name(thread_config->bio_threads,
393  				 thread_config->bio_thread_count,
394  				 thread_id, "bioQ", buffer, buffer_length))
395  		return;
396  
397  	/* Some sort of misconfiguration? */
398  	snprintf(buffer, buffer_length, "reqQ%d", thread_id);
399  }
400  
401  /**
402   * vdo_make_thread() - Construct a single vdo work_queue and its associated thread (or threads for
403   *                     round-robin queues).
404   * @vdo: The vdo which owns the thread.
405   * @thread_id: The id of the thread to create (as determined by the thread_config).
406   * @type: The description of the work queue for this thread.
407   * @queue_count: The number of actual threads/queues contained in the "thread".
408   * @contexts: An array of queue_count contexts, one for each individual queue; may be NULL.
409   *
410   * Each "thread" constructed by this method is represented by a unique thread id in the thread
411   * config, and completions can be enqueued to the queue and run on the threads comprising this
412   * entity.
413   *
414   * Return: VDO_SUCCESS or an error.
415   */
416  int vdo_make_thread(struct vdo *vdo, thread_id_t thread_id,
417  		    const struct vdo_work_queue_type *type,
418  		    unsigned int queue_count, void *contexts[])
419  {
420  	struct vdo_thread *thread = &vdo->threads[thread_id];
421  	char queue_name[MAX_VDO_WORK_QUEUE_NAME_LEN];
422  
423  	if (type == NULL)
424  		type = &default_queue_type;
425  
426  	if (thread->queue != NULL) {
427  		return VDO_ASSERT(vdo_work_queue_type_is(thread->queue, type),
428  				  "already constructed vdo thread %u is of the correct type",
429  				  thread_id);
430  	}
431  
432  	thread->vdo = vdo;
433  	thread->thread_id = thread_id;
434  	get_thread_name(&vdo->thread_config, thread_id, queue_name, sizeof(queue_name));
435  	return vdo_make_work_queue(vdo->thread_name_prefix, queue_name, thread,
436  				   type, queue_count, contexts, &thread->queue);
437  }
438  
439  /**
440   * register_vdo() - Register a VDO; it must not already be registered.
441   * @vdo: The vdo to register.
442   *
443   * Return: VDO_SUCCESS or an error.
444   */
445  static int register_vdo(struct vdo *vdo)
446  {
447  	int result;
448  
449  	write_lock(&registry.lock);
450  	result = VDO_ASSERT(filter_vdos_locked(vdo_is_equal, vdo) == NULL,
451  			    "VDO not already registered");
452  	if (result == VDO_SUCCESS) {
453  		INIT_LIST_HEAD(&vdo->registration);
454  		list_add_tail(&vdo->registration, &registry.links);
455  	}
456  	write_unlock(&registry.lock);
457  
458  	return result;
459  }
460  
461  /**
462   * initialize_vdo() - Do the portion of initializing a vdo which will clean up after itself on
463   *                    error.
464   * @vdo: The vdo being initialized
465   * @config: The configuration of the vdo
466   * @instance: The instance number of the vdo
467   * @reason: The buffer to hold the failure reason on error
468   */
469  static int initialize_vdo(struct vdo *vdo, struct device_config *config,
470  			  unsigned int instance, char **reason)
471  {
472  	int result;
473  	zone_count_t i;
474  
475  	vdo->device_config = config;
476  	vdo->starting_sector_offset = config->owning_target->begin;
477  	vdo->instance = instance;
478  	vdo->allocations_allowed = true;
479  	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_NEW);
480  	INIT_LIST_HEAD(&vdo->device_config_list);
481  	vdo_initialize_completion(&vdo->admin.completion, vdo, VDO_ADMIN_COMPLETION);
482  	init_completion(&vdo->admin.callback_sync);
483  	mutex_init(&vdo->stats_mutex);
484  	result = read_geometry_block(vdo);
485  	if (result != VDO_SUCCESS) {
486  		*reason = "Could not load geometry block";
487  		return result;
488  	}
489  
490  	result = initialize_thread_config(config->thread_counts, &vdo->thread_config);
491  	if (result != VDO_SUCCESS) {
492  		*reason = "Cannot create thread configuration";
493  		return result;
494  	}
495  
496  	vdo_log_info("zones: %d logical, %d physical, %d hash; total threads: %d",
497  		     config->thread_counts.logical_zones,
498  		     config->thread_counts.physical_zones,
499  		     config->thread_counts.hash_zones, vdo->thread_config.thread_count);
500  
501  	/* Compression context storage */
502  	result = vdo_allocate(config->thread_counts.cpu_threads, char *, "LZ4 context",
503  			      &vdo->compression_context);
504  	if (result != VDO_SUCCESS) {
505  		*reason = "cannot allocate LZ4 context";
506  		return result;
507  	}
508  
509  	for (i = 0; i < config->thread_counts.cpu_threads; i++) {
510  		result = vdo_allocate(LZ4_MEM_COMPRESS, char, "LZ4 context",
511  				      &vdo->compression_context[i]);
512  		if (result != VDO_SUCCESS) {
513  			*reason = "cannot allocate LZ4 context";
514  			return result;
515  		}
516  	}
517  
518  	result = register_vdo(vdo);
519  	if (result != VDO_SUCCESS) {
520  		*reason = "Cannot add VDO to device registry";
521  		return result;
522  	}
523  
524  	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_INITIALIZED);
525  	return result;
526  }
527  
528  /**
529   * vdo_make() - Allocate and initialize a vdo.
530   * @instance: Device instantiation counter.
531   * @config: The device configuration.
532   * @reason: The reason for any failure during this call.
533   * @vdo_ptr: A pointer to hold the created vdo.
534   *
535   * Return: VDO_SUCCESS or an error.
536   */
537  int vdo_make(unsigned int instance, struct device_config *config, char **reason,
538  	     struct vdo **vdo_ptr)
539  {
540  	int result;
541  	struct vdo *vdo;
542  
543  	/* Initialize with a generic failure reason to prevent returning garbage. */
544  	*reason = "Unspecified error";
545  
546  	result = vdo_allocate(1, struct vdo, __func__, &vdo);
547  	if (result != VDO_SUCCESS) {
548  		*reason = "Cannot allocate VDO";
549  		return result;
550  	}
551  
552  	result = initialize_vdo(vdo, config, instance, reason);
553  	if (result != VDO_SUCCESS) {
554  		vdo_destroy(vdo);
555  		return result;
556  	}
557  
558  	/* From here on, the caller will clean up if there is an error. */
559  	*vdo_ptr = vdo;
560  
561  	snprintf(vdo->thread_name_prefix, sizeof(vdo->thread_name_prefix),
562  		 "%s%u", MODULE_NAME, instance);
563  	BUG_ON(vdo->thread_name_prefix[0] == '\0');
564  	result = vdo_allocate(vdo->thread_config.thread_count,
565  			      struct vdo_thread, __func__, &vdo->threads);
566  	if (result != VDO_SUCCESS) {
567  		*reason = "Cannot allocate thread structures";
568  		return result;
569  	}
570  
571  	result = vdo_make_thread(vdo, vdo->thread_config.admin_thread,
572  				 &default_queue_type, 1, NULL);
573  	if (result != VDO_SUCCESS) {
574  		*reason = "Cannot make admin thread";
575  		return result;
576  	}
577  
578  	result = vdo_make_flusher(vdo);
579  	if (result != VDO_SUCCESS) {
580  		*reason = "Cannot make flusher zones";
581  		return result;
582  	}
583  
584  	result = vdo_make_packer(vdo, DEFAULT_PACKER_BINS, &vdo->packer);
585  	if (result != VDO_SUCCESS) {
586  		*reason = "Cannot make packer zones";
587  		return result;
588  	}
589  
590  	BUG_ON(vdo->device_config->logical_block_size <= 0);
591  	BUG_ON(vdo->device_config->owned_device == NULL);
592  	result = make_data_vio_pool(vdo, MAXIMUM_VDO_USER_VIOS,
593  				    MAXIMUM_VDO_USER_VIOS * 3 / 4,
594  				    &vdo->data_vio_pool);
595  	if (result != VDO_SUCCESS) {
596  		*reason = "Cannot allocate data_vio pool";
597  		return result;
598  	}
599  
600  	result = vdo_make_io_submitter(config->thread_counts.bio_threads,
601  				       config->thread_counts.bio_rotation_interval,
602  				       get_data_vio_pool_request_limit(vdo->data_vio_pool),
603  				       vdo, &vdo->io_submitter);
604  	if (result != VDO_SUCCESS) {
605  		*reason = "bio submission initialization failed";
606  		return result;
607  	}
608  
609  	if (vdo_uses_bio_ack_queue(vdo)) {
610  		result = vdo_make_thread(vdo, vdo->thread_config.bio_ack_thread,
611  					 &bio_ack_q_type,
612  					 config->thread_counts.bio_ack_threads, NULL);
613  		if (result != VDO_SUCCESS) {
614  			*reason = "bio ack queue initialization failed";
615  			return result;
616  		}
617  	}
618  
619  	result = vdo_make_thread(vdo, vdo->thread_config.cpu_thread, &cpu_q_type,
620  				 config->thread_counts.cpu_threads,
621  				 (void **) vdo->compression_context);
622  	if (result != VDO_SUCCESS) {
623  		*reason = "CPU queue initialization failed";
624  		return result;
625  	}
626  
627  	return VDO_SUCCESS;
628  }
629  
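/* Shut down the vdo's io_submitter, dedupe index, and work queues prior to freeing them. */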
630  static void finish_vdo(struct vdo *vdo)
631  {
632  	int i;
633  
634  	if (vdo->threads == NULL)
635  		return;
636  
637  	vdo_cleanup_io_submitter(vdo->io_submitter);
638  	vdo_finish_dedupe_index(vdo->hash_zones);
639  
640  	for (i = 0; i < vdo->thread_config.thread_count; i++)
641  		vdo_finish_work_queue(vdo->threads[i].queue);
642  }
643  
644  /**
645   * free_listeners() - Free the list of read-only listeners associated with a thread.
646   * @thread_data: The thread holding the list to free.
647   */
648  static void free_listeners(struct vdo_thread *thread)
649  {
650  	struct read_only_listener *listener, *next;
651  
652  	for (listener = vdo_forget(thread->listeners); listener != NULL; listener = next) {
653  		next = vdo_forget(listener->next);
654  		vdo_free(listener);
655  	}
656  }
657  
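/* Release the vio components and buffer backing the in-memory super block. */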
658  static void uninitialize_super_block(struct vdo_super_block *super_block)
659  {
660  	free_vio_components(&super_block->vio);
661  	vdo_free(super_block->buffer);
662  }
663  
664  /**
665   * unregister_vdo() - Remove a vdo from the device registry.
666   * @vdo: The vdo to remove.
667   */
668  static void unregister_vdo(struct vdo *vdo)
669  {
670  	write_lock(&registry.lock);
671  	if (filter_vdos_locked(vdo_is_equal, vdo) == vdo)
672  		list_del_init(&vdo->registration);
673  
674  	write_unlock(&registry.lock);
675  }
676  
677  /**
678   * vdo_destroy() - Destroy a vdo instance.
679   * @vdo: The vdo to destroy (may be NULL).
680   */
681  void vdo_destroy(struct vdo *vdo)
682  {
683  	unsigned int i;
684  
685  	if (vdo == NULL)
686  		return;
687  
688  	/* A running VDO should never be destroyed without suspending first. */
689  	BUG_ON(vdo_get_admin_state(vdo)->normal);
690  
691  	vdo->allocations_allowed = true;
692  
693  	finish_vdo(vdo);
694  	unregister_vdo(vdo);
695  	free_data_vio_pool(vdo->data_vio_pool);
696  	vdo_free_io_submitter(vdo_forget(vdo->io_submitter));
697  	vdo_free_flusher(vdo_forget(vdo->flusher));
698  	vdo_free_packer(vdo_forget(vdo->packer));
699  	vdo_free_recovery_journal(vdo_forget(vdo->recovery_journal));
700  	vdo_free_slab_depot(vdo_forget(vdo->depot));
701  	vdo_uninitialize_layout(&vdo->layout);
702  	vdo_uninitialize_layout(&vdo->next_layout);
703  	if (vdo->partition_copier)
704  		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
705  	uninitialize_super_block(&vdo->super_block);
706  	vdo_free_block_map(vdo_forget(vdo->block_map));
707  	vdo_free_hash_zones(vdo_forget(vdo->hash_zones));
708  	vdo_free_physical_zones(vdo_forget(vdo->physical_zones));
709  	vdo_free_logical_zones(vdo_forget(vdo->logical_zones));
710  
711  	if (vdo->threads != NULL) {
712  		for (i = 0; i < vdo->thread_config.thread_count; i++) {
713  			free_listeners(&vdo->threads[i]);
714  			vdo_free_work_queue(vdo_forget(vdo->threads[i].queue));
715  		}
716  		vdo_free(vdo_forget(vdo->threads));
717  	}
718  
719  	uninitialize_thread_config(&vdo->thread_config);
720  
721  	if (vdo->compression_context != NULL) {
722  		for (i = 0; i < vdo->device_config->thread_counts.cpu_threads; i++)
723  			vdo_free(vdo_forget(vdo->compression_context[i]));
724  
725  		vdo_free(vdo_forget(vdo->compression_context));
726  	}
727  	vdo_free(vdo);
728  }
729  
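/* Allocate the super block buffer and initialize the vio used to read and write it. */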
730  static int initialize_super_block(struct vdo *vdo, struct vdo_super_block *super_block)
731  {
732  	int result;
733  
734  	result = vdo_allocate(VDO_BLOCK_SIZE, char, "encoded super block",
735  			      (char **) &vdo->super_block.buffer);
736  	if (result != VDO_SUCCESS)
737  		return result;
738  
739  	return allocate_vio_components(vdo, VIO_TYPE_SUPER_BLOCK,
740  				       VIO_PRIORITY_METADATA, NULL, 1,
741  				       (char *) super_block->buffer,
742  				       &vdo->super_block.vio);
743  }
744  
745  /**
746   * finish_reading_super_block() - Continue after loading the super block.
747   * @completion: The super block vio.
748   *
749   * This callback is registered in vdo_load_super_block().
750   */
751  static void finish_reading_super_block(struct vdo_completion *completion)
752  {
753  	struct vdo_super_block *super_block =
754  		container_of(as_vio(completion), struct vdo_super_block, vio);
755  
756  	vdo_continue_completion(vdo_forget(completion->parent),
757  				vdo_decode_super_block(super_block->buffer));
758  }
759  
760  /**
761   * handle_super_block_read_error() - Handle an error reading the super block.
762   * @completion: The super block vio.
763   *
764   * This error handler is registered in vdo_load_super_block().
765   */
766  static void handle_super_block_read_error(struct vdo_completion *completion)
767  {
768  	vio_record_metadata_io_error(as_vio(completion));
769  	finish_reading_super_block(completion);
770  }
771  
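/* Bio end_io for a super block read: continue processing on the parent's callback thread. */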
772  static void read_super_block_endio(struct bio *bio)
773  {
774  	struct vio *vio = bio->bi_private;
775  	struct vdo_completion *parent = vio->completion.parent;
776  
777  	continue_vio_after_io(vio, finish_reading_super_block,
778  			      parent->callback_thread_id);
779  }
780  
781  /**
782   * vdo_load_super_block() - Allocate a super block and read its contents from storage.
783   * @vdo: The vdo containing the super block on disk.
784   * @parent: The completion to notify after loading the super block.
785   */
786  void vdo_load_super_block(struct vdo *vdo, struct vdo_completion *parent)
787  {
788  	int result;
789  
790  	result = initialize_super_block(vdo, &vdo->super_block);
791  	if (result != VDO_SUCCESS) {
792  		vdo_continue_completion(parent, result);
793  		return;
794  	}
795  
796  	vdo->super_block.vio.completion.parent = parent;
797  	vdo_submit_metadata_vio(&vdo->super_block.vio,
798  				vdo_get_data_region_start(vdo->geometry),
799  				read_super_block_endio,
800  				handle_super_block_read_error,
801  				REQ_OP_READ);
802  }
803  
804  /**
805   * vdo_get_backing_device() - Get the block device object underlying a vdo.
806   * @vdo: The vdo.
807   *
808   * Return: The vdo's current block device.
809   */
810  struct block_device *vdo_get_backing_device(const struct vdo *vdo)
811  {
812  	return vdo->device_config->owned_device->bdev;
813  }
814  
815  /**
816   * vdo_get_device_name() - Get the device name associated with the vdo target.
817   * @target: The target device interface.
818   *
819   * Return: The block device name.
820   */
821  const char *vdo_get_device_name(const struct dm_target *target)
822  {
823  	return dm_device_name(dm_table_get_md(target->table));
824  }
825  
826  /**
827   * vdo_synchronous_flush() - Issue a flush request and wait for it to complete.
828   * @vdo: The vdo.
829   *
830   * Return: VDO_SUCCESS or an error.
831   */
832  int vdo_synchronous_flush(struct vdo *vdo)
833  {
834  	int result;
835  	struct bio bio;
836  
837  	bio_init(&bio, vdo_get_backing_device(vdo), NULL, 0,
838  		 REQ_OP_WRITE | REQ_PREFLUSH);
839  	submit_bio_wait(&bio);
840  	result = blk_status_to_errno(bio.bi_status);
841  
842  	atomic64_inc(&vdo->stats.flush_out);
843  	if (result != 0) {
844  		vdo_log_error_strerror(result, "synchronous flush failed");
845  		result = -EIO;
846  	}
847  
848  	bio_uninit(&bio);
849  	return result;
850  }
851  
852  /**
853   * vdo_get_state() - Get the current state of the vdo.
854   * @vdo: The vdo.
855   *
856   * Context: This method may be called from any thread.
857   *
858   * Return: The current state of the vdo.
859   */
860  enum vdo_state vdo_get_state(const struct vdo *vdo)
861  {
862  	enum vdo_state state = atomic_read(&vdo->state);
863  
864  	/* pairs with barriers where state field is changed */
865  	smp_rmb();
866  	return state;
867  }
868  
869  /**
870   * vdo_set_state() - Set the current state of the vdo.
871   * @vdo: The vdo whose state is to be set.
872   * @state: The new state of the vdo.
873   *
874   * Context: This method may be called from any thread.
875   */
876  void vdo_set_state(struct vdo *vdo, enum vdo_state state)
877  {
878  	/* pairs with barrier in vdo_get_state */
879  	smp_wmb();
880  	atomic_set(&vdo->state, state);
881  }
882  
883  /**
884   * vdo_get_admin_state() - Get the admin state of the vdo.
885   * @vdo: The vdo.
886   *
887   * Return: The code for the vdo's current admin state.
888   */
889  const struct admin_state_code *vdo_get_admin_state(const struct vdo *vdo)
890  {
891  	return vdo_get_admin_state_code(&vdo->admin.state);
892  }
893  
894  /**
895   * record_vdo() - Record the state of the VDO for encoding in the super block.
896   */
897  static void record_vdo(struct vdo *vdo)
898  {
899  	/* This is for backwards compatibility. */
900  	vdo->states.unused = vdo->geometry.unused;
901  	vdo->states.vdo.state = vdo_get_state(vdo);
902  	vdo->states.block_map = vdo_record_block_map(vdo->block_map);
903  	vdo->states.recovery_journal = vdo_record_recovery_journal(vdo->recovery_journal);
904  	vdo->states.slab_depot = vdo_record_slab_depot(vdo->depot);
905  	vdo->states.layout = vdo->layout;
906  }
907  
908  /**
909   * continue_super_block_parent() - Continue the parent of a super block save operation.
910   * @completion: The super block vio.
911   *
912   * This callback is registered in vdo_save_components().
913   */
914  static void continue_super_block_parent(struct vdo_completion *completion)
915  {
916  	vdo_continue_completion(vdo_forget(completion->parent), completion->result);
917  }
918  
919  /**
920   * handle_save_error() - Log a super block save error.
921   * @completion: The super block vio.
922   *
923   * This error handler is registered in vdo_save_components().
924   */
925  static void handle_save_error(struct vdo_completion *completion)
926  {
927  	struct vdo_super_block *super_block =
928  		container_of(as_vio(completion), struct vdo_super_block, vio);
929  
930  	vio_record_metadata_io_error(&super_block->vio);
931  	vdo_log_error_strerror(completion->result, "super block save failed");
932  	/*
933  	 * Mark the super block as unwritable so that we won't attempt to write it again. This
934  	 * avoids the case where a growth attempt fails writing the super block with the new size,
935  	 * but the subsequent attempt to write out the read-only state succeeds. In this case,
936  	 * writes which happened just before the suspend would not be visible if the VDO is
937  	 * restarted without rebuilding, but, after a read-only rebuild, the effects of those
938  	 * writes would reappear.
939  	 */
940  	super_block->unwritable = true;
941  	completion->callback(completion);
942  }
943  
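/* Bio end_io for a super block write: resume the waiting parent on its callback thread. */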
944  static void super_block_write_endio(struct bio *bio)
945  {
946  	struct vio *vio = bio->bi_private;
947  	struct vdo_completion *parent = vio->completion.parent;
948  
949  	continue_vio_after_io(vio, continue_super_block_parent,
950  			      parent->callback_thread_id);
951  }
952  
953  /**
954   * vdo_save_components() - Encode the vdo and save the super block asynchronously.
955   * @vdo: The vdo whose state is being saved.
956   * @parent: The completion to notify when the save is complete.
957   */
958  void vdo_save_components(struct vdo *vdo, struct vdo_completion *parent)
959  {
960  	struct vdo_super_block *super_block = &vdo->super_block;
961  
962  	if (super_block->unwritable) {
963  		vdo_continue_completion(parent, VDO_READ_ONLY);
964  		return;
965  	}
966  
967  	if (super_block->vio.completion.parent != NULL) {
968  		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
969  		return;
970  	}
971  
972  	record_vdo(vdo);
973  
974  	vdo_encode_super_block(super_block->buffer, &vdo->states);
975  	super_block->vio.completion.parent = parent;
976  	super_block->vio.completion.callback_thread_id = parent->callback_thread_id;
977  	vdo_submit_metadata_vio(&super_block->vio,
978  				vdo_get_data_region_start(vdo->geometry),
979  				super_block_write_endio, handle_save_error,
980  				REQ_OP_WRITE | REQ_PREFLUSH | REQ_FUA);
981  }
982  
983  /**
984   * vdo_register_read_only_listener() - Register a listener to be notified when the VDO goes
985   *                                     read-only.
986   * @vdo: The vdo to register with.
987   * @listener: The object to notify.
988   * @notification: The function to call to send the notification.
989   * @thread_id: The id of the thread on which to send the notification.
990   *
991   * Return: VDO_SUCCESS or an error.
992   */
993  int vdo_register_read_only_listener(struct vdo *vdo, void *listener,
994  				    vdo_read_only_notification_fn notification,
995  				    thread_id_t thread_id)
996  {
997  	struct vdo_thread *thread = &vdo->threads[thread_id];
998  	struct read_only_listener *read_only_listener;
999  	int result;
1000  
1001  	result = VDO_ASSERT(thread_id != vdo->thread_config.dedupe_thread,
1002  			    "read only listener not registered on dedupe thread");
1003  	if (result != VDO_SUCCESS)
1004  		return result;
1005  
1006  	result = vdo_allocate(1, struct read_only_listener, __func__,
1007  			      &read_only_listener);
1008  	if (result != VDO_SUCCESS)
1009  		return result;
1010  
1011  	*read_only_listener = (struct read_only_listener) {
1012  		.listener = listener,
1013  		.notify = notification,
1014  		.next = thread->listeners,
1015  	};
1016  
1017  	thread->listeners = read_only_listener;
1018  	return VDO_SUCCESS;
1019  }
1020  
1021  /**
1022   * notify_vdo_of_read_only_mode() - Notify a vdo that it is going read-only.
1023   * @listener: The vdo.
1024   * @parent: The completion to notify in order to acknowledge the notification.
1025   *
1026   * This will save the read-only state to the super block.
1027   *
1028   * Implements vdo_read_only_notification_fn.
1029   */
1030  static void notify_vdo_of_read_only_mode(void *listener, struct vdo_completion *parent)
1031  {
1032  	struct vdo *vdo = listener;
1033  
1034  	if (vdo_in_read_only_mode(vdo)) {
1035  		vdo_finish_completion(parent);
		return;
	}
1036  
1037  	vdo_set_state(vdo, VDO_READ_ONLY_MODE);
1038  	vdo_save_components(vdo, parent);
1039  }
1040  
1041  /**
1042   * vdo_enable_read_only_entry() - Enable a vdo to enter read-only mode on errors.
1043   * @vdo: The vdo to enable.
1044   *
1045   * Return: VDO_SUCCESS or an error.
1046   */
1047  int vdo_enable_read_only_entry(struct vdo *vdo)
1048  {
1049  	thread_id_t id;
1050  	bool is_read_only = vdo_in_read_only_mode(vdo);
1051  	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1052  
1053  	if (is_read_only) {
1054  		notifier->read_only_error = VDO_READ_ONLY;
1055  		notifier->state = NOTIFIED;
1056  	} else {
1057  		notifier->state = MAY_NOT_NOTIFY;
1058  	}
1059  
1060  	spin_lock_init(&notifier->lock);
1061  	vdo_initialize_completion(&notifier->completion, vdo,
1062  				  VDO_READ_ONLY_MODE_COMPLETION);
1063  
1064  	for (id = 0; id < vdo->thread_config.thread_count; id++)
1065  		vdo->threads[id].is_read_only = is_read_only;
1066  
1067  	return vdo_register_read_only_listener(vdo, vdo, notify_vdo_of_read_only_mode,
1068  					       vdo->thread_config.admin_thread);
1069  }
1070  
1071  /**
1072   * vdo_wait_until_not_entering_read_only_mode() - Wait until no read-only notifications are in
1073   *                                                progress and prevent any subsequent
1074   *                                                notifications.
1075   * @parent: The completion to notify when no threads are entering read-only mode.
1076   *
1077   * Notifications may be re-enabled by calling vdo_allow_read_only_mode_entry().
1078   */
1079  void vdo_wait_until_not_entering_read_only_mode(struct vdo_completion *parent)
1080  {
1081  	struct vdo *vdo = parent->vdo;
1082  	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1083  
1084  	vdo_assert_on_admin_thread(vdo, __func__);
1085  
1086  	if (notifier->waiter != NULL) {
1087  		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
1088  		return;
1089  	}
1090  
1091  	spin_lock(&notifier->lock);
1092  	if (notifier->state == NOTIFYING)
1093  		notifier->waiter = parent;
1094  	else if (notifier->state == MAY_NOTIFY)
1095  		notifier->state = MAY_NOT_NOTIFY;
1096  	spin_unlock(&notifier->lock);
1097  
1098  	if (notifier->waiter == NULL) {
1099  		/*
1100  		 * A notification was not in progress, and notifications are now
1101  		 * disallowed.
1102  		 */
1103  		vdo_launch_completion(parent);
1104  		return;
1105  	}
1106  }
1107  
1108  /**
1109   * as_notifier() - Convert a generic vdo_completion to a read_only_notifier.
1110   * @completion: The completion to convert.
1111   *
1112   * Return: The completion as a read_only_notifier.
1113   */
1114  static inline struct read_only_notifier *as_notifier(struct vdo_completion *completion)
1115  {
1116  	vdo_assert_completion_type(completion, VDO_READ_ONLY_MODE_COMPLETION);
1117  	return container_of(completion, struct read_only_notifier, completion);
1118  }
1119  
1120  /**
1121   * finish_entering_read_only_mode() - Complete the process of entering read only mode.
1122   * @completion: The read-only mode completion.
1123   */
1124  static void finish_entering_read_only_mode(struct vdo_completion *completion)
1125  {
1126  	struct read_only_notifier *notifier = as_notifier(completion);
1127  
1128  	vdo_assert_on_admin_thread(completion->vdo, __func__);
1129  
1130  	spin_lock(&notifier->lock);
1131  	notifier->state = NOTIFIED;
1132  	spin_unlock(&notifier->lock);
1133  
1134  	if (notifier->waiter != NULL)
1135  		vdo_continue_completion(vdo_forget(notifier->waiter),
1136  					completion->result);
1137  }
1138  
1139  /**
1140   * make_thread_read_only() - Inform each thread that the VDO is in read-only mode.
1141   * @completion: The read-only mode completion.
1142   */
1143  static void make_thread_read_only(struct vdo_completion *completion)
1144  {
1145  	struct vdo *vdo = completion->vdo;
1146  	thread_id_t thread_id = completion->callback_thread_id;
1147  	struct read_only_notifier *notifier = as_notifier(completion);
1148  	struct read_only_listener *listener = completion->parent;
1149  
1150  	if (listener == NULL) {
1151  		/* This is the first call on this thread */
1152  		struct vdo_thread *thread = &vdo->threads[thread_id];
1153  
1154  		thread->is_read_only = true;
1155  		listener = thread->listeners;
1156  		if (thread_id == 0)
1157  			vdo_log_error_strerror(READ_ONCE(notifier->read_only_error),
1158  					       "Unrecoverable error, entering read-only mode");
1159  	} else {
1160  		/* We've just finished notifying a listener */
1161  		listener = listener->next;
1162  	}
1163  
1164  	if (listener != NULL) {
1165  		/* We have a listener to notify */
1166  		vdo_prepare_completion(completion, make_thread_read_only,
1167  				       make_thread_read_only, thread_id,
1168  				       listener);
1169  		listener->notify(listener->listener, completion);
1170  		return;
1171  	}
1172  
1173  	/* We're done with this thread */
1174  	if (++thread_id == vdo->thread_config.dedupe_thread) {
1175  		/*
1176  		 * We don't want to notify the dedupe thread since it may be
1177  		 * blocked rebuilding the index.
1178  		 */
1179  		thread_id++;
1180  	}
1181  
1182  	if (thread_id >= vdo->thread_config.thread_count) {
1183  		/* There are no more threads */
1184  		vdo_prepare_completion(completion, finish_entering_read_only_mode,
1185  				       finish_entering_read_only_mode,
1186  				       vdo->thread_config.admin_thread, NULL);
1187  	} else {
1188  		vdo_prepare_completion(completion, make_thread_read_only,
1189  				       make_thread_read_only, thread_id, NULL);
1190  	}
1191  
1192  	vdo_launch_completion(completion);
1193  }
1194  
1195  /**
1196   * vdo_allow_read_only_mode_entry() - Allow the notifier to put the VDO into read-only mode,
1197   *                                    reversing the effects of
1198   *                                    vdo_wait_until_not_entering_read_only_mode().
1199   * @parent: The object to notify once the operation is complete.
1200   *
1201   * If some thread tried to put the vdo into read-only mode while notifications were disallowed, it
1202   * will be done when this method is called. If that happens, the parent will not be notified until
1203   * the vdo has actually entered read-only mode and attempted to save the super block.
1204   *
1205   * Context: This method may only be called from the admin thread.
1206   */
1207  void vdo_allow_read_only_mode_entry(struct vdo_completion *parent)
1208  {
1209  	struct vdo *vdo = parent->vdo;
1210  	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1211  
1212  	vdo_assert_on_admin_thread(vdo, __func__);
1213  
1214  	if (notifier->waiter != NULL) {
1215  		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
1216  		return;
1217  	}
1218  
1219  	spin_lock(&notifier->lock);
1220  	if (notifier->state == MAY_NOT_NOTIFY) {
1221  		if (notifier->read_only_error == VDO_SUCCESS) {
1222  			notifier->state = MAY_NOTIFY;
1223  		} else {
1224  			notifier->state = NOTIFYING;
1225  			notifier->waiter = parent;
1226  		}
1227  	}
1228  	spin_unlock(&notifier->lock);
1229  
1230  	if (notifier->waiter == NULL) {
1231  		/* We're done */
1232  		vdo_launch_completion(parent);
1233  		return;
1234  	}
1235  
1236  	/* Do the pending notification. */
1237  	make_thread_read_only(&notifier->completion);
1238  }
1239  
1240  /**
1241   * vdo_enter_read_only_mode() - Put a VDO into read-only mode and save the read-only state in the
1242   *                              super block.
1243   * @vdo: The vdo.
1244   * @error_code: The error which caused the VDO to enter read-only mode.
1245   *
1246   * This method is a no-op if the VDO is already read-only.
1247   */
1248  void vdo_enter_read_only_mode(struct vdo *vdo, int error_code)
1249  {
1250  	bool notify = false;
1251  	thread_id_t thread_id = vdo_get_callback_thread_id();
1252  	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1253  	struct vdo_thread *thread;
1254  
1255  	if (thread_id != VDO_INVALID_THREAD_ID) {
1256  		thread = &vdo->threads[thread_id];
1257  		if (thread->is_read_only) {
1258  			/* This thread has already gone read-only. */
1259  			return;
1260  		}
1261  
1262  		/* Record for this thread that the VDO is read-only. */
1263  		thread->is_read_only = true;
1264  	}
1265  
1266  	spin_lock(&notifier->lock);
1267  	if (notifier->read_only_error == VDO_SUCCESS) {
1268  		WRITE_ONCE(notifier->read_only_error, error_code);
1269  		if (notifier->state == MAY_NOTIFY) {
1270  			notifier->state = NOTIFYING;
1271  			notify = true;
1272  		}
1273  	}
1274  	spin_unlock(&notifier->lock);
1275  
1276  	if (!notify) {
1277  		/* The notifier is already aware of a read-only error */
1278  		return;
1279  	}
1280  
1281  	/* Initiate a notification starting on the lowest numbered thread. */
1282  	vdo_launch_completion_callback(&notifier->completion, make_thread_read_only, 0);
1283  }
1284  
1285  /**
1286   * vdo_is_read_only() - Check whether the VDO is read-only.
1287   * @vdo: The vdo.
1288   *
1289   * Return: true if the vdo is read-only.
1290   *
1291   * This method may be called from any thread, as opposed to examining the VDO's state field which
1292   * is only safe to check from the admin thread.
1293   */
1294  bool vdo_is_read_only(struct vdo *vdo)
1295  {
1296  	return vdo->threads[vdo_get_callback_thread_id()].is_read_only;
1297  }
1298  
1299  /**
1300   * vdo_in_read_only_mode() - Check whether a vdo is in read-only mode.
1301   * @vdo: The vdo to query.
1302   *
1303   * Return: true if the vdo is in read-only mode.
1304   */
1305  bool vdo_in_read_only_mode(const struct vdo *vdo)
1306  {
1307  	return (vdo_get_state(vdo) == VDO_READ_ONLY_MODE);
1308  }
1309  
1310  /**
1311   * vdo_in_recovery_mode() - Check whether the vdo is in recovery mode.
1312   * @vdo: The vdo to query.
1313   *
1314   * Return: true if the vdo is in recovery mode.
1315   */
1316  bool vdo_in_recovery_mode(const struct vdo *vdo)
1317  {
1318  	return (vdo_get_state(vdo) == VDO_RECOVERING);
1319  }
1320  
1321  /**
1322   * vdo_enter_recovery_mode() - Put the vdo into recovery mode.
1323   * @vdo: The vdo.
1324   */
1325  void vdo_enter_recovery_mode(struct vdo *vdo)
1326  {
1327  	vdo_assert_on_admin_thread(vdo, __func__);
1328  
1329  	if (vdo_in_read_only_mode(vdo))
1330  		return;
1331  
1332  	vdo_log_info("Entering recovery mode");
1333  	vdo_set_state(vdo, VDO_RECOVERING);
1334  }
1335  
1336  /**
1337   * complete_synchronous_action() - Signal the waiting thread that a synchronous action is complete.
1338   * @completion: The sync completion.
1339   */
1340  static void complete_synchronous_action(struct vdo_completion *completion)
1341  {
1342  	vdo_assert_completion_type(completion, VDO_SYNC_COMPLETION);
1343  	complete(&(container_of(completion, struct sync_completion,
1344  				vdo_completion)->completion));
1345  }
1346  
1347  /**
1348   * perform_synchronous_action() - Launch an action on a VDO thread and wait for it to complete.
1349   * @vdo: The vdo.
1350   * @action: The callback to launch.
1351   * @thread_id: The thread on which to run the action.
1352   * @parent: The parent of the sync completion (may be NULL).
1353   */
1354  static int perform_synchronous_action(struct vdo *vdo, vdo_action_fn action,
1355  				      thread_id_t thread_id, void *parent)
1356  {
1357  	struct sync_completion sync;
1358  
1359  	vdo_initialize_completion(&sync.vdo_completion, vdo, VDO_SYNC_COMPLETION);
1360  	init_completion(&sync.completion);
1361  	sync.vdo_completion.parent = parent;
1362  	vdo_launch_completion_callback(&sync.vdo_completion, action, thread_id);
1363  	wait_for_completion(&sync.completion);
1364  	return sync.vdo_completion.result;
1365  }
1366  
1367  /**
1368   * set_compression_callback() - Callback to turn compression on or off.
1369   * @completion: The completion.
1370   */
1371  static void set_compression_callback(struct vdo_completion *completion)
1372  {
1373  	struct vdo *vdo = completion->vdo;
1374  	bool *enable = completion->parent;
1375  	bool was_enabled = vdo_get_compressing(vdo);
1376  
1377  	if (*enable != was_enabled) {
1378  		WRITE_ONCE(vdo->compressing, *enable);
1379  		if (was_enabled) {
1380  			/* Signal the packer to flush since compression has been disabled. */
1381  			vdo_flush_packer(vdo->packer);
1382  		}
1383  	}
1384  
1385  	vdo_log_info("compression is %s", (*enable ? "enabled" : "disabled"));
1386  	*enable = was_enabled;
1387  	complete_synchronous_action(completion);
1388  }
1389  
1390  /**
1391   * vdo_set_compressing() - Turn compression on or off.
1392   * @vdo: The vdo.
1393   * @enable: Whether to enable or disable compression.
1394   *
1395   * Return: Whether compression was previously on or off.
1396   */
1397  bool vdo_set_compressing(struct vdo *vdo, bool enable)
1398  {
1399  	perform_synchronous_action(vdo, set_compression_callback,
1400  				   vdo->thread_config.packer_thread,
1401  				   &enable);
1402  	return enable;
1403  }
1404  
1405  /**
1406   * vdo_get_compressing() - Get whether compression is enabled in a vdo.
1407   * @vdo: The vdo.
1408   *
1409   * Return: State of compression.
1410   */
1411  bool vdo_get_compressing(struct vdo *vdo)
1412  {
1413  	return READ_ONCE(vdo->compressing);
1414  }
1415  
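/* Convert the configured block map cache size from blocks to bytes. */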
1416  static size_t get_block_map_cache_size(const struct vdo *vdo)
1417  {
1418  	return ((size_t) vdo->device_config->cache_size) * VDO_BLOCK_SIZE;
1419  }
1420  
1421  static struct error_statistics __must_check get_vdo_error_statistics(const struct vdo *vdo)
1422  {
1423  	/*
1424  	 * The error counts can be incremented from arbitrary threads and so must be incremented
1425  	 * atomically, but they are just statistics with no semantics that could rely on memory
1426  	 * order, so unfenced reads are sufficient.
1427  	 */
1428  	const struct atomic_statistics *atoms = &vdo->stats;
1429  
1430  	return (struct error_statistics) {
1431  		.invalid_advice_pbn_count = atomic64_read(&atoms->invalid_advice_pbn_count),
1432  		.no_space_error_count = atomic64_read(&atoms->no_space_error_count),
1433  		.read_only_error_count = atomic64_read(&atoms->read_only_error_count),
1434  	};
1435  }
1436  
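/* Copy a set of atomic bio counters into a plain bio_stats structure using unfenced reads. */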
1437  static void copy_bio_stat(struct bio_stats *b, const struct atomic_bio_stats *a)
1438  {
1439  	b->read = atomic64_read(&a->read);
1440  	b->write = atomic64_read(&a->write);
1441  	b->discard = atomic64_read(&a->discard);
1442  	b->flush = atomic64_read(&a->flush);
1443  	b->empty_flush = atomic64_read(&a->empty_flush);
1444  	b->fua = atomic64_read(&a->fua);
1445  }
1446  
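/* Compute the field-by-field difference of two bio_stats structures. */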
1447  static struct bio_stats subtract_bio_stats(struct bio_stats minuend,
1448  					   struct bio_stats subtrahend)
1449  {
1450  	return (struct bio_stats) {
1451  		.read = minuend.read - subtrahend.read,
1452  		.write = minuend.write - subtrahend.write,
1453  		.discard = minuend.discard - subtrahend.discard,
1454  		.flush = minuend.flush - subtrahend.flush,
1455  		.empty_flush = minuend.empty_flush - subtrahend.empty_flush,
1456  		.fua = minuend.fua - subtrahend.fua,
1457  	};
1458  }
1459  
1460  /**
1461   * vdo_get_physical_blocks_allocated() - Get the number of physical blocks in use by user data.
1462   * @vdo: The vdo.
1463   *
1464   * Return: The number of blocks allocated for user data.
1465   */
1466  static block_count_t __must_check vdo_get_physical_blocks_allocated(const struct vdo *vdo)
1467  {
1468  	return (vdo_get_slab_depot_allocated_blocks(vdo->depot) -
1469  		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
1470  }
1471  
1472  /**
1473   * vdo_get_physical_blocks_overhead() - Get the number of physical blocks used by vdo metadata.
1474   * @vdo: The vdo.
1475   *
1476   * Return: The number of overhead blocks.
1477   */
1478  static block_count_t __must_check vdo_get_physical_blocks_overhead(const struct vdo *vdo)
1479  {
1480  	/*
1481  	 * config.physical_blocks is mutated during resize and is in a packed structure,
1482  	 * but resize runs on admin thread.
1483  	 * TODO: Verify that this is always safe.
1484  	 */
1485  	return (vdo->states.vdo.config.physical_blocks -
1486  		vdo_get_slab_depot_data_blocks(vdo->depot) +
1487  		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
1488  }
1489  
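/* Map a vdo_state to the short mode string reported in statistics. */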
1490  static const char *vdo_describe_state(enum vdo_state state)
1491  {
1492  	/* These strings should all fit in the 15 chars of VDOStatistics.mode. */
1493  	switch (state) {
1494  	case VDO_RECOVERING:
1495  		return "recovering";
1496  
1497  	case VDO_READ_ONLY_MODE:
1498  		return "read-only";
1499  
1500  	default:
1501  		return "normal";
1502  	}
1503  }
1504  
1505  /**
1506   * get_vdo_statistics() - Populate a vdo_statistics structure on the admin thread.
1507   * @vdo: The vdo.
1508   * @stats: The statistics structure to populate.
1509   */
1510  static void get_vdo_statistics(const struct vdo *vdo, struct vdo_statistics *stats)
1511  {
1512  	struct recovery_journal *journal = vdo->recovery_journal;
1513  	enum vdo_state state = vdo_get_state(vdo);
1514  
1515  	vdo_assert_on_admin_thread(vdo, __func__);
1516  
1517  	/* start with a clean slate */
1518  	memset(stats, 0, sizeof(struct vdo_statistics));
1519  
1520  	/*
1521  	 * These are immutable properties of the vdo object, so it is safe to query them from any
1522  	 * thread.
1523  	 */
1524  	stats->version = STATISTICS_VERSION;
1525  	stats->logical_blocks = vdo->states.vdo.config.logical_blocks;
1526  	/*
1527  	 * config.physical_blocks is mutated during resize and is in a packed structure, but resize
1528  	 * runs on the admin thread.
1529  	 * TODO: Verify that this is always safe.
1530  	 */
1531  	stats->physical_blocks = vdo->states.vdo.config.physical_blocks;
1532  	stats->block_size = VDO_BLOCK_SIZE;
1533  	stats->complete_recoveries = vdo->states.vdo.complete_recoveries;
1534  	stats->read_only_recoveries = vdo->states.vdo.read_only_recoveries;
1535  	stats->block_map_cache_size = get_block_map_cache_size(vdo);
1536  
1537  	/* The callees are responsible for thread-safety. */
1538  	stats->data_blocks_used = vdo_get_physical_blocks_allocated(vdo);
1539  	stats->overhead_blocks_used = vdo_get_physical_blocks_overhead(vdo);
1540  	stats->logical_blocks_used = vdo_get_recovery_journal_logical_blocks_used(journal);
1541  	vdo_get_slab_depot_statistics(vdo->depot, stats);
1542  	stats->journal = vdo_get_recovery_journal_statistics(journal);
1543  	stats->packer = vdo_get_packer_statistics(vdo->packer);
1544  	stats->block_map = vdo_get_block_map_statistics(vdo->block_map);
1545  	vdo_get_dedupe_statistics(vdo->hash_zones, stats);
1546  	stats->errors = get_vdo_error_statistics(vdo);
1547  	stats->in_recovery_mode = (state == VDO_RECOVERING);
1548  	snprintf(stats->mode, sizeof(stats->mode), "%s", vdo_describe_state(state));
1549  
1550  	stats->instance = vdo->instance;
1551  	stats->current_vios_in_progress = get_data_vio_pool_active_requests(vdo->data_vio_pool);
1552  	stats->max_vios = get_data_vio_pool_maximum_requests(vdo->data_vio_pool);
1553  
1554  	stats->flush_out = atomic64_read(&vdo->stats.flush_out);
1555  	stats->logical_block_size = vdo->device_config->logical_block_size;
1556  	copy_bio_stat(&stats->bios_in, &vdo->stats.bios_in);
1557  	copy_bio_stat(&stats->bios_in_partial, &vdo->stats.bios_in_partial);
1558  	copy_bio_stat(&stats->bios_out, &vdo->stats.bios_out);
1559  	copy_bio_stat(&stats->bios_meta, &vdo->stats.bios_meta);
1560  	copy_bio_stat(&stats->bios_journal, &vdo->stats.bios_journal);
1561  	copy_bio_stat(&stats->bios_page_cache, &vdo->stats.bios_page_cache);
1562  	copy_bio_stat(&stats->bios_out_completed, &vdo->stats.bios_out_completed);
1563  	copy_bio_stat(&stats->bios_meta_completed, &vdo->stats.bios_meta_completed);
1564  	copy_bio_stat(&stats->bios_journal_completed,
1565  		      &vdo->stats.bios_journal_completed);
1566  	copy_bio_stat(&stats->bios_page_cache_completed,
1567  		      &vdo->stats.bios_page_cache_completed);
1568  	copy_bio_stat(&stats->bios_acknowledged, &vdo->stats.bios_acknowledged);
1569  	copy_bio_stat(&stats->bios_acknowledged_partial, &vdo->stats.bios_acknowledged_partial);
1570  	stats->bios_in_progress =
1571  		subtract_bio_stats(stats->bios_in, stats->bios_acknowledged);
1572  	vdo_get_memory_stats(&stats->memory_usage.bytes_used,
1573  			     &stats->memory_usage.peak_bytes_used);
1574  }
1575  
1576  /**
1577   * vdo_fetch_statistics_callback() - Action to populate a vdo_statistics
1578   *                                   structure on the admin thread.
1579   * @completion: The completion.
1580   *
1581   * This callback is registered in vdo_fetch_statistics().
1582   */
1583  static void vdo_fetch_statistics_callback(struct vdo_completion *completion)
1584  {
1585  	get_vdo_statistics(completion->vdo, completion->parent);
1586  	complete_synchronous_action(completion);
1587  }
1588  
1589  /**
1590   * vdo_fetch_statistics() - Fetch statistics on the correct thread.
1591   * @vdo: The vdo.
1592   * @stats: The vdo statistics are returned here.
1593   */
1594  void vdo_fetch_statistics(struct vdo *vdo, struct vdo_statistics *stats)
1595  {
1596  	perform_synchronous_action(vdo, vdo_fetch_statistics_callback,
1597  				   vdo->thread_config.admin_thread, stats);
1598  }
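/*
 * Illustrative sketch (not part of this file): a hypothetical caller taking a consistent
 * snapshot of the statistics from outside the vdo's own threads. The local variable names are
 * examples only; the call blocks the caller until the statistics have been gathered on the
 * admin thread.
 *
 *	struct vdo_statistics stats;
 *	block_count_t used;
 *
 *	vdo_fetch_statistics(vdo, &stats);
 *	used = stats.data_blocks_used + stats.overhead_blocks_used;
 */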
1599  
1600  /**
1601   * vdo_get_callback_thread_id() - Get the id of the callback thread on which a completion is
1602   *                                currently running.
1603   *
1604   * Return: The current thread ID, or VDO_INVALID_THREAD_ID if the current thread is not a vdo
1605   *         callback thread.
1605   */
1606  thread_id_t vdo_get_callback_thread_id(void)
1607  {
1608  	struct vdo_work_queue *queue = vdo_get_current_work_queue();
1609  	struct vdo_thread *thread;
1610  	thread_id_t thread_id;
1611  
1612  	if (queue == NULL)
1613  		return VDO_INVALID_THREAD_ID;
1614  
1615  	thread = vdo_get_work_queue_owner(queue);
1616  	thread_id = thread->thread_id;
1617  
1618  	if (PARANOID_THREAD_CONSISTENCY_CHECKS) {
1619  		BUG_ON(thread_id >= thread->vdo->thread_config.thread_count);
1620  		BUG_ON(thread != &thread->vdo->threads[thread_id]);
1621  	}
1622  
1623  	return thread_id;
1624  }
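/*
 * Illustrative sketch (not part of this file): the returned id lets callers verify which vdo
 * thread, if any, they are running on. The helper below is hypothetical.
 *
 *	static bool on_thread(thread_id_t expected)
 *	{
 *		return (vdo_get_callback_thread_id() == expected);
 *	}
 *
 * A result of VDO_INVALID_THREAD_ID means the caller is not running on any vdo work queue
 * thread; the assertion helpers below use this function to check threading invariants.
 */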
1625  
1626  /**
1627   * vdo_dump_status() - Dump status information about a vdo to the log for debugging.
1628   * @vdo: The vdo to dump.
1629   */
1630  void vdo_dump_status(const struct vdo *vdo)
1631  {
1632  	zone_count_t zone;
1633  
1634  	vdo_dump_flusher(vdo->flusher);
1635  	vdo_dump_recovery_journal_statistics(vdo->recovery_journal);
1636  	vdo_dump_packer(vdo->packer);
1637  	vdo_dump_slab_depot(vdo->depot);
1638  
1639  	for (zone = 0; zone < vdo->thread_config.logical_zone_count; zone++)
1640  		vdo_dump_logical_zone(&vdo->logical_zones->zones[zone]);
1641  
1642  	for (zone = 0; zone < vdo->thread_config.physical_zone_count; zone++)
1643  		vdo_dump_physical_zone(&vdo->physical_zones->zones[zone]);
1644  
1645  	vdo_dump_hash_zones(vdo->hash_zones);
1646  }
1647  
1648  /**
1649   * vdo_assert_on_admin_thread() - Assert that we are running on the admin thread.
1650   * @vdo: The vdo.
1651   * @name: The name of the function which should be running on the admin thread (for logging).
1652   */
1653  void vdo_assert_on_admin_thread(const struct vdo *vdo, const char *name)
1654  {
1655  	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.admin_thread),
1656  			    "%s called on admin thread", name);
1657  }
1658  
1659  /**
1660   * vdo_assert_on_logical_zone_thread() - Assert that this function was called on the specified
1661   *                                       logical zone thread.
1662   * @vdo: The vdo.
1663   * @logical_zone: The number of the logical zone.
1664   * @name: The name of the calling function.
1665   */
1666  void vdo_assert_on_logical_zone_thread(const struct vdo *vdo, zone_count_t logical_zone,
1667  				       const char *name)
1668  {
1669  	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
1670  			     vdo->thread_config.logical_threads[logical_zone]),
1671  			    "%s called on logical thread", name);
1672  }
1673  
1674  /**
1675   * vdo_assert_on_physical_zone_thread() - Assert that this function was called on the specified
1676   *                                        physical zone thread.
1677   * @vdo: The vdo.
1678   * @physical_zone: The number of the physical zone.
1679   * @name: The name of the calling function.
1680   */
1681  void vdo_assert_on_physical_zone_thread(const struct vdo *vdo,
1682  					zone_count_t physical_zone, const char *name)
1683  {
1684  	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
1685  			     vdo->thread_config.physical_threads[physical_zone]),
1686  			    "%s called on physical thread", name);
1687  }
1688  
1689  /**
1690   * vdo_get_physical_zone() - Get the physical zone responsible for a given physical block number.
1691   * @vdo: The vdo containing the physical zones.
1692   * @pbn: The PBN of the data block.
1693   * @zone_ptr: A pointer to return the physical zone.
1694   *
1695   * Gets the physical zone responsible for a given physical block number: the zone owning a data
1696   * block in this vdo instance, or NULL for the zero block. For any other block number that is not
1697   * a valid data block in any slab, an error is returned.
1698   * This function is safe to call on invalid block numbers; it will not put the vdo into read-only
1699   * mode.
1700   *
1701   * Return: VDO_SUCCESS, VDO_OUT_OF_RANGE if the block number is invalid, or an error code for any
1702   *         other failure.
1703   */
1704  int vdo_get_physical_zone(const struct vdo *vdo, physical_block_number_t pbn,
1705  			  struct physical_zone **zone_ptr)
1706  {
1707  	struct vdo_slab *slab;
1708  	int result;
1709  
1710  	if (pbn == VDO_ZERO_BLOCK) {
1711  		*zone_ptr = NULL;
1712  		return VDO_SUCCESS;
1713  	}
1714  
1715  	/*
1716  	 * Used because it does a more restrictive bounds check than vdo_get_slab(), and done first
1717  	 * because it won't trigger read-only mode on an invalid PBN.
1718  	 */
1719  	if (!vdo_is_physical_data_block(vdo->depot, pbn))
1720  		return VDO_OUT_OF_RANGE;
1721  
1722  	/* With the PBN already checked, we should always succeed in finding a slab. */
1723  	slab = vdo_get_slab(vdo->depot, pbn);
1724  	result = VDO_ASSERT(slab != NULL, "vdo_get_slab must succeed on all valid PBNs");
1725  	if (result != VDO_SUCCESS)
1726  		return result;
1727  
1728  	*zone_ptr = &vdo->physical_zones->zones[slab->allocator->zone_number];
1729  	return VDO_SUCCESS;
1730  }
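/*
 * Illustrative sketch (not part of this file): a hypothetical caller resolving a PBN to its
 * zone, handling each of the three outcomes described above.
 *
 *	struct physical_zone *zone;
 *	int result = vdo_get_physical_zone(vdo, pbn, &zone);
 *
 *	if (result != VDO_SUCCESS)
 *		return result;
 *	if (zone == NULL) {
 *		// pbn was VDO_ZERO_BLOCK; there is no zone to consult
 *		return VDO_SUCCESS;
 *	}
 *	// otherwise, operate on the zone that owns pbn
 */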
1731