// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "vio.h"

#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/kernel.h>
#include <linux/ratelimit.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "constants.h"
#include "io-submitter.h"
#include "vdo.h"

/* A vio_pool is a collection of preallocated vios. */
struct vio_pool {
	/* The number of objects managed by the pool */
	size_t size;
	/* The list of objects which are available */
	struct list_head available;
	/* The queue of requestors waiting for objects from the pool */
	struct vdo_wait_queue waiting;
	/* The number of objects currently in use */
	size_t busy_count;
	/* The list of objects which are in use */
	struct list_head busy;
	/* The ID of the thread on which this pool may be used */
	thread_id_t thread_id;
	/* The buffer backing the pool's vios */
	char *buffer;
	/* The pool entries */
	struct pooled_vio vios[];
};

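/**
 * pbn_from_vio_bio() - Get the physical block number targeted by a vio's bio.
 * @bio: The bio to examine; its bi_private must point to a vio.
 *
 * Return: The pbn derived from the bio's sector, translated by the geometry's bio offset
 *         (except for the geometry block itself, which is not translated).
 */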
physical_block_number_t pbn_from_vio_bio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo *vdo = vio->completion.vdo;
	physical_block_number_t pbn = bio->bi_iter.bi_sector / VDO_SECTORS_PER_BLOCK;

	return ((pbn == VDO_GEOMETRY_BLOCK_LOCATION) ? pbn : pbn + vdo->geometry.bio_offset);
}

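/*
 * Allocate a bio with enough inline bio_vecs for @size blocks. One extra vec is reserved
 * because a buffer which is not page-aligned may span one more page than its block count.
 */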
static int create_multi_block_bio(block_count_t size, struct bio **bio_ptr)
{
	struct bio *bio = NULL;
	int result;

	result = vdo_allocate_extended(struct bio, size + 1, struct bio_vec,
				       "bio", &bio);
	if (result != VDO_SUCCESS)
		return result;

	*bio_ptr = bio;
	return VDO_SUCCESS;
}

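/* Create a bio sized to describe a single VDO block. */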
int vdo_create_bio(struct bio **bio_ptr)
{
	return create_multi_block_bio(1, bio_ptr);
}

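/* Free a bio created by vdo_create_bio() or create_multi_block_bio(); NULL is a no-op. */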
void vdo_free_bio(struct bio *bio)
{
	if (bio == NULL)
		return;

	bio_uninit(bio);
	vdo_free(vdo_forget(bio));
}

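/**
 * allocate_vio_components() - Allocate a bio and initialize the vio which wraps it.
 * @vdo: The vdo on which the vio will operate.
 * @vio_type: The type of vio to create; must be a metadata type.
 * @priority: The relative priority to assign to the vio.
 * @parent: The parent of the vio.
 * @block_count: The size of the vio in blocks; at most MAX_BLOCKS_PER_VIO.
 * @data: The buffer.
 * @vio: The vio to initialize.
 *
 * Return: VDO_SUCCESS or an error.
 */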
int allocate_vio_components(struct vdo *vdo, enum vio_type vio_type,
			    enum vio_priority priority, void *parent,
			    unsigned int block_count, char *data, struct vio *vio)
{
	struct bio *bio;
	int result;

	result = VDO_ASSERT(block_count <= MAX_BLOCKS_PER_VIO,
			    "block count %u does not exceed maximum %u", block_count,
			    MAX_BLOCKS_PER_VIO);
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(((vio_type != VIO_TYPE_UNINITIALIZED) && (vio_type != VIO_TYPE_DATA)),
			    "%d is a metadata type", vio_type);
	if (result != VDO_SUCCESS)
		return result;

	result = create_multi_block_bio(block_count, &bio);
	if (result != VDO_SUCCESS)
		return result;

	initialize_vio(vio, bio, block_count, vio_type, priority, vdo);
	vio->completion.parent = parent;
	vio->data = data;
	return VDO_SUCCESS;
}

/**
 * create_multi_block_metadata_vio() - Create a vio.
 * @vdo: The vdo on which the vio will operate.
 * @vio_type: The type of vio to create.
 * @priority: The relative priority to assign to the vio.
 * @parent: The parent of the vio.
 * @block_count: The size of the vio in blocks.
 * @data: The buffer.
 * @vio_ptr: A pointer to hold the new vio.
 *
 * Return: VDO_SUCCESS or an error.
 */
int create_multi_block_metadata_vio(struct vdo *vdo, enum vio_type vio_type,
				    enum vio_priority priority, void *parent,
				    unsigned int block_count, char *data,
				    struct vio **vio_ptr)
{
	struct vio *vio;
	int result;

	/* Fail the build if struct vio grows beyond 256 bytes. */
	BUILD_BUG_ON(sizeof(struct vio) > 256);

	/*
	 * Metadata vios should use direct allocation and not use the buffer pool, which is
	 * reserved for submissions from the linux block layer.
	 */
	result = vdo_allocate(1, struct vio, __func__, &vio);
	if (result != VDO_SUCCESS) {
		vdo_log_error("metadata vio allocation failure %d", result);
		return result;
	}

	result = allocate_vio_components(vdo, vio_type, priority, parent, block_count,
					 data, vio);
	if (result != VDO_SUCCESS) {
		vdo_free(vio);
		return result;
	}

	*vio_ptr = vio;
	return VDO_SUCCESS;
}
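
/*
 * Example: creating and freeing a single-block metadata vio (a minimal sketch; the vio
 * type, buffer handling, and error paths below are illustrative, not taken from an
 * actual caller):
 *
 *	char *buffer;
 *	struct vio *vio;
 *	int result;
 *
 *	result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &buffer);
 *	if (result != VDO_SUCCESS)
 *		return result;
 *
 *	result = create_multi_block_metadata_vio(vdo, VIO_TYPE_SLAB_JOURNAL,
 *						 VIO_PRIORITY_METADATA, NULL, 1,
 *						 buffer, &vio);
 *	...
 *	free_vio(vdo_forget(vio));
 *	vdo_free(vdo_forget(buffer));
 */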

/**
 * free_vio_components() - Free the components of a vio embedded in a larger structure.
 * @vio: The vio to destroy.
 */
void free_vio_components(struct vio *vio)
{
	if (vio == NULL)
		return;

	BUG_ON(is_data_vio(vio));
	vdo_free_bio(vdo_forget(vio->bio));
}

/**
 * free_vio() - Destroy a vio.
 * @vio: The vio to destroy.
 */
void free_vio(struct vio *vio)
{
	free_vio_components(vio);
	vdo_free(vio);
}

/* Set bio properties for a VDO read or write. */
void vdo_set_bio_properties(struct bio *bio, struct vio *vio, bio_end_io_t callback,
			    blk_opf_t bi_opf, physical_block_number_t pbn)
{
	struct vdo *vdo = vio->completion.vdo;
	struct device_config *config = vdo->device_config;

	pbn -= vdo->geometry.bio_offset;
	vio->bio_zone = ((pbn / config->thread_counts.bio_rotation_interval) %
			 config->thread_counts.bio_threads);

	bio->bi_private = vio;
	bio->bi_end_io = callback;
	bio->bi_opf = bi_opf;
	bio->bi_iter.bi_sector = pbn * VDO_SECTORS_PER_BLOCK;
}

/*
 * Prepares the bio to perform IO with the specified buffer. May only be used on a VDO-allocated
 * bio, as it assumes the bio wraps a 4k buffer that is 4k aligned, but there does not have to be a
 * vio associated with the bio.
 */
int vio_reset_bio(struct vio *vio, char *data, bio_end_io_t callback,
		  blk_opf_t bi_opf, physical_block_number_t pbn)
{
	int bvec_count, offset, len, i;
	struct bio *bio = vio->bio;

	bio_reset(bio, bio->bi_bdev, bi_opf);
	vdo_set_bio_properties(bio, vio, callback, bi_opf, pbn);
	if (data == NULL)
		return VDO_SUCCESS;

	bio->bi_io_vec = bio->bi_inline_vecs;
	bio->bi_max_vecs = vio->block_count + 1;
	len = VDO_BLOCK_SIZE * vio->block_count;
	offset = offset_in_page(data);
	bvec_count = DIV_ROUND_UP(offset + len, PAGE_SIZE);

	/*
	 * If we knew that data was always on one page, or contiguous pages, we wouldn't need the
	 * loop. But if we're using vmalloc, it's not impossible that the data is in different
	 * pages that can't be merged in bio_add_page...
	 */
	for (i = 0; (i < bvec_count) && (len > 0); i++) {
		struct page *page;
		int bytes_added;
		int bytes = PAGE_SIZE - offset;

		if (bytes > len)
			bytes = len;

		page = is_vmalloc_addr(data) ? vmalloc_to_page(data) : virt_to_page(data);
		bytes_added = bio_add_page(bio, page, bytes, offset);

		if (bytes_added != bytes) {
			return vdo_log_error_strerror(VDO_BIO_CREATION_FAILED,
						      "Could only add %i bytes to bio",
						      bytes_added);
		}

		data += bytes;
		len -= bytes;
		offset = 0;
	}

	return VDO_SUCCESS;
}

/**
 * update_vio_error_stats() - Update per-vio error stats and log the error.
 * @vio: The vio which got an error.
 * @format: The format of the message to log (a printf style format).
 */
void update_vio_error_stats(struct vio *vio, const char *format, ...)
{
	static DEFINE_RATELIMIT_STATE(error_limiter, DEFAULT_RATELIMIT_INTERVAL,
				      DEFAULT_RATELIMIT_BURST);
	va_list args;
	int priority;
	struct vdo *vdo = vio->completion.vdo;

	switch (vio->completion.result) {
	case VDO_READ_ONLY:
		atomic64_inc(&vdo->stats.read_only_error_count);
		return;

	case VDO_NO_SPACE:
		atomic64_inc(&vdo->stats.no_space_error_count);
		priority = VDO_LOG_DEBUG;
		break;

	default:
		priority = VDO_LOG_ERR;
	}

	if (!__ratelimit(&error_limiter))
		return;

	va_start(args, format);
	vdo_vlog_strerror(priority, vio->completion.result, VDO_LOGGING_MODULE_NAME,
			  format, args);
	va_end(args);
}

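/**
 * vio_record_metadata_io_error() - Update error stats and log a metadata vio's I/O error,
 *                                  noting the operation type and the physical block.
 * @vio: The metadata vio which got an error.
 */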
void vio_record_metadata_io_error(struct vio *vio)
{
	const char *description;
	physical_block_number_t pbn = pbn_from_vio_bio(vio->bio);

	if (bio_op(vio->bio) == REQ_OP_READ) {
		description = "read";
	} else if ((vio->bio->bi_opf & REQ_PREFLUSH) == REQ_PREFLUSH) {
		description = (((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) ?
			       "write+preflush+fua" :
			       "write+preflush");
	} else if ((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) {
		description = "write+fua";
	} else {
		description = "write";
	}

	update_vio_error_stats(vio,
			       "Completing %s vio of type %u for physical block %llu with error",
			       description, vio->type, (unsigned long long) pbn);
}

/**
 * make_vio_pool() - Create a new vio pool.
 * @vdo: The vdo.
 * @pool_size: The number of vios in the pool.
 * @thread_id: The ID of the thread using this pool.
 * @vio_type: The type of vios in the pool.
 * @priority: The priority with which vios from the pool should be enqueued.
 * @context: The context that each entry will have.
 * @pool_ptr: The resulting pool.
 *
 * Return: A success or error code.
 */
int make_vio_pool(struct vdo *vdo, size_t pool_size, thread_id_t thread_id,
		  enum vio_type vio_type, enum vio_priority priority, void *context,
		  struct vio_pool **pool_ptr)
{
	struct vio_pool *pool;
	char *ptr;
	int result;

	result = vdo_allocate_extended(struct vio_pool, pool_size, struct pooled_vio,
				       __func__, &pool);
	if (result != VDO_SUCCESS)
		return result;

	pool->thread_id = thread_id;
	INIT_LIST_HEAD(&pool->available);
	INIT_LIST_HEAD(&pool->busy);

	result = vdo_allocate(pool_size * VDO_BLOCK_SIZE, char,
			      "VIO pool buffer", &pool->buffer);
	if (result != VDO_SUCCESS) {
		free_vio_pool(pool);
		return result;
	}

	ptr = pool->buffer;
	for (pool->size = 0; pool->size < pool_size; pool->size++, ptr += VDO_BLOCK_SIZE) {
		struct pooled_vio *pooled = &pool->vios[pool->size];

		result = allocate_vio_components(vdo, vio_type, priority, NULL, 1, ptr,
						 &pooled->vio);
		if (result != VDO_SUCCESS) {
			free_vio_pool(pool);
			return result;
		}

		pooled->context = context;
		list_add_tail(&pooled->pool_entry, &pool->available);
	}

	*pool_ptr = pool;
	return VDO_SUCCESS;
}

/**
 * free_vio_pool() - Destroy a vio pool.
 * @pool: The pool to free.
 */
void free_vio_pool(struct vio_pool *pool)
{
	struct pooled_vio *pooled, *tmp;

	if (pool == NULL)
		return;

	/* Remove all available vios from the object pool. */
	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&pool->waiting),
			    "VIO pool must not have any waiters when being freed");
	VDO_ASSERT_LOG_ONLY((pool->busy_count == 0),
			    "VIO pool must not have %zu busy entries when being freed",
			    pool->busy_count);
	VDO_ASSERT_LOG_ONLY(list_empty(&pool->busy),
			    "VIO pool must not have busy entries when being freed");

	list_for_each_entry_safe(pooled, tmp, &pool->available, pool_entry) {
		list_del(&pooled->pool_entry);
		free_vio_components(&pooled->vio);
		pool->size--;
	}

	VDO_ASSERT_LOG_ONLY(pool->size == 0,
			    "VIO pool must not have missing entries when being freed");

	vdo_free(vdo_forget(pool->buffer));
	vdo_free(pool);
}

/**
 * is_vio_pool_busy() - Check whether a vio pool has outstanding entries.
 * @pool: The pool to check.
 *
 * Return: true if the pool is busy.
 */
bool is_vio_pool_busy(struct vio_pool *pool)
{
	return (pool->busy_count != 0);
}

/**
 * acquire_vio_from_pool() - Acquire a vio and buffer from the pool (asynchronous).
 * @pool: The vio pool.
 * @waiter: Object that is requesting a vio.
 */
void acquire_vio_from_pool(struct vio_pool *pool, struct vdo_waiter *waiter)
{
	struct pooled_vio *pooled;

	VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
			    "acquire from active vio_pool called from correct thread");

	if (list_empty(&pool->available)) {
		vdo_waitq_enqueue_waiter(&pool->waiting, waiter);
		return;
	}

	pooled = list_first_entry(&pool->available, struct pooled_vio, pool_entry);
	pool->busy_count++;
	list_move_tail(&pooled->pool_entry, &pool->busy);
	(*waiter->callback)(waiter, pooled);
}

/**
 * return_vio_to_pool() - Return a vio to the pool.
 * @pool: The vio pool.
 * @vio: The pooled vio to return.
 */
void return_vio_to_pool(struct vio_pool *pool, struct pooled_vio *vio)
{
	VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
			    "vio pool entry returned on same thread as it was acquired");

	vio->vio.completion.error_handler = NULL;
	vio->vio.completion.parent = NULL;
	if (vdo_waitq_has_waiters(&pool->waiting)) {
		vdo_waitq_notify_next_waiter(&pool->waiting, NULL, vio);
		return;
	}

	list_move_tail(&vio->pool_entry, &pool->available);
	--pool->busy_count;
}
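
/*
 * Example of the acquire/return cycle (a minimal sketch; the callback below is
 * hypothetical, not taken from an actual caller). In both the immediate and the queued
 * path, the waiter's callback receives the pooled vio as its context:
 *
 *	static void vio_ready(struct vdo_waiter *waiter, void *context)
 *	{
 *		struct pooled_vio *pooled = context;
 *
 *		// Use &pooled->vio for I/O; when finished, on the pool's thread:
 *		// return_vio_to_pool(pool, pooled);
 *	}
 *
 *	waiter->callback = vio_ready;
 *	acquire_vio_from_pool(pool, waiter);
 *
 * If no vio is available, the waiter is queued and the callback fires when an entry is
 * returned to the pool.
 */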

/*
 * Various counting functions for statistics.
 * These are used for bios coming into VDO, as well as bios generated by VDO.
 */
void vdo_count_bios(struct atomic_bio_stats *bio_stats, struct bio *bio)
{
	if (((bio->bi_opf & REQ_PREFLUSH) != 0) && (bio->bi_iter.bi_size == 0)) {
		atomic64_inc(&bio_stats->empty_flush);
		atomic64_inc(&bio_stats->flush);
		return;
	}

	switch (bio_op(bio)) {
	case REQ_OP_WRITE:
		atomic64_inc(&bio_stats->write);
		break;
	case REQ_OP_READ:
		atomic64_inc(&bio_stats->read);
		break;
	case REQ_OP_DISCARD:
		atomic64_inc(&bio_stats->discard);
		break;
		/*
		 * All other operations are filtered out in dmvdo.c, or not created by VDO, so
		 * shouldn't exist.
		 */
	default:
		VDO_ASSERT_LOG_ONLY(0, "Bio operation %d not a write, read, discard, or empty flush",
				    bio_op(bio));
	}

	if ((bio->bi_opf & REQ_PREFLUSH) != 0)
		atomic64_inc(&bio_stats->flush);
	if (bio->bi_opf & REQ_FUA)
		atomic64_inc(&bio_stats->fua);
}

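/* Count a completed bio in the statistics bucket appropriate to its vio's type. */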
static void count_all_bios_completed(struct vio *vio, struct bio *bio)
{
	struct atomic_statistics *stats = &vio->completion.vdo->stats;

	if (is_data_vio(vio)) {
		vdo_count_bios(&stats->bios_out_completed, bio);
		return;
	}

	vdo_count_bios(&stats->bios_meta_completed, bio);
	if (vio->type == VIO_TYPE_RECOVERY_JOURNAL)
		vdo_count_bios(&stats->bios_journal_completed, bio);
	else if (vio->type == VIO_TYPE_BLOCK_MAP)
		vdo_count_bios(&stats->bios_page_cache_completed, bio);
}

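/* Update all bio completion statistics for a bio which has just completed. */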
void vdo_count_completed_bios(struct bio *bio)
{
	struct vio *vio = (struct vio *) bio->bi_private;

	atomic64_inc(&vio->completion.vdo->stats.bios_completed);
	count_all_bios_completed(vio, bio);
}