1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * Copyright 2023 Red Hat
4 */
5
6 #include "data-vio.h"
7
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 #include <linux/blkdev.h>
11 #include <linux/delay.h>
12 #include <linux/device-mapper.h>
13 #include <linux/jiffies.h>
14 #include <linux/kernel.h>
15 #include <linux/list.h>
16 #include <linux/lz4.h>
17 #include <linux/minmax.h>
18 #include <linux/sched.h>
19 #include <linux/spinlock.h>
20 #include <linux/wait.h>
21
22 #include "logger.h"
23 #include "memory-alloc.h"
24 #include "murmurhash3.h"
25 #include "permassert.h"
26
27 #include "block-map.h"
28 #include "dump.h"
29 #include "encodings.h"
30 #include "int-map.h"
31 #include "io-submitter.h"
32 #include "logical-zone.h"
33 #include "packer.h"
34 #include "recovery-journal.h"
35 #include "slab-depot.h"
36 #include "status-codes.h"
37 #include "types.h"
38 #include "vdo.h"
39 #include "vio.h"
40 #include "wait-queue.h"
41
42 /**
43 * DOC: Bio flags.
44 *
45 * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those
46 * flags on our own bio(s) for that request may help underlying layers better fulfill the user
47 * bio's needs. This constant contains the aggregate of those flags; VDO strips all the other
48 * flags, as they convey incorrect information.
49 *
 * These flags are always irrelevant if we have already finished the user bio, as they are only
 * hints about IO importance. Once VDO has finished the user bio, any remaining IO for that
 * request no longer benefits from those hints.
53 *
54 * Note that bio.c contains the complete list of flags we believe may be set; the following list
55 * explains the action taken with each of those flags VDO could receive:
56 *
57 * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio
58 * completion is required for further work to be done by the issuer.
59 * * REQ_META: Passed down if the user bio is not yet completed, since it may mean the lower layer
60 * treats it as more urgent, similar to REQ_SYNC.
61 * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is
62 * important.
63 * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO.
64 * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't
65 * match incoming IO, so this flag is incorrect for it.
66 * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise.
67 * * REQ_RAHEAD: Passed down, as, for reads, it indicates trivial importance.
68 * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled
69 * ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load
70 * prioritization.
71 */
72 static blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD);
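
/*
 * Illustrative sketch only (not part of the driver logic): the mask above is meant to be applied
 * when building one of our own bios for a user bio that has not yet been acknowledged, as
 * read_block() does later in this file:
 *
 *	blk_opf_t opf = (data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ;
 *
 * Every flag not named in PASSTHROUGH_FLAGS is dropped; REQ_FUA is handled separately by
 * launch_bio(), which records it in data_vio->fua.
 */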
73
74 /**
75 * DOC:
76 *
77 * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios. For
78 * correctness, and in order to avoid potentially expensive or blocking memory allocations during
79 * normal operation, the number of concurrently active data_vios is capped. Furthermore, in order
80 * to avoid starvation of reads and writes, at most 75% of the data_vios may be used for
81 * discards. The data_vio_pool is responsible for enforcing these limits. Threads submitting bios
 * for which a data_vio or discard permit is not available will block until the necessary
83 * resources are available. The pool is also responsible for distributing resources to blocked
84 * threads and waking them. Finally, the pool attempts to batch the work of recycling data_vios by
85 * performing the work of actually assigning resources to blocked threads or placing data_vios back
86 * into the pool on a single cpu at a time.
87 *
88 * The pool contains two "limiters", one for tracking data_vios and one for tracking discard
89 * permits. The limiters also provide safe cross-thread access to pool statistics without the need
90 * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to
91 * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources
92 * are available, the incoming bio will be assigned to the acquired data_vio, and it will be
93 * launched. However, if either of these are unavailable, the arrival time of the bio is recorded
94 * in the bio's bi_private field, the bio and its submitter are both queued on the appropriate
95 * limiter and the submitting thread will then put itself to sleep. (note that this mechanism will
96 * break if jiffies are only 32 bits.)
97 *
98 * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
99 * will be called on it. This function will add the data_vio to a funnel queue, and then check the
100 * state of the pool. If the pool is not currently processing released data_vios, the pool's
101 * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to
102 * hold the pool's lock, and also batches release work while avoiding starvation of the cpu
103 * threads.
104 *
105 * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback() which
 * processes a batch of returned data_vios (currently at most 128) from the pool's funnel queue.
 * For
107 * each data_vio, it first checks whether that data_vio was processing a discard. If so, and there
108 * is a blocked bio waiting for a discard permit, that permit is notionally transferred to the
109 * eldest discard waiter, and that waiter is moved to the end of the list of discard bios waiting
110 * for a data_vio. If there are no discard waiters, the discard permit is returned to the pool.
111 * Next, the data_vio is assigned to the oldest blocked bio which either has a discard permit, or
 * doesn't need one, and is relaunched. If neither of these exists, the data_vio is returned to the
113 * pool. Finally, if any waiting bios were launched, the threads which blocked trying to submit
114 * them are awakened.
115 */
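
/*
 * A hedged usage sketch of the pool API defined in this file (the real callers live elsewhere in
 * the vdo code); error handling is elided:
 *
 *	struct data_vio_pool *pool;
 *
 *	result = make_data_vio_pool(vdo, pool_size, discard_limit, &pool);
 *	...
 *	vdo_launch_bio(pool, bio);             // may sleep until a data_vio/permit is free
 *	...
 *	drain_data_vio_pool(pool, completion); // cpu thread only; completes asynchronously
 *	free_data_vio_pool(pool);              // all data_vios must have been returned first
 */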
116
117 #define DATA_VIO_RELEASE_BATCH_SIZE 128
118
119 static const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1;
120 static const u32 COMPRESSION_STATUS_MASK = 0xff;
121 static const u32 MAY_NOT_COMPRESS_MASK = 0x80000000;
122
123 struct limiter;
124 typedef void (*assigner_fn)(struct limiter *limiter);
125
126 /* Bookkeeping structure for a single type of resource. */
127 struct limiter {
128 /* The data_vio_pool to which this limiter belongs */
129 struct data_vio_pool *pool;
130 /* The maximum number of data_vios available */
131 data_vio_count_t limit;
132 /* The number of resources in use */
133 data_vio_count_t busy;
134 /* The maximum number of resources ever simultaneously in use */
135 data_vio_count_t max_busy;
136 /* The number of resources to release */
137 data_vio_count_t release_count;
138 /* The number of waiters to wake */
139 data_vio_count_t wake_count;
140 /* The list of waiting bios which are known to process_release_callback() */
141 struct bio_list waiters;
142 /* The list of waiting bios which are not yet known to process_release_callback() */
143 struct bio_list new_waiters;
144 /* The list of waiters which have their permits */
145 struct bio_list *permitted_waiters;
146 /* The function for assigning a resource to a waiter */
147 assigner_fn assigner;
148 /* The queue of blocked threads */
149 wait_queue_head_t blocked_threads;
150 /* The arrival time of the eldest waiter */
151 u64 arrival;
152 };
153
154 /*
155 * A data_vio_pool is a collection of preallocated data_vios which may be acquired from any thread,
156 * and are released in batches.
157 */
158 struct data_vio_pool {
159 /* Completion for scheduling releases */
160 struct vdo_completion completion;
161 /* The administrative state of the pool */
162 struct admin_state state;
163 /* Lock protecting the pool */
164 spinlock_t lock;
165 /* The main limiter controlling the total data_vios in the pool. */
166 struct limiter limiter;
167 /* The limiter controlling data_vios for discard */
168 struct limiter discard_limiter;
169 /* The list of bios which have discard permits but still need a data_vio */
170 struct bio_list permitted_discards;
171 /* The list of available data_vios */
172 struct list_head available;
173 /* The queue of data_vios waiting to be returned to the pool */
174 struct funnel_queue *queue;
175 /* Whether the pool is processing, or scheduled to process releases */
176 atomic_t processing;
177 /* The data vios in the pool */
178 struct data_vio data_vios[];
179 };
180
181 static const char * const ASYNC_OPERATION_NAMES[] = {
182 "launch",
183 "acknowledge_write",
184 "acquire_hash_lock",
185 "attempt_logical_block_lock",
186 "lock_duplicate_pbn",
187 "check_for_duplication",
188 "cleanup",
189 "compress_data_vio",
190 "find_block_map_slot",
191 "get_mapped_block_for_read",
192 "get_mapped_block_for_write",
193 "hash_data_vio",
194 "journal_remapping",
195 "vdo_attempt_packing",
196 "put_mapped_block",
197 "read_data_vio",
198 "update_dedupe_index",
199 "update_reference_counts",
200 "verify_duplication",
201 "write_data_vio",
202 };
203
204 /* The steps taken cleaning up a VIO, in the order they are performed. */
205 enum data_vio_cleanup_stage {
206 VIO_CLEANUP_START,
207 VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START,
208 VIO_RELEASE_ALLOCATED,
209 VIO_RELEASE_RECOVERY_LOCKS,
210 VIO_RELEASE_LOGICAL,
211 VIO_CLEANUP_DONE
212 };
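
/*
 * For example, a successful write data_vio entering cleanup at VIO_CLEANUP_START visits these
 * stages in order, hopping to the appropriate zone for each resource it actually holds and
 * falling through the stages for resources it does not (see perform_cleanup_stage()); a pure
 * read skips straight to VIO_RELEASE_LOGICAL (see complete_data_vio()).
 */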
213
214 static inline struct data_vio_pool * __must_check
as_data_vio_pool(struct vdo_completion *completion)
216 {
217 vdo_assert_completion_type(completion, VDO_DATA_VIO_POOL_COMPLETION);
218 return container_of(completion, struct data_vio_pool, completion);
219 }
220
static inline u64 get_arrival_time(struct bio *bio)
222 {
223 return (u64) bio->bi_private;
224 }
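
/*
 * The arrival time read above is stashed by vdo_launch_bio(), which does
 * "bio->bi_private = (void *) jiffies" before queueing the bio on a limiter; see the note in the
 * DOC comment above about this breaking if jiffies were only 32 bits wide.
 */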
225
226 /**
227 * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios
228 * or waiters while holding the pool's lock.
229 */
static bool check_for_drain_complete_locked(struct data_vio_pool *pool)
231 {
232 if (pool->limiter.busy > 0)
233 return false;
234
235 VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0),
236 "no outstanding discard permits");
237
238 return (bio_list_empty(&pool->limiter.new_waiters) &&
239 bio_list_empty(&pool->discard_limiter.new_waiters));
240 }
241
static void initialize_lbn_lock(struct data_vio *data_vio, logical_block_number_t lbn)
243 {
244 struct vdo *vdo = vdo_from_data_vio(data_vio);
245 zone_count_t zone_number;
246 struct lbn_lock *lock = &data_vio->logical;
247
248 lock->lbn = lbn;
249 lock->locked = false;
250 vdo_waitq_init(&lock->waiters);
251 zone_number = vdo_compute_logical_zone(data_vio);
252 lock->zone = &vdo->logical_zones->zones[zone_number];
253 }
254
static void launch_locked_request(struct data_vio *data_vio)
256 {
257 data_vio->logical.locked = true;
258 if (data_vio->write) {
259 struct vdo *vdo = vdo_from_data_vio(data_vio);
260
261 if (vdo_is_read_only(vdo)) {
262 continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
263 return;
264 }
265 }
266
267 data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT;
268 vdo_find_block_map_slot(data_vio);
269 }
270
static void acknowledge_data_vio(struct data_vio *data_vio)
272 {
273 struct vdo *vdo = vdo_from_data_vio(data_vio);
274 struct bio *bio = data_vio->user_bio;
275 int error = vdo_status_to_errno(data_vio->vio.completion.result);
276
277 if (bio == NULL)
278 return;
279
280 VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <=
281 (u32) (VDO_BLOCK_SIZE - data_vio->offset)),
282 "data_vio to acknowledge is not an incomplete discard");
283
284 data_vio->user_bio = NULL;
285 vdo_count_bios(&vdo->stats.bios_acknowledged, bio);
286 if (data_vio->is_partial)
287 vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio);
288
289 bio->bi_status = errno_to_blk_status(error);
290 bio_endio(bio);
291 }
292
static void copy_to_bio(struct bio *bio, char *data_ptr)
294 {
295 struct bio_vec biovec;
296 struct bvec_iter iter;
297
298 bio_for_each_segment(biovec, bio, iter) {
299 memcpy_to_bvec(&biovec, data_ptr);
300 data_ptr += biovec.bv_len;
301 }
302 }
303
struct data_vio_compression_status get_data_vio_compression_status(struct data_vio *data_vio)
305 {
306 u32 packed = atomic_read(&data_vio->compression.status);
307
308 /* pairs with cmpxchg in set_data_vio_compression_status */
309 smp_rmb();
310 return (struct data_vio_compression_status) {
311 .stage = packed & COMPRESSION_STATUS_MASK,
312 .may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0),
313 };
314 }
315
316 /**
317 * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored
318 * atomically.
319 * @status: The state to convert.
320 *
321 * Return: The compression state packed into a u32.
322 */
static u32 __must_check pack_status(struct data_vio_compression_status status)
324 {
325 return status.stage | (status.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0);
326 }
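
/*
 * A worked example of the encoding, assuming the enum values declared in data-vio.h: a data_vio
 * in the DATA_VIO_PACKING stage whose compression has been cancelled packs as
 *
 *	pack_status((struct data_vio_compression_status) {
 *		.stage = DATA_VIO_PACKING,
 *		.may_not_compress = true,
 *	}) == (DATA_VIO_PACKING | MAY_NOT_COMPRESS_MASK)
 *
 * i.e. the stage lives in the low byte (COMPRESSION_STATUS_MASK) and the may-not-compress flag
 * is the top bit (MAY_NOT_COMPRESS_MASK).
 */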
327
328 /**
329 * set_data_vio_compression_status() - Set the compression status of a data_vio.
 * @data_vio: The data_vio to update.
 * @status: The expected current status of the data_vio.
 * @new_status: The status to set.
332 *
333 * Return: true if the new status was set, false if the data_vio's compression status did not
334 * match the expected state, and so was left unchanged.
335 */
336 static bool __must_check
set_data_vio_compression_status(struct data_vio *data_vio,
338 struct data_vio_compression_status status,
339 struct data_vio_compression_status new_status)
340 {
341 u32 actual;
342 u32 expected = pack_status(status);
343 u32 replacement = pack_status(new_status);
344
345 /*
 * Extra barriers because this was originally developed using a CAS operation that implicitly
347 * had them.
348 */
349 smp_mb__before_atomic();
350 actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement);
351 /* same as before_atomic */
352 smp_mb__after_atomic();
353 return (expected == actual);
354 }
355
struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio)
357 {
358 for (;;) {
359 struct data_vio_compression_status status =
360 get_data_vio_compression_status(data_vio);
361 struct data_vio_compression_status new_status = status;
362
363 if (status.stage == DATA_VIO_POST_PACKER) {
364 /* We're already in the last stage. */
365 return status;
366 }
367
368 if (status.may_not_compress) {
369 /*
370 * Compression has been dis-allowed for this VIO, so skip the rest of the
371 * path and go to the end.
372 */
373 new_status.stage = DATA_VIO_POST_PACKER;
374 } else {
375 /* Go to the next state. */
376 new_status.stage++;
377 }
378
379 if (set_data_vio_compression_status(data_vio, status, new_status))
380 return new_status;
381
382 /* Another thread changed the status out from under us so try again. */
383 }
384 }
385
386 /**
387 * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed.
388 *
389 * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it.
390 */
bool cancel_data_vio_compression(struct data_vio *data_vio)
392 {
393 struct data_vio_compression_status status, new_status;
394
395 for (;;) {
396 status = get_data_vio_compression_status(data_vio);
397 if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) {
398 /* This data_vio is already set up to not block in the packer. */
399 break;
400 }
401
402 new_status.stage = status.stage;
403 new_status.may_not_compress = true;
404
405 if (set_data_vio_compression_status(data_vio, status, new_status))
406 break;
407 }
408
409 return ((status.stage == DATA_VIO_PACKING) && !status.may_not_compress);
410 }
411
412 /**
413 * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block.
414 * @completion: The data_vio for an external data request as a completion.
415 *
416 * This is the start of the path for all external requests. It is registered in launch_data_vio().
417 */
static void attempt_logical_block_lock(struct vdo_completion *completion)
419 {
420 struct data_vio *data_vio = as_data_vio(completion);
421 struct lbn_lock *lock = &data_vio->logical;
422 struct vdo *vdo = vdo_from_data_vio(data_vio);
423 struct data_vio *lock_holder;
424 int result;
425
426 assert_data_vio_in_logical_zone(data_vio);
427
428 if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) {
429 continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE);
430 return;
431 }
432
433 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
434 data_vio, false, (void **) &lock_holder);
435 if (result != VDO_SUCCESS) {
436 continue_data_vio_with_error(data_vio, result);
437 return;
438 }
439
440 if (lock_holder == NULL) {
441 /* We got the lock */
442 launch_locked_request(data_vio);
443 return;
444 }
445
446 result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held");
447 if (result != VDO_SUCCESS) {
448 continue_data_vio_with_error(data_vio, result);
449 return;
450 }
451
452 /*
453 * If the new request is a pure read request (not read-modify-write) and the lock_holder is
454 * writing and has received an allocation, service the read request immediately by copying
455 * data from the lock_holder to avoid having to flush the write out of the packer just to
456 * prevent the read from waiting indefinitely. If the lock_holder does not yet have an
457 * allocation, prevent it from blocking in the packer and wait on it. This is necessary in
458 * order to prevent returning data that may not have actually been written.
459 */
460 if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) {
461 copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset);
462 acknowledge_data_vio(data_vio);
463 complete_data_vio(completion);
464 return;
465 }
466
467 data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK;
468 vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter);
469
470 /*
471 * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the
472 * packer.
473 */
474 if (lock_holder->write && cancel_data_vio_compression(lock_holder)) {
475 data_vio->compression.lock_holder = lock_holder;
476 launch_data_vio_packer_callback(data_vio,
477 vdo_remove_lock_holder_from_packer);
478 }
479 }
480
481 /**
482 * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the
 * same parent and other state, and send it on its way.
484 */
static void launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn)
486 {
487 struct vdo_completion *completion = &data_vio->vio.completion;
488
489 /*
490 * Clearing the tree lock must happen before initializing the LBN lock, which also adds
491 * information to the tree lock.
492 */
493 memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock));
494 initialize_lbn_lock(data_vio, lbn);
495 INIT_LIST_HEAD(&data_vio->hash_lock_entry);
496 INIT_LIST_HEAD(&data_vio->write_entry);
497
498 memset(&data_vio->allocation, 0, sizeof(data_vio->allocation));
499
500 data_vio->is_duplicate = false;
501
502 memset(&data_vio->record_name, 0, sizeof(data_vio->record_name));
503 memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate));
504 vdo_reset_completion(&data_vio->decrement_completion);
505 vdo_reset_completion(completion);
506 completion->error_handler = handle_data_vio_error;
507 set_data_vio_logical_callback(data_vio, attempt_logical_block_lock);
508 vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY);
509 }
510
static bool is_zero_block(char *block)
512 {
513 int i;
514
515 for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
516 if (*((u64 *) &block[i]))
517 return false;
518 }
519
520 return true;
521 }
522
static void copy_from_bio(struct bio *bio, char *data_ptr)
524 {
525 struct bio_vec biovec;
526 struct bvec_iter iter;
527
528 bio_for_each_segment(biovec, bio, iter) {
529 memcpy_from_bvec(data_ptr, &biovec);
530 data_ptr += biovec.bv_len;
531 }
532 }
533
static void launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio)
535 {
536 logical_block_number_t lbn;
537 /*
538 * Zero out the fields which don't need to be preserved (i.e. which are not pointers to
539 * separately allocated objects).
540 */
541 memset(data_vio, 0, offsetof(struct data_vio, vio));
542 memset(&data_vio->compression, 0, offsetof(struct compression_state, block));
543
544 data_vio->user_bio = bio;
545 data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK);
546 data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0);
547
548 /*
549 * Discards behave very differently than other requests when coming in from device-mapper.
550 * We have to be able to handle any size discards and various sector offsets within a
551 * block.
552 */
553 if (bio_op(bio) == REQ_OP_DISCARD) {
554 data_vio->remaining_discard = bio->bi_iter.bi_size;
555 data_vio->write = true;
556 data_vio->is_discard = true;
557 if (data_vio->is_partial) {
558 vdo_count_bios(&vdo->stats.bios_in_partial, bio);
559 data_vio->read = true;
560 }
561 } else if (data_vio->is_partial) {
562 vdo_count_bios(&vdo->stats.bios_in_partial, bio);
563 data_vio->read = true;
564 if (bio_data_dir(bio) == WRITE)
565 data_vio->write = true;
566 } else if (bio_data_dir(bio) == READ) {
567 data_vio->read = true;
568 } else {
569 /*
570 * Copy the bio data to a char array so that we can continue to use the data after
571 * we acknowledge the bio.
572 */
573 copy_from_bio(bio, data_vio->vio.data);
574 data_vio->is_zero = is_zero_block(data_vio->vio.data);
575 data_vio->write = true;
576 }
577
578 if (data_vio->user_bio->bi_opf & REQ_FUA)
579 data_vio->fua = true;
580
581 lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK;
582 launch_data_vio(data_vio, lbn);
583 }
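
/*
 * A worked example of the arithmetic above, assuming 4K vdo blocks and 512-byte sectors
 * (VDO_SECTORS_PER_BLOCK == 8, so VDO_SECTORS_PER_BLOCK_MASK == 7) and a starting_sector_offset
 * of zero. For a bio with bi_sector == 9 and bi_size == 1024:
 *
 *	offset     = to_bytes(9 & 7) = 512 bytes into the block
 *	is_partial = true            (1024 < VDO_BLOCK_SIZE, and offset != 0)
 *	lbn        = 9 / 8           = 1
 */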
584
static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio)
586 {
587 struct bio *bio = bio_list_pop(limiter->permitted_waiters);
588
589 launch_bio(limiter->pool->completion.vdo, data_vio, bio);
590 limiter->wake_count++;
591
592 bio = bio_list_peek(limiter->permitted_waiters);
593 limiter->arrival = ((bio == NULL) ? U64_MAX : get_arrival_time(bio));
594 }
595
static void assign_discard_permit(struct limiter *limiter)
597 {
598 struct bio *bio = bio_list_pop(&limiter->waiters);
599
600 if (limiter->arrival == U64_MAX)
601 limiter->arrival = get_arrival_time(bio);
602
603 bio_list_add(limiter->permitted_waiters, bio);
604 }
605
static void get_waiters(struct limiter *limiter)
607 {
608 bio_list_merge_init(&limiter->waiters, &limiter->new_waiters);
609 }
610
static inline struct data_vio *get_available_data_vio(struct data_vio_pool *pool)
612 {
613 struct data_vio *data_vio =
614 list_first_entry(&pool->available, struct data_vio, pool_entry);
615
616 list_del_init(&data_vio->pool_entry);
617 return data_vio;
618 }
619
static void assign_data_vio_to_waiter(struct limiter *limiter)
621 {
622 assign_data_vio(limiter, get_available_data_vio(limiter->pool));
623 }
624
static void update_limiter(struct limiter *limiter)
626 {
627 struct bio_list *waiters = &limiter->waiters;
628 data_vio_count_t available = limiter->limit - limiter->busy;
629
630 VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy),
631 "Release count %u is not more than busy count %u",
632 limiter->release_count, limiter->busy);
633
634 get_waiters(limiter);
635 for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--)
636 limiter->assigner(limiter);
637
638 if (limiter->release_count > 0) {
639 WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count);
640 limiter->release_count = 0;
641 return;
642 }
643
644 for (; (available > 0) && !bio_list_empty(waiters); available--)
645 limiter->assigner(limiter);
646
647 WRITE_ONCE(limiter->busy, limiter->limit - available);
648 if (limiter->max_busy < limiter->busy)
649 WRITE_ONCE(limiter->max_busy, limiter->busy);
650 }
651
652 /**
653 * schedule_releases() - Ensure that release processing is scheduled.
654 *
655 * If this call switches the state to processing, enqueue. Otherwise, some other thread has already
656 * done so.
657 */
static void schedule_releases(struct data_vio_pool *pool)
659 {
660 /* Pairs with the barrier in process_release_callback(). */
661 smp_mb__before_atomic();
662 if (atomic_cmpxchg(&pool->processing, false, true))
663 return;
664
665 pool->completion.requeue = true;
666 vdo_launch_completion_with_priority(&pool->completion,
667 CPU_Q_COMPLETE_VIO_PRIORITY);
668 }
669
static void reuse_or_release_resources(struct data_vio_pool *pool,
671 struct data_vio *data_vio,
672 struct list_head *returned)
673 {
674 if (data_vio->remaining_discard > 0) {
675 if (bio_list_empty(&pool->discard_limiter.waiters)) {
676 /* Return the data_vio's discard permit. */
677 pool->discard_limiter.release_count++;
678 } else {
679 assign_discard_permit(&pool->discard_limiter);
680 }
681 }
682
683 if (pool->limiter.arrival < pool->discard_limiter.arrival) {
684 assign_data_vio(&pool->limiter, data_vio);
685 } else if (pool->discard_limiter.arrival < U64_MAX) {
686 assign_data_vio(&pool->discard_limiter, data_vio);
687 } else {
688 list_add(&data_vio->pool_entry, returned);
689 pool->limiter.release_count++;
690 }
691 }
692
693 /**
694 * process_release_callback() - Process a batch of data_vio releases.
695 * @completion: The pool with data_vios to release.
696 */
static void process_release_callback(struct vdo_completion *completion)
698 {
699 struct data_vio_pool *pool = as_data_vio_pool(completion);
700 bool reschedule;
701 bool drained;
702 data_vio_count_t processed;
703 data_vio_count_t to_wake;
704 data_vio_count_t discards_to_wake;
705 LIST_HEAD(returned);
706
707 spin_lock(&pool->lock);
708 get_waiters(&pool->discard_limiter);
709 get_waiters(&pool->limiter);
710 spin_unlock(&pool->lock);
711
712 if (pool->limiter.arrival == U64_MAX) {
713 struct bio *bio = bio_list_peek(&pool->limiter.waiters);
714
715 if (bio != NULL)
716 pool->limiter.arrival = get_arrival_time(bio);
717 }
718
719 for (processed = 0; processed < DATA_VIO_RELEASE_BATCH_SIZE; processed++) {
720 struct data_vio *data_vio;
721 struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue);
722
723 if (entry == NULL)
724 break;
725
726 data_vio = as_data_vio(container_of(entry, struct vdo_completion,
727 work_queue_entry_link));
728 acknowledge_data_vio(data_vio);
729 reuse_or_release_resources(pool, data_vio, &returned);
730 }
731
732 spin_lock(&pool->lock);
733 /*
734 * There is a race where waiters could be added while we are in the unlocked section above.
735 * Those waiters could not see the resources we are now about to release, so we assign
736 * those resources now as we have no guarantee of being rescheduled. This is handled in
737 * update_limiter().
738 */
739 update_limiter(&pool->discard_limiter);
740 list_splice(&returned, &pool->available);
741 update_limiter(&pool->limiter);
742 to_wake = pool->limiter.wake_count;
743 pool->limiter.wake_count = 0;
744 discards_to_wake = pool->discard_limiter.wake_count;
745 pool->discard_limiter.wake_count = 0;
746
747 atomic_set(&pool->processing, false);
748 /* Pairs with the barrier in schedule_releases(). */
749 smp_mb();
750
751 reschedule = !vdo_is_funnel_queue_empty(pool->queue);
752 drained = (!reschedule &&
753 vdo_is_state_draining(&pool->state) &&
754 check_for_drain_complete_locked(pool));
755 spin_unlock(&pool->lock);
756
757 if (to_wake > 0)
758 wake_up_nr(&pool->limiter.blocked_threads, to_wake);
759
760 if (discards_to_wake > 0)
761 wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake);
762
763 if (reschedule)
764 schedule_releases(pool);
765 else if (drained)
766 vdo_finish_draining(&pool->state);
767 }
768
static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool,
770 assigner_fn assigner, data_vio_count_t limit)
771 {
772 limiter->pool = pool;
773 limiter->assigner = assigner;
774 limiter->limit = limit;
775 limiter->arrival = U64_MAX;
776 init_waitqueue_head(&limiter->blocked_threads);
777 }
778
779 /**
780 * initialize_data_vio() - Allocate the components of a data_vio.
781 *
782 * The caller is responsible for cleaning up the data_vio on error.
783 *
784 * Return: VDO_SUCCESS or an error.
785 */
static int initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo)
787 {
788 struct bio *bio;
789 int result;
790
791 BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE);
792 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data",
793 &data_vio->vio.data);
794 if (result != VDO_SUCCESS)
795 return vdo_log_error_strerror(result,
796 "data_vio data allocation failure");
797
798 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block",
799 &data_vio->compression.block);
800 if (result != VDO_SUCCESS) {
801 return vdo_log_error_strerror(result,
802 "data_vio compressed block allocation failure");
803 }
804
805 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch",
806 &data_vio->scratch_block);
807 if (result != VDO_SUCCESS)
808 return vdo_log_error_strerror(result,
809 "data_vio scratch allocation failure");
810
811 result = vdo_create_bio(&bio);
812 if (result != VDO_SUCCESS)
813 return vdo_log_error_strerror(result,
814 "data_vio data bio allocation failure");
815
816 vdo_initialize_completion(&data_vio->decrement_completion, vdo,
817 VDO_DECREMENT_COMPLETION);
818 initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo);
819
820 return VDO_SUCCESS;
821 }
822
static void destroy_data_vio(struct data_vio *data_vio)
824 {
825 if (data_vio == NULL)
826 return;
827
828 vdo_free_bio(vdo_forget(data_vio->vio.bio));
829 vdo_free(vdo_forget(data_vio->vio.data));
830 vdo_free(vdo_forget(data_vio->compression.block));
831 vdo_free(vdo_forget(data_vio->scratch_block));
832 }
833
834 /**
835 * make_data_vio_pool() - Initialize a data_vio pool.
836 * @vdo: The vdo to which the pool will belong.
837 * @pool_size: The number of data_vios in the pool.
838 * @discard_limit: The maximum number of data_vios which may be used for discards.
839 * @pool: A pointer to hold the newly allocated pool.
840 */
int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size,
842 data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr)
843 {
844 int result;
845 struct data_vio_pool *pool;
846 data_vio_count_t i;
847
848 result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio,
849 __func__, &pool);
850 if (result != VDO_SUCCESS)
851 return result;
852
853 VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size),
854 "discard limit does not exceed pool size");
855 initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit,
856 discard_limit);
857 pool->discard_limiter.permitted_waiters = &pool->permitted_discards;
858 initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size);
859 pool->limiter.permitted_waiters = &pool->limiter.waiters;
860 INIT_LIST_HEAD(&pool->available);
861 spin_lock_init(&pool->lock);
862 vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
863 vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION);
864 vdo_prepare_completion(&pool->completion, process_release_callback,
865 process_release_callback, vdo->thread_config.cpu_thread,
866 NULL);
867
868 result = vdo_make_funnel_queue(&pool->queue);
869 if (result != VDO_SUCCESS) {
870 free_data_vio_pool(vdo_forget(pool));
871 return result;
872 }
873
874 for (i = 0; i < pool_size; i++) {
875 struct data_vio *data_vio = &pool->data_vios[i];
876
877 result = initialize_data_vio(data_vio, vdo);
878 if (result != VDO_SUCCESS) {
879 destroy_data_vio(data_vio);
880 free_data_vio_pool(pool);
881 return result;
882 }
883
884 list_add(&data_vio->pool_entry, &pool->available);
885 }
886
887 *pool_ptr = pool;
888 return VDO_SUCCESS;
889 }
890
891 /**
892 * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it.
893 *
894 * All data_vios must be returned to the pool before calling this function.
895 */
void free_data_vio_pool(struct data_vio_pool *pool)
897 {
898 struct data_vio *data_vio, *tmp;
899
900 if (pool == NULL)
901 return;
902
903 /*
904 * Pairs with the barrier in process_release_callback(). Possibly not needed since it
905 * caters to an enqueue vs. free race.
906 */
907 smp_mb();
908 BUG_ON(atomic_read(&pool->processing));
909
910 spin_lock(&pool->lock);
911 VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0),
912 "data_vio pool must not have %u busy entries when being freed",
913 pool->limiter.busy);
914 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) &&
915 bio_list_empty(&pool->limiter.new_waiters)),
916 "data_vio pool must not have threads waiting to read or write when being freed");
917 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) &&
918 bio_list_empty(&pool->discard_limiter.new_waiters)),
919 "data_vio pool must not have threads waiting to discard when being freed");
920 spin_unlock(&pool->lock);
921
922 list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) {
923 list_del_init(&data_vio->pool_entry);
924 destroy_data_vio(data_vio);
925 }
926
927 vdo_free_funnel_queue(vdo_forget(pool->queue));
928 vdo_free(pool);
929 }
930
static bool acquire_permit(struct limiter *limiter)
932 {
933 if (limiter->busy >= limiter->limit)
934 return false;
935
936 WRITE_ONCE(limiter->busy, limiter->busy + 1);
937 if (limiter->max_busy < limiter->busy)
938 WRITE_ONCE(limiter->max_busy, limiter->busy);
939 return true;
940 }
941
static void wait_permit(struct limiter *limiter, struct bio *bio)
943 __releases(&limiter->pool->lock)
944 {
945 DEFINE_WAIT(wait);
946
947 bio_list_add(&limiter->new_waiters, bio);
948 prepare_to_wait_exclusive(&limiter->blocked_threads, &wait,
949 TASK_UNINTERRUPTIBLE);
950 spin_unlock(&limiter->pool->lock);
951 io_schedule();
952 finish_wait(&limiter->blocked_threads, &wait);
953 }
954
955 /**
956 * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it.
957 *
958 * This will block if data_vios or discard permits are not available.
959 */
void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio)
961 {
962 struct data_vio *data_vio;
963
964 VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state),
965 "data_vio_pool not quiescent on acquire");
966
967 bio->bi_private = (void *) jiffies;
968 spin_lock(&pool->lock);
969 if ((bio_op(bio) == REQ_OP_DISCARD) &&
970 !acquire_permit(&pool->discard_limiter)) {
971 wait_permit(&pool->discard_limiter, bio);
972 return;
973 }
974
975 if (!acquire_permit(&pool->limiter)) {
976 wait_permit(&pool->limiter, bio);
977 return;
978 }
979
980 data_vio = get_available_data_vio(pool);
981 spin_unlock(&pool->lock);
982 launch_bio(pool->completion.vdo, data_vio, bio);
983 }
984
985 /* Implements vdo_admin_initiator_fn. */
static void initiate_drain(struct admin_state *state)
987 {
988 bool drained;
989 struct data_vio_pool *pool = container_of(state, struct data_vio_pool, state);
990
991 spin_lock(&pool->lock);
992 drained = check_for_drain_complete_locked(pool);
993 spin_unlock(&pool->lock);
994
995 if (drained)
996 vdo_finish_draining(state);
997 }
998
static void assert_on_vdo_cpu_thread(const struct vdo *vdo, const char *name)
1000 {
1001 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread),
1002 "%s called on cpu thread", name);
1003 }
1004
1005 /**
1006 * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool.
1007 * @completion: The completion to notify when the pool has drained.
1008 */
void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1010 {
1011 assert_on_vdo_cpu_thread(completion->vdo, __func__);
1012 vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion,
1013 initiate_drain);
1014 }
1015
1016 /**
1017 * resume_data_vio_pool() - Resume a data_vio pool.
1018 * @completion: The completion to notify when the pool has resumed.
1019 */
void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1021 {
1022 assert_on_vdo_cpu_thread(completion->vdo, __func__);
1023 vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state));
1024 }
1025
static void dump_limiter(const char *name, struct limiter *limiter)
1027 {
1028 vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy,
1029 limiter->limit, limiter->max_busy,
1030 ((bio_list_empty(&limiter->waiters) &&
1031 bio_list_empty(&limiter->new_waiters)) ?
1032 "no waiters" : "has waiters"));
1033 }
1034
1035 /**
1036 * dump_data_vio_pool() - Dump a data_vio pool to the log.
1037 * @dump_vios: Whether to dump the details of each busy data_vio as well.
1038 */
void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios)
1040 {
1041 /*
1042 * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the
1043 * second clock tick). These numbers were picked based on experiments with lab machines.
1044 */
1045 static const int ELEMENTS_PER_BATCH = 35;
1046 static const int SLEEP_FOR_SYSLOG = 4000;
1047
1048 if (pool == NULL)
1049 return;
1050
1051 spin_lock(&pool->lock);
1052 dump_limiter("data_vios", &pool->limiter);
1053 dump_limiter("discard permits", &pool->discard_limiter);
1054 if (dump_vios) {
1055 int i;
1056 int dumped = 0;
1057
1058 for (i = 0; i < pool->limiter.limit; i++) {
1059 struct data_vio *data_vio = &pool->data_vios[i];
1060
1061 if (!list_empty(&data_vio->pool_entry))
1062 continue;
1063
1064 dump_data_vio(data_vio);
1065 if (++dumped >= ELEMENTS_PER_BATCH) {
1066 spin_unlock(&pool->lock);
1067 dumped = 0;
1068 fsleep(SLEEP_FOR_SYSLOG);
1069 spin_lock(&pool->lock);
1070 }
1071 }
1072 }
1073
1074 spin_unlock(&pool->lock);
1075 }
1076
data_vio_count_t get_data_vio_pool_active_discards(struct data_vio_pool *pool)
1078 {
1079 return READ_ONCE(pool->discard_limiter.busy);
1080 }
1081
data_vio_count_t get_data_vio_pool_discard_limit(struct data_vio_pool *pool)
1083 {
1084 return READ_ONCE(pool->discard_limiter.limit);
1085 }
1086
data_vio_count_t get_data_vio_pool_maximum_discards(struct data_vio_pool *pool)
1088 {
1089 return READ_ONCE(pool->discard_limiter.max_busy);
1090 }
1091
int set_data_vio_pool_discard_limit(struct data_vio_pool *pool, data_vio_count_t limit)
1093 {
1094 if (get_data_vio_pool_request_limit(pool) < limit) {
		/* The discard limit may not be higher than the data_vio limit. */
1096 return -EINVAL;
1097 }
1098
1099 spin_lock(&pool->lock);
1100 pool->discard_limiter.limit = limit;
1101 spin_unlock(&pool->lock);
1102
1103 return VDO_SUCCESS;
1104 }
1105
data_vio_count_t get_data_vio_pool_active_requests(struct data_vio_pool *pool)
1107 {
1108 return READ_ONCE(pool->limiter.busy);
1109 }
1110
data_vio_count_t get_data_vio_pool_request_limit(struct data_vio_pool *pool)
1112 {
1113 return READ_ONCE(pool->limiter.limit);
1114 }
1115
data_vio_count_t get_data_vio_pool_maximum_requests(struct data_vio_pool *pool)
1117 {
1118 return READ_ONCE(pool->limiter.max_busy);
1119 }
1120
static void update_data_vio_error_stats(struct data_vio *data_vio)
1122 {
1123 u8 index = 0;
1124 static const char * const operations[] = {
1125 [0] = "empty",
1126 [1] = "read",
1127 [2] = "write",
1128 [3] = "read-modify-write",
1129 [5] = "read+fua",
1130 [6] = "write+fua",
1131 [7] = "read-modify-write+fua",
1132 };
1133
1134 if (data_vio->read)
1135 index = 1;
1136
1137 if (data_vio->write)
1138 index += 2;
1139
1140 if (data_vio->fua)
1141 index += 4;
1142
1143 update_vio_error_stats(&data_vio->vio,
1144 "Completing %s vio for LBN %llu with error after %s",
1145 operations[index],
1146 (unsigned long long) data_vio->logical.lbn,
1147 get_data_vio_operation_name(data_vio));
1148 }
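
/*
 * The index above is a small bitmask: read contributes 1, write contributes 2, and fua
 * contributes 4, so a fua read-modify-write yields index 1 + 2 + 4 == 7, "read-modify-write+fua".
 * Index 4 (fua with neither read nor write) should never occur, which is why operations[] has no
 * entry for it.
 */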
1149
1150 static void perform_cleanup_stage(struct data_vio *data_vio,
1151 enum data_vio_cleanup_stage stage);
1152
1153 /**
1154 * release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at
1155 * the end of processing a data_vio.
1156 */
static void release_allocated_lock(struct vdo_completion *completion)
1158 {
1159 struct data_vio *data_vio = as_data_vio(completion);
1160
1161 assert_data_vio_in_allocated_zone(data_vio);
1162 release_data_vio_allocation_lock(data_vio, false);
1163 perform_cleanup_stage(data_vio, VIO_RELEASE_RECOVERY_LOCKS);
1164 }
1165
1166 /** release_lock() - Release an uncontended LBN lock. */
static void release_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1168 {
1169 struct int_map *lock_map = lock->zone->lbn_operations;
1170 struct data_vio *lock_holder;
1171
1172 if (!lock->locked) {
1173 /* The lock is not locked, so it had better not be registered in the lock map. */
1174 struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn);
1175
1176 VDO_ASSERT_LOG_ONLY((data_vio != lock_holder),
1177 "no logical block lock held for block %llu",
1178 (unsigned long long) lock->lbn);
1179 return;
1180 }
1181
1182 /* Release the lock by removing the lock from the map. */
1183 lock_holder = vdo_int_map_remove(lock_map, lock->lbn);
1184 VDO_ASSERT_LOG_ONLY((data_vio == lock_holder),
1185 "logical block lock mismatch for block %llu",
1186 (unsigned long long) lock->lbn);
1187 lock->locked = false;
1188 }
1189
1190 /** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */
static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1192 {
1193 struct data_vio *lock_holder, *next_lock_holder;
1194 int result;
1195
1196 VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked");
1197
1198 /* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */
1199 next_lock_holder =
1200 vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
1201
1202 /* Transfer the remaining lock waiters to the next lock holder. */
1203 vdo_waitq_transfer_all_waiters(&lock->waiters,
1204 &next_lock_holder->logical.waiters);
1205
1206 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
1207 next_lock_holder, true, (void **) &lock_holder);
1208 if (result != VDO_SUCCESS) {
1209 continue_data_vio_with_error(next_lock_holder, result);
1210 return;
1211 }
1212
1213 VDO_ASSERT_LOG_ONLY((lock_holder == data_vio),
1214 "logical block lock mismatch for block %llu",
1215 (unsigned long long) lock->lbn);
1216 lock->locked = false;
1217
1218 /*
1219 * If there are still waiters, other data_vios must be trying to get the lock we just
1220 * transferred. We must ensure that the new lock holder doesn't block in the packer.
1221 */
1222 if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters))
1223 cancel_data_vio_compression(next_lock_holder);
1224
1225 /*
1226 * Avoid stack overflow on lock transfer.
1227 * FIXME: this is only an issue in the 1 thread config.
1228 */
1229 next_lock_holder->vio.completion.requeue = true;
1230 launch_locked_request(next_lock_holder);
1231 }
1232
1233 /**
1234 * release_logical_lock() - Release the logical block lock and flush generation lock at the end of
1235 * processing a data_vio.
1236 */
static void release_logical_lock(struct vdo_completion *completion)
1238 {
1239 struct data_vio *data_vio = as_data_vio(completion);
1240 struct lbn_lock *lock = &data_vio->logical;
1241
1242 assert_data_vio_in_logical_zone(data_vio);
1243
1244 if (vdo_waitq_has_waiters(&lock->waiters))
1245 transfer_lock(data_vio, lock);
1246 else
1247 release_lock(data_vio, lock);
1248
1249 vdo_release_flush_generation_lock(data_vio);
1250 perform_cleanup_stage(data_vio, VIO_CLEANUP_DONE);
1251 }
1252
1253 /** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */
static void clean_hash_lock(struct vdo_completion *completion)
1255 {
1256 struct data_vio *data_vio = as_data_vio(completion);
1257
1258 assert_data_vio_in_hash_zone(data_vio);
1259 if (completion->result != VDO_SUCCESS) {
1260 vdo_clean_failed_hash_lock(data_vio);
1261 return;
1262 }
1263
1264 vdo_release_hash_lock(data_vio);
1265 perform_cleanup_stage(data_vio, VIO_RELEASE_LOGICAL);
1266 }
1267
1268 /**
1269 * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up.
1270 *
 * If the data_vio is part of a multi-block discard, this relaunches it for the next block;
 * otherwise, it returns the data_vio to the pool.
1273 */
static void finish_cleanup(struct data_vio *data_vio)
1275 {
1276 struct vdo_completion *completion = &data_vio->vio.completion;
1277 u32 discard_size = min_t(u32, data_vio->remaining_discard,
1278 VDO_BLOCK_SIZE - data_vio->offset);
1279
1280 VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL,
1281 "complete data_vio has no allocation lock");
1282 VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL,
1283 "complete data_vio has no hash lock");
1284 if ((data_vio->remaining_discard <= discard_size) ||
1285 (completion->result != VDO_SUCCESS)) {
1286 struct data_vio_pool *pool = completion->vdo->data_vio_pool;
1287
1288 vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link);
1289 schedule_releases(pool);
1290 return;
1291 }
1292
1293 data_vio->remaining_discard -= discard_size;
1294 data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE);
1295 data_vio->read = data_vio->is_partial;
1296 data_vio->offset = 0;
1297 completion->requeue = true;
1298 data_vio->first_reference_operation_complete = false;
1299 launch_data_vio(data_vio, data_vio->logical.lbn + 1);
1300 }
1301
1302 /** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. */
static void perform_cleanup_stage(struct data_vio *data_vio,
1304 enum data_vio_cleanup_stage stage)
1305 {
1306 struct vdo *vdo = vdo_from_data_vio(data_vio);
1307
1308 switch (stage) {
1309 case VIO_RELEASE_HASH_LOCK:
1310 if (data_vio->hash_lock != NULL) {
1311 launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock);
1312 return;
1313 }
1314 fallthrough;
1315
1316 case VIO_RELEASE_ALLOCATED:
1317 if (data_vio_has_allocation(data_vio)) {
1318 launch_data_vio_allocated_zone_callback(data_vio,
1319 release_allocated_lock);
1320 return;
1321 }
1322 fallthrough;
1323
1324 case VIO_RELEASE_RECOVERY_LOCKS:
1325 if ((data_vio->recovery_sequence_number > 0) &&
1326 (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) &&
1327 (data_vio->vio.completion.result != VDO_READ_ONLY))
1328 vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock");
1329 fallthrough;
1330
1331 case VIO_RELEASE_LOGICAL:
1332 launch_data_vio_logical_callback(data_vio, release_logical_lock);
1333 return;
1334
1335 default:
1336 finish_cleanup(data_vio);
1337 }
1338 }
1339
void complete_data_vio(struct vdo_completion *completion)
1341 {
1342 struct data_vio *data_vio = as_data_vio(completion);
1343
1344 completion->error_handler = NULL;
1345 data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP;
1346 perform_cleanup_stage(data_vio,
1347 (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL));
1348 }
1349
static void enter_read_only_mode(struct vdo_completion *completion)
1351 {
1352 if (vdo_is_read_only(completion->vdo))
1353 return;
1354
1355 if (completion->result != VDO_READ_ONLY) {
1356 struct data_vio *data_vio = as_data_vio(completion);
1357
1358 vdo_log_error_strerror(completion->result,
1359 "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s",
1360 (unsigned long long) data_vio->logical.lbn,
1361 (unsigned long long) data_vio->new_mapped.pbn,
1362 (unsigned long long) data_vio->mapped.pbn,
1363 (unsigned long long) data_vio->allocation.pbn,
1364 get_data_vio_operation_name(data_vio));
1365 }
1366
1367 vdo_enter_read_only_mode(completion->vdo, completion->result);
1368 }
1369
void handle_data_vio_error(struct vdo_completion *completion)
1371 {
1372 struct data_vio *data_vio = as_data_vio(completion);
1373
1374 if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL))
1375 enter_read_only_mode(completion);
1376
1377 update_data_vio_error_stats(data_vio);
1378 complete_data_vio(completion);
1379 }
1380
1381 /**
1382 * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a
1383 * data_vio.
1384 */
const char *get_data_vio_operation_name(struct data_vio *data_vio)
1386 {
1387 BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) !=
1388 ARRAY_SIZE(ASYNC_OPERATION_NAMES));
1389
1390 return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ?
1391 ASYNC_OPERATION_NAMES[data_vio->last_async_operation] :
1392 "unknown async operation");
1393 }
1394
1395 /**
1396 * data_vio_allocate_data_block() - Allocate a data block.
1397 *
1398 * @write_lock_type: The type of write lock to obtain on the block.
1399 * @callback: The callback which will attempt an allocation in the current zone and continue if it
1400 * succeeds.
1401 * @error_handler: The handler for errors while allocating.
1402 */
void data_vio_allocate_data_block(struct data_vio *data_vio,
1404 enum pbn_lock_type write_lock_type,
1405 vdo_action_fn callback, vdo_action_fn error_handler)
1406 {
1407 struct allocation *allocation = &data_vio->allocation;
1408
1409 VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK),
1410 "data_vio does not have an allocation");
1411 allocation->write_lock_type = write_lock_type;
1412 allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone);
1413 allocation->first_allocation_zone = allocation->zone->zone_number;
1414
1415 data_vio->vio.completion.error_handler = error_handler;
1416 launch_data_vio_allocated_zone_callback(data_vio, callback);
1417 }
1418
1419 /**
1420 * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block.
1421 * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten).
1422 *
1423 * If the reference to the locked block is still provisional, it will be released as well.
1424 */
void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset)
1426 {
1427 struct allocation *allocation = &data_vio->allocation;
1428 physical_block_number_t locked_pbn = allocation->pbn;
1429
1430 assert_data_vio_in_allocated_zone(data_vio);
1431
1432 if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock))
1433 allocation->pbn = VDO_ZERO_BLOCK;
1434
1435 vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn,
1436 vdo_forget(allocation->lock));
1437 }
1438
1439 /**
1440 * uncompress_data_vio() - Uncompress the data a data_vio has just read.
1441 * @mapping_state: The mapping state indicating which fragment to decompress.
1442 * @buffer: The buffer to receive the uncompressed data.
1443 */
int uncompress_data_vio(struct data_vio *data_vio,
1445 enum block_mapping_state mapping_state, char *buffer)
1446 {
1447 int size;
1448 u16 fragment_offset, fragment_size;
1449 struct compressed_block *block = data_vio->compression.block;
1450 int result = vdo_get_compressed_block_fragment(mapping_state, block,
1451 &fragment_offset, &fragment_size);
1452
1453 if (result != VDO_SUCCESS) {
1454 vdo_log_debug("%s: compressed fragment error %d", __func__, result);
1455 return result;
1456 }
1457
1458 size = LZ4_decompress_safe((block->data + fragment_offset), buffer,
1459 fragment_size, VDO_BLOCK_SIZE);
1460 if (size != VDO_BLOCK_SIZE) {
1461 vdo_log_debug("%s: lz4 error", __func__);
1462 return VDO_INVALID_FRAGMENT;
1463 }
1464
1465 return VDO_SUCCESS;
1466 }
1467
1468 /**
1469 * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle.
1470 * @completion: The data_vio which has just finished its read.
1471 *
1472 * This callback is registered in read_block().
1473 */
static void modify_for_partial_write(struct vdo_completion *completion)
1475 {
1476 struct data_vio *data_vio = as_data_vio(completion);
1477 char *data = data_vio->vio.data;
1478 struct bio *bio = data_vio->user_bio;
1479
1480 assert_data_vio_on_cpu_thread(data_vio);
1481
1482 if (bio_op(bio) == REQ_OP_DISCARD) {
1483 memset(data + data_vio->offset, '\0', min_t(u32,
1484 data_vio->remaining_discard,
1485 VDO_BLOCK_SIZE - data_vio->offset));
1486 } else {
1487 copy_from_bio(bio, data + data_vio->offset);
1488 }
1489
1490 data_vio->is_zero = is_zero_block(data);
1491 data_vio->read = false;
1492 launch_data_vio_logical_callback(data_vio,
1493 continue_data_vio_with_block_map_slot);
1494 }
1495
static void complete_read(struct vdo_completion *completion)
1497 {
1498 struct data_vio *data_vio = as_data_vio(completion);
1499 char *data = data_vio->vio.data;
1500 bool compressed = vdo_is_state_compressed(data_vio->mapped.state);
1501
1502 assert_data_vio_on_cpu_thread(data_vio);
1503
1504 if (compressed) {
1505 int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data);
1506
1507 if (result != VDO_SUCCESS) {
1508 continue_data_vio_with_error(data_vio, result);
1509 return;
1510 }
1511 }
1512
1513 if (data_vio->write) {
1514 modify_for_partial_write(completion);
1515 return;
1516 }
1517
1518 if (compressed || data_vio->is_partial)
1519 copy_to_bio(data_vio->user_bio, data + data_vio->offset);
1520
1521 acknowledge_data_vio(data_vio);
1522 complete_data_vio(completion);
1523 }
1524
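/**
 * read_endio() - The bio_end_io function registered in read_block() for reads of existing
 *                data; counts the completed bio and continues the data_vio on a CPU thread.
 */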
1525 static void read_endio(struct bio *bio)
1526 {
1527 struct data_vio *data_vio = vio_as_data_vio(bio->bi_private);
1528 int result = blk_status_to_errno(bio->bi_status);
1529
1530 vdo_count_completed_bios(bio);
1531 if (result != VDO_SUCCESS) {
1532 continue_data_vio_with_error(data_vio, result);
1533 return;
1534 }
1535
1536 launch_data_vio_cpu_callback(data_vio, complete_read,
1537 CPU_Q_COMPLETE_READ_PRIORITY);
1538 }
1539
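/**
 * complete_zero_read() - Handle a read of a logical block which maps to the zero block.
 *
 * Partial I/O zeroes the data_vio's own buffer (so a partial write can overlay its data);
 * full-block reads simply zero-fill the user bio. This callback is registered in read_block().
 */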
1540 static void complete_zero_read(struct vdo_completion *completion)
1541 {
1542 struct data_vio *data_vio = as_data_vio(completion);
1543
1544 assert_data_vio_on_cpu_thread(data_vio);
1545
1546 if (data_vio->is_partial) {
1547 memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE);
1548 if (data_vio->write) {
1549 modify_for_partial_write(completion);
1550 return;
1551 }
1552 } else {
1553 zero_fill_bio(data_vio->user_bio);
1554 }
1555
1556 complete_read(completion);
1557 }
1558
1559 /**
1560 * read_block() - Read a block asynchronously.
1561 *
1562 * This is the callback registered in continue_data_vio_with_block_map_slot().
1563 */
1564 static void read_block(struct vdo_completion *completion)
1565 {
1566 struct data_vio *data_vio = as_data_vio(completion);
1567 struct vio *vio = as_vio(completion);
1568 int result = VDO_SUCCESS;
1569
1570 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
1571 launch_data_vio_cpu_callback(data_vio, complete_zero_read,
1572 CPU_Q_COMPLETE_VIO_PRIORITY);
1573 return;
1574 }
1575
1576 data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO;
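/*
 * For a compressed mapping, read the entire physical block into the compression buffer so
 * that the fragment can be extracted and decompressed on a CPU thread.
 */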
1577 if (vdo_is_state_compressed(data_vio->mapped.state)) {
1578 result = vio_reset_bio(vio, (char *) data_vio->compression.block,
1579 read_endio, REQ_OP_READ, data_vio->mapped.pbn);
1580 } else {
1581 blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ);
1582
1583 if (data_vio->is_partial) {
1584 result = vio_reset_bio(vio, vio->data, read_endio, opf,
1585 data_vio->mapped.pbn);
1586 } else {
1587 /* A full 4k read. Use the incoming bio to avoid having to copy the data */
1588 bio_reset(vio->bio, vio->bio->bi_bdev, opf);
1589 bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio,
1590 data_vio->user_bio, GFP_KERNEL);
1591
1592 /* Copy over the original bio iovec and opflags. */
1593 vdo_set_bio_properties(vio->bio, vio, read_endio, opf,
1594 data_vio->mapped.pbn);
1595 }
1596 }
1597
1598 if (result != VDO_SUCCESS) {
1599 continue_data_vio_with_error(data_vio, result);
1600 return;
1601 }
1602
1603 vdo_submit_data_vio(data_vio);
1604 }
1605
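/*
 * Convert a completion arriving at the reference count update rendezvous back into its
 * data_vio. The completion may be either the data_vio's own vio completion or the embedded
 * decrement_completion.
 */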
1606 static inline struct data_vio *
1607 reference_count_update_completion_as_data_vio(struct vdo_completion *completion)
1608 {
1609 if (completion->type == VIO_COMPLETION)
1610 return as_data_vio(completion);
1611
1612 return container_of(completion, struct data_vio, decrement_completion);
1613 }
1614
1615 /**
1616 * update_block_map() - Rendezvous of the data_vio and decrement completions after each has
1617 * made its reference updates. Handle any error from either, or proceed
1618 * to updating the block map.
1619 * @completion: The completion of the write in progress.
1620 */
1621 static void update_block_map(struct vdo_completion *completion)
1622 {
1623 struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion);
1624
1625 assert_data_vio_in_logical_zone(data_vio);
1626
1627 if (!data_vio->first_reference_operation_complete) {
1628 /* Rendezvous, we're first */
1629 data_vio->first_reference_operation_complete = true;
1630 return;
1631 }
1632
1633 completion = &data_vio->vio.completion;
1634 vdo_set_completion_result(completion, data_vio->decrement_completion.result);
1635 if (completion->result != VDO_SUCCESS) {
1636 handle_data_vio_error(completion);
1637 return;
1638 }
1639
1640 completion->error_handler = handle_data_vio_error;
1641 if (data_vio->hash_lock != NULL)
1642 set_data_vio_hash_zone_callback(data_vio, vdo_continue_hash_lock);
1643 else
1644 completion->callback = complete_data_vio;
1645
1646 data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK;
1647 vdo_put_mapped_block(data_vio);
1648 }
1649
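/**
 * decrement_reference_count() - Release the reference on the old mapping of an overwritten
 *                               block.
 *
 * Runs in the zone of the old mapping; both success and error continue to update_block_map()
 * so that the rendezvous with the increment always completes.
 */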
1650 static void decrement_reference_count(struct vdo_completion *completion)
1651 {
1652 struct data_vio *data_vio = container_of(completion, struct data_vio,
1653 decrement_completion);
1654
1655 assert_data_vio_in_mapped_zone(data_vio);
1656
1657 vdo_set_completion_callback(completion, update_block_map,
1658 data_vio->logical.zone->thread_id);
1659 completion->error_handler = update_block_map;
1660 vdo_modify_reference_count(completion, &data_vio->decrement_updater);
1661 }
1662
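/**
 * increment_reference_count() - Add a reference to the new mapping of a written block, in the
 *                               new mapped zone, then rendezvous in update_block_map().
 */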
1663 static void increment_reference_count(struct vdo_completion *completion)
1664 {
1665 struct data_vio *data_vio = as_data_vio(completion);
1666
1667 assert_data_vio_in_new_mapped_zone(data_vio);
1668
1669 if (data_vio->downgrade_allocation_lock) {
1670 /*
1671 * Now that the data has been written, it's safe to deduplicate against the
1672 * block. Downgrade the allocation lock to a read lock so it can be used later by
1673 * the hash lock. This is done here since it needs to happen sometime before we
1674 * return to the hash zone, and we are currently on the correct thread. For
1675 * compressed blocks, the downgrade will have already been done.
1676 */
1677 vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false);
1678 }
1679
1680 set_data_vio_logical_callback(data_vio, update_block_map);
1681 completion->error_handler = update_block_map;
1682 vdo_modify_reference_count(completion, &data_vio->increment_updater);
1683 }
1684
1685 /** journal_remapping() - Add a recovery journal entry for a data remapping. */
1686 static void journal_remapping(struct vdo_completion *completion)
1687 {
1688 struct data_vio *data_vio = as_data_vio(completion);
1689
1690 assert_data_vio_in_journal_zone(data_vio);
1691
1692 data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING;
1693 data_vio->decrement_updater.zpbn = data_vio->mapped;
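/*
 * If either reference update is unnecessary (because the corresponding mapping is the zero
 * block), pre-complete that half of the update_block_map() rendezvous so the other half
 * doesn't wait for it.
 */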
1694 if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
1695 data_vio->first_reference_operation_complete = true;
1696 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK)
1697 set_data_vio_logical_callback(data_vio, update_block_map);
1698 } else {
1699 set_data_vio_new_mapped_zone_callback(data_vio,
1700 increment_reference_count);
1701 }
1702
1703 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
1704 data_vio->first_reference_operation_complete = true;
1705 } else {
1706 vdo_set_completion_callback(&data_vio->decrement_completion,
1707 decrement_reference_count,
1708 data_vio->mapped.zone->thread_id);
1709 }
1710
1711 data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING;
1712 vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
1713 }
1714
1715 /**
1716 * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write.
1717 *
1718 * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate
1719 * journal entry referencing the removal of this LBN->PBN mapping.
1720 */
1721 static void read_old_block_mapping(struct vdo_completion *completion)
1722 {
1723 struct data_vio *data_vio = as_data_vio(completion);
1724
1725 assert_data_vio_in_logical_zone(data_vio);
1726
1727 data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE;
1728 set_data_vio_journal_callback(data_vio, journal_remapping);
1729 vdo_get_mapped_block(data_vio);
1730 }
1731
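/**
 * update_metadata_for_data_vio_write() - Update the metadata (recovery journal, reference
 *                                        counts, and block map) to reflect a data_vio's write.
 * @data_vio: The data_vio which has finished writing (or needs no data write).
 * @lock: The PBN lock on the block being mapped, or NULL if the new mapping is the zero block.
 */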
1732 void update_metadata_for_data_vio_write(struct data_vio *data_vio, struct pbn_lock *lock)
1733 {
1734 data_vio->increment_updater = (struct reference_updater) {
1735 .operation = VDO_JOURNAL_DATA_REMAPPING,
1736 .increment = true,
1737 .zpbn = data_vio->new_mapped,
1738 .lock = lock,
1739 };
1740
1741 launch_data_vio_logical_callback(data_vio, read_old_block_mapping);
1742 }
1743
1744 /**
1745 * pack_compressed_data() - Attempt to pack the compressed data_vio into a block.
1746 *
1747 * This is the callback registered in compress_data_vio().
1748 */
1749 static void pack_compressed_data(struct vdo_completion *completion)
1750 {
1751 struct data_vio *data_vio = as_data_vio(completion);
1752
1753 assert_data_vio_in_packer_zone(data_vio);
1754
1755 if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
1756 get_data_vio_compression_status(data_vio).may_not_compress) {
1757 write_data_vio(data_vio);
1758 return;
1759 }
1760
1761 data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING;
1762 vdo_attempt_packing(data_vio);
1763 }
1764
1765 /**
1766 * compress_data_vio() - Do the actual work of compressing the data on a CPU queue.
1767 *
1768 * This callback is registered in launch_compress_data_vio().
1769 */
1770 static void compress_data_vio(struct vdo_completion *completion)
1771 {
1772 struct data_vio *data_vio = as_data_vio(completion);
1773 int size;
1774
1775 assert_data_vio_on_cpu_thread(data_vio);
1776
1777 /*
1778 * By putting the compressed data at the start of the compressed block data field, we won't
1779 * need to copy it if this data_vio becomes a compressed write agent.
1780 */
1781 size = LZ4_compress_default(data_vio->vio.data,
1782 data_vio->compression.block->data, VDO_BLOCK_SIZE,
1783 VDO_MAX_COMPRESSED_FRAGMENT_SIZE,
1784 (char *) vdo_get_work_queue_private_data());
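/*
 * LZ4_compress_default() returns 0 if the output did not fit in the provided buffer. Only
 * fragments strictly smaller than a compressed block's data area are worth packing;
 * otherwise fall back to writing the block uncompressed.
 */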
1785 if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) {
1786 data_vio->compression.size = size;
1787 launch_data_vio_packer_callback(data_vio, pack_compressed_data);
1788 return;
1789 }
1790
1791 write_data_vio(data_vio);
1792 }
1793
1794 /**
1795 * launch_compress_data_vio() - Continue a write by attempting to compress the data.
1796 *
1797 * This is a re-entry point into the write path used by hash locks.
1798 */
1799 void launch_compress_data_vio(struct data_vio *data_vio)
1800 {
1801 VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block");
1802 VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL,
1803 "data_vio to compress has a hash_lock");
1804 VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio),
1805 "data_vio to compress has an allocation");
1806
1807 /*
1808 * There are 4 reasons why a data_vio which has reached this point will not be eligible for
1809 * compression:
1810 *
1811 * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if the
1812 * write request also requests FUA.
1813 *
1814 * 2) A data_vio should not be compressed when compression is disabled for the vdo.
1815 *
1816 * 3) A data_vio could be doing a partial write on behalf of a larger discard which has not
1817 * yet been acknowledged and hence blocking in the packer would be bad.
1818 *
1819 * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the
1820 * packer would also be bad.
1821 */
1822 if (data_vio->fua ||
1823 !vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
1824 ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) ||
1825 (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) {
1826 write_data_vio(data_vio);
1827 return;
1828 }
1829
1830 data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO;
1831 launch_data_vio_cpu_callback(data_vio, compress_data_vio,
1832 CPU_Q_COMPRESS_BLOCK_PRIORITY);
1833 }
1834
1835 /**
1836 * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the record
1837 * name as set).
1838 *
1839 * This callback is registered in prepare_for_dedupe().
1840 */
1841 static void hash_data_vio(struct vdo_completion *completion)
1842 {
1843 struct data_vio *data_vio = as_data_vio(completion);
1844
1845 assert_data_vio_on_cpu_thread(data_vio);
1846 VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed");
1847
1848 murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be,
1849 &data_vio->record_name);
1850
1851 data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones,
1852 &data_vio->record_name);
1853 data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK;
1854 launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock);
1855 }
1856
1857 /** prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation. */
1858 static void prepare_for_dedupe(struct data_vio *data_vio)
1859 {
1860 /* We don't care what thread we are on. */
1861 VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks");
1862
1863 /*
1864 * Before we can dedupe, we need to know the record name, so the first
1865 * step is to hash the block data.
1866 */
1867 data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO;
1868 launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY);
1869 }
1870
1871 /**
1872 * write_bio_finished() - This is the bio_end_io function registered in write_data_vio() to be called
1873 * when a data_vio's write to the underlying storage has completed.
1874 */
1875 static void write_bio_finished(struct bio *bio)
1876 {
1877 struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private);
1878
1879 vdo_count_completed_bios(bio);
1880 vdo_set_completion_result(&data_vio->vio.completion,
1881 blk_status_to_errno(bio->bi_status));
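/*
 * The data is now on storage, so the allocation's write lock can be downgraded to a read
 * lock once the reference count increment is made (see increment_reference_count()).
 */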
1882 data_vio->downgrade_allocation_lock = true;
1883 update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock);
1884 }
1885
1886 /** write_data_vio() - Write a data block to storage without compression. */
1887 void write_data_vio(struct data_vio *data_vio)
1888 {
1889 struct data_vio_compression_status status, new_status;
1890 int result;
1891
1892 if (!data_vio_has_allocation(data_vio)) {
1893 /*
1894 * There was no space to write this block and we failed to deduplicate or compress
1895 * it.
1896 */
1897 continue_data_vio_with_error(data_vio, VDO_NO_SPACE);
1898 return;
1899 }
1900
1901 new_status = (struct data_vio_compression_status) {
1902 .stage = DATA_VIO_POST_PACKER,
1903 .may_not_compress = true,
1904 };
1905
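/*
 * Atomically advance the compression status to DATA_VIO_POST_PACKER so that the packer will
 * no longer try to place this data_vio in a compressed block.
 */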
1906 do {
1907 status = get_data_vio_compression_status(data_vio);
1908 } while ((status.stage != DATA_VIO_POST_PACKER) &&
1909 !set_data_vio_compression_status(data_vio, status, new_status));
1910
1911 /* Write the data from the data block buffer. */
1912 result = vio_reset_bio(&data_vio->vio, data_vio->vio.data,
1913 write_bio_finished, REQ_OP_WRITE,
1914 data_vio->allocation.pbn);
1915 if (result != VDO_SUCCESS) {
1916 continue_data_vio_with_error(data_vio, result);
1917 return;
1918 }
1919
1920 data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO;
1921 vdo_submit_data_vio(data_vio);
1922 }
1923
1924 /**
1925 * acknowledge_write_callback() - Acknowledge a write to the requestor.
1926 *
1927 * This callback is registered in allocate_block() and continue_data_vio_with_block_map_slot().
1928 */
1929 static void acknowledge_write_callback(struct vdo_completion *completion)
1930 {
1931 struct data_vio *data_vio = as_data_vio(completion);
1932 struct vdo *vdo = completion->vdo;
1933
1934 VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) ||
1935 (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)),
1936 "%s() called on bio ack queue", __func__);
1937 VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio),
1938 "write VIO to be acknowledged has a flush generation lock");
1939 acknowledge_data_vio(data_vio);
1940 if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
1941 /* This is a zero write or discard */
1942 update_metadata_for_data_vio_write(data_vio, NULL);
1943 return;
1944 }
1945
1946 prepare_for_dedupe(data_vio);
1947 }
1948
1949 /**
1950 * allocate_block() - Attempt to allocate a block in the current allocation zone.
1951 *
1952 * This callback is registered in continue_data_vio_with_block_map_slot().
1953 */
1954 static void allocate_block(struct vdo_completion *completion)
1955 {
1956 struct data_vio *data_vio = as_data_vio(completion);
1957
1958 assert_data_vio_in_allocated_zone(data_vio);
1959
1960 if (!vdo_allocate_block_in_zone(data_vio))
1961 return;
1962
1963 completion->error_handler = handle_data_vio_error;
1964 WRITE_ONCE(data_vio->allocation_succeeded, true);
1965 data_vio->new_mapped = (struct zoned_pbn) {
1966 .zone = data_vio->allocation.zone,
1967 .pbn = data_vio->allocation.pbn,
1968 .state = VDO_MAPPING_STATE_UNCOMPRESSED,
1969 };
1970
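/*
 * FUA writes and writes which are part of a larger, still-incomplete discard can't be
 * acknowledged yet, so skip the early acknowledgment and go straight to dedupe preparation.
 */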
1971 if (data_vio->fua ||
1972 data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
1973 prepare_for_dedupe(data_vio);
1974 return;
1975 }
1976
1977 data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
1978 launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
1979 }
1980
1981 /**
1982 * handle_allocation_error() - Handle an error attempting to allocate a block.
1983 *
1984 * This error handler is registered in continue_data_vio_with_block_map_slot().
1985 */
1986 static void handle_allocation_error(struct vdo_completion *completion)
1987 {
1988 struct data_vio *data_vio = as_data_vio(completion);
1989
1990 if (completion->result == VDO_NO_SPACE) {
1991 /* We failed to get an allocation, but we can try to dedupe. */
1992 vdo_reset_completion(completion);
1993 completion->error_handler = handle_data_vio_error;
1994 prepare_for_dedupe(data_vio);
1995 return;
1996 }
1997
1998 /* We got a "real" error, not just a failure to allocate, so fail the request. */
1999 handle_data_vio_error(completion);
2000 }
2001
2002 static int assert_is_discard(struct data_vio *data_vio)
2003 {
2004 int result = VDO_ASSERT(data_vio->is_discard,
2005 "data_vio with no block map page is a discard");
2006
2007 return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY);
2008 }
2009
2010 /**
2011 * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map.
2012 *
2013 * This callback is registered in launch_read_data_vio().
2014 */
2015 void continue_data_vio_with_block_map_slot(struct vdo_completion *completion)
2016 {
2017 struct data_vio *data_vio = as_data_vio(completion);
2018
2019 assert_data_vio_in_logical_zone(data_vio);
2020 if (data_vio->read) {
2021 set_data_vio_logical_callback(data_vio, read_block);
2022 data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ;
2023 vdo_get_mapped_block(data_vio);
2024 return;
2025 }
2026
2027 vdo_acquire_flush_generation_lock(data_vio);
2028
2029 if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
2030 /*
2031 * This is a discard for a block on a block map page which has not been allocated, so
2032 * there's nothing more we need to do.
2033 */
2034 completion->callback = complete_data_vio;
2035 continue_data_vio_with_error(data_vio, assert_is_discard(data_vio));
2036 return;
2037 }
2038
2039 /*
2040 * We need an allocation if this is neither a full-block discard nor a
2041 * full-block zero write.
2042 */
2043 if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) {
2044 data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK, allocate_block,
2045 handle_allocation_error);
2046 return;
2047 }
2048
2049 /*
2050 * We don't need to write any data, so skip allocation and just update the block map and
2051 * reference counts (via the journal).
2052 */
2053 data_vio->new_mapped.pbn = VDO_ZERO_BLOCK;
2054 if (data_vio->is_zero)
2055 data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED;
2056
2057 if (data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
2058 /* This is not the final block of a discard so we can't acknowledge it yet. */
2059 update_metadata_for_data_vio_write(data_vio, NULL);
2060 return;
2061 }
2062
2063 data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
2064 launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
2065 }
2066