Lines Matching +full:lock +full:- +full:- +full:- +full:-

1 // SPDX-License-Identifier: GPL-2.0-only
14 * deduplicate against a single block instead of being serialized through a PBN read lock. Only one
19 * to that zone. The concurrency guarantees of this single-threaded model allow the code to omit
20 * more fine-grained locking for the hash_lock structures.
22 * A hash_lock acts like a state machine perhaps more than as a lock. Other than the starting and
25 * containing the lock. An asynchronous operation is almost always performed upon entering a state,
28 * In all states except DEDUPING, there is a single data_vio, called the lock agent, performing the
29 * asynchronous operations on behalf of the lock. The agent will change during the lifetime of the
30 * lock if the lock is shared by more than one data_vio. data_vios waiting to deduplicate are kept
31 * on a wait queue. Viewed a different way, the agent holds the lock exclusively until the lock
32 * enters the DEDUPING state, at which point it becomes a shared lock that all the waiters (and any
33 * new data_vios that arrive) use to share a PBN lock. In state DEDUPING, there is no agent. When
34 * the last data_vio in the lock calls back in DEDUPING, it becomes the agent and the lock becomes
35 * exclusive again. New data_vios that arrive in the lock will also go on the wait queue.
37 * The existence of lock waiters is a key factor controlling which state the lock transitions to
38 * next. When the lock is new or has waiters, it will always try to reach DEDUPING, and when it
41 * Deduping requires holding a PBN lock on a block that is known to contain data identical to the
42 * data_vios in the lock, so the lock will send the agent to the duplicate zone to acquire the PBN
43 * lock (LOCKING), to the kernel I/O threads to read and verify the data (VERIFYING), or to write a
48 * lock on the duplicate block (UNLOCKING), and if the agent is the last data_vio referencing the
49 * lock, releasing the hash_lock itself back to the hash zone (BYPASSING).
51 * The shortest sequence of states is for non-concurrent writes of new data:
52 * INITIALIZING -> QUERYING -> WRITING -> BYPASSING
53 * This sequence is short because no PBN read lock or index update is needed.
55 * Non-concurrent, finding valid advice looks like this (endpoints elided):
56 * -> QUERYING -> LOCKING -> VERIFYING -> DEDUPING -> UNLOCKING ->
58 * -> QUERYING -> LOCKING -> VERIFYING -> UNLOCKING -> WRITING -> UPDATING ->
61 * to deduplicate, a new lock is forked and the excess waiters roll over to the new lock (which
62 * goes directly to WRITING). The new lock takes the place of the old lock in the lock map so new
64 * lock will have the right to update the index (unless it also forks).
66 * Since rollover happens in a lock instance, once a valid data location has been selected, it will
67 * not change. QUERYING and WRITING are only performed once per lock lifetime. All other
68 * non-endpoint states can be re-entered.
73 * states) that transition to LOCKING. It performs the actual lock state change and must be invoked
75 * code actually obtaining the lock. It does any bookkeeping or decision-making required and
78 * ----------------------------------------------------------------------
129 #include "memory-alloc.h"
132 #include "string-utils.h"
136 #include "action-manager.h"
137 #include "admin-state.h"
140 #include "data-vio.h"
141 #include "int-map.h"
142 #include "io-submitter.h"
144 #include "physical-zone.h"
145 #include "slab-depot.h"
149 #include "wait-queue.h"
181 /* version byte + state byte + 64-bit little-endian PBN */
188 /* This is the sequence of states typically used on the non-dedupe path. */
219 /* The block hash covered by this lock */
222 /* When the lock is unused, this list entry allows the lock to be pooled */
226 * A list containing the data VIOs sharing this lock, all having the same record name and
231 /* The number of data_vios sharing this lock instance */
234 /* The maximum value of reference_count in the lifetime of this lock */
237 /* The current state of this lock */
246 /* True if the lock has already accounted for an initial verification */
249 /* True if this lock is registered in the lock map (cleared on rollover) */
258 /* The PBN lock on the block containing the duplicate data */
261 /* The data_vio designated to act on behalf of the lock */
266 * to get the information they all need to deduplicate--either against each other, or
283 spinlock_t lock; member
285 /* The fields in the next block are all protected by the lock */
323 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == zone->thread_id), in assert_in_hash_zone()
329 return (atomic_cmpxchg(&context->state, old, new) == old); in change_context_state()
334 return (atomic_cmpxchg(&zone->timer_state, old, new) == old); in change_timer_state()
338 * return_hash_lock_to_pool() - (Re)initialize a hash lock and return it to its pool.
339 * @zone: The zone from which the lock was borrowed.
340 * @lock: The lock that is no longer in use.
342 static void return_hash_lock_to_pool(struct hash_zone *zone, struct hash_lock *lock) in return_hash_lock_to_pool() argument
344 memset(lock, 0, sizeof(*lock)); in return_hash_lock_to_pool()
345 INIT_LIST_HEAD(&lock->pool_node); in return_hash_lock_to_pool()
346 INIT_LIST_HEAD(&lock->duplicate_ring); in return_hash_lock_to_pool()
347 vdo_waitq_init(&lock->waiters); in return_hash_lock_to_pool()
348 list_add_tail(&lock->pool_node, &zone->lock_pool); in return_hash_lock_to_pool()
352 * vdo_get_duplicate_lock() - Get the PBN lock on the duplicate data location for a data_vio from
356 * Return: The PBN lock on the data_vio's duplicate location.
360 if (data_vio->hash_lock == NULL) in vdo_get_duplicate_lock()
363 return data_vio->hash_lock->duplicate_lock; in vdo_get_duplicate_lock()
367 * hash_lock_key() - Return hash_lock's record name as a hash code.
368 * @lock: The hash lock.
372 static inline u64 hash_lock_key(struct hash_lock *lock) in hash_lock_key() argument
374 return get_unaligned_le64(&lock->hash.name); in hash_lock_key()
378 * get_hash_lock_state_name() - Get the string representation of a hash lock state.
379 * @state: The hash lock state.
391 * assert_hash_lock_agent() - Assert that a data_vio is the agent of its hash lock, and that this
393 * @data_vio: The data_vio expected to be the lock agent.
400 VDO_ASSERT_LOG_ONLY(data_vio == data_vio->hash_lock->agent, in assert_hash_lock_agent()
401 "%s must be for the hash lock agent", where); in assert_hash_lock_agent()
405 * set_duplicate_lock() - Set the duplicate lock held by a hash lock. May only be called in the
406 * physical zone of the PBN lock.
407 * @hash_lock: The hash lock to update.
408 * @pbn_lock: The PBN read lock to use as the duplicate lock.
412 VDO_ASSERT_LOG_ONLY((hash_lock->duplicate_lock == NULL), in set_duplicate_lock()
413 "hash lock must not already hold a duplicate lock"); in set_duplicate_lock()
414 pbn_lock->holder_count += 1; in set_duplicate_lock()
415 hash_lock->duplicate_lock = pbn_lock; in set_duplicate_lock()
419 * dequeue_lock_waiter() - Remove the first data_vio from the lock's waitq and return it.
420 * @lock: The lock containing the wait queue.
424 static inline struct data_vio *dequeue_lock_waiter(struct hash_lock *lock) in dequeue_lock_waiter() argument
426 return vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters)); in dequeue_lock_waiter()
430 * set_hash_lock() - Set, change, or clear the hash lock a data_vio is using.
432 * @new_lock: The hash lock the data_vio is joining.
434 * Updates the hash lock (or locks) to reflect the change in membership.
438 struct hash_lock *old_lock = data_vio->hash_lock; in set_hash_lock()
441 VDO_ASSERT_LOG_ONLY(data_vio->hash_zone != NULL, in set_hash_lock()
442 "must have a hash zone when holding a hash lock"); in set_hash_lock()
443 VDO_ASSERT_LOG_ONLY(!list_empty(&data_vio->hash_lock_entry), in set_hash_lock()
444 "must be on a hash lock ring when holding a hash lock"); in set_hash_lock()
445 VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 0, in set_hash_lock()
446 "hash lock reference must be counted"); in set_hash_lock()
448 if ((old_lock->state != VDO_HASH_LOCK_BYPASSING) && in set_hash_lock()
449 (old_lock->state != VDO_HASH_LOCK_UNLOCKING)) { in set_hash_lock()
451 * If the reference count goes to zero in a non-terminal state, we're most in set_hash_lock()
452 * likely leaking this lock. in set_hash_lock()
454 VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 1, in set_hash_lock()
456 get_hash_lock_state_name(old_lock->state)); in set_hash_lock()
459 list_del_init(&data_vio->hash_lock_entry); in set_hash_lock()
460 old_lock->reference_count -= 1; in set_hash_lock()
462 data_vio->hash_lock = NULL; in set_hash_lock()
467 * Keep all data_vios sharing the lock on a ring since they can complete in any in set_hash_lock()
470 list_move_tail(&data_vio->hash_lock_entry, &new_lock->duplicate_ring); in set_hash_lock()
471 new_lock->reference_count += 1; in set_hash_lock()
472 if (new_lock->max_references < new_lock->reference_count) in set_hash_lock()
473 new_lock->max_references = new_lock->reference_count; in set_hash_lock()
475 data_vio->hash_lock = new_lock; in set_hash_lock()
480 static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
482 static void start_locking(struct hash_lock *lock, struct data_vio *agent);
483 static void start_writing(struct hash_lock *lock, struct data_vio *agent);
488 * exit_hash_lock() - Bottleneck for data_vios that have written or deduplicated and that are no
489 * longer needed as an agent for the hash lock.
494 /* Release the hash lock now, saving a thread transition in cleanup. */ in exit_hash_lock()
497 /* Complete the data_vio and start the clean-up path to release any locks it still holds. */ in exit_hash_lock()
498 data_vio->vio.completion.callback = complete_data_vio; in exit_hash_lock()
504 * set_duplicate_location() - Set the location of the duplicate block for data_vio, updating the
512 data_vio->is_duplicate = (source.pbn != VDO_ZERO_BLOCK); in set_duplicate_location()
513 data_vio->duplicate = source; in set_duplicate_location()
517 * retire_lock_agent() - Retire the active lock agent, replacing it with the first lock waiter, and
518 * make the retired agent exit the hash lock.
519 * @lock: The hash lock to update.
521 * Return: The new lock agent (which will be NULL if there was no waiter)
523 static struct data_vio *retire_lock_agent(struct hash_lock *lock) in retire_lock_agent() argument
525 struct data_vio *old_agent = lock->agent; in retire_lock_agent()
526 struct data_vio *new_agent = dequeue_lock_waiter(lock); in retire_lock_agent()
528 lock->agent = new_agent; in retire_lock_agent()
531 set_duplicate_location(new_agent, lock->duplicate); in retire_lock_agent()
536 * wait_on_hash_lock() - Add a data_vio to the lock's queue of waiters.
537 * @lock: The hash lock on which to wait.
540 static void wait_on_hash_lock(struct hash_lock *lock, struct data_vio *data_vio) in wait_on_hash_lock() argument
542 vdo_waitq_enqueue_waiter(&lock->waiters, &data_vio->waiter); in wait_on_hash_lock()
548 if ((lock->state != VDO_HASH_LOCK_WRITING) || !cancel_data_vio_compression(lock->agent)) in wait_on_hash_lock()
552 * Even though we're waiting, we also have to send ourselves as a one-way message to the in wait_on_hash_lock()
558 data_vio->compression.lock_holder = lock->agent; in wait_on_hash_lock()
563 * abort_waiter() - waiter_callback_fn function that shunts waiters to write their blocks without
574 * start_bypassing() - Stop using the hash lock.
575 * @lock: The hash lock.
576 * @agent: The data_vio acting as the agent for the lock.
578 * Stops using the hash lock. This is the final transition for hash locks which did not get an
581 static void start_bypassing(struct hash_lock *lock, struct data_vio *agent) in start_bypassing() argument
583 lock->state = VDO_HASH_LOCK_BYPASSING; in start_bypassing()
589 struct hash_lock *lock = data_vio->hash_lock; in vdo_clean_failed_hash_lock() local
591 if (lock->state == VDO_HASH_LOCK_BYPASSING) { in vdo_clean_failed_hash_lock()
596 if (lock->agent == NULL) { in vdo_clean_failed_hash_lock()
597 lock->agent = data_vio; in vdo_clean_failed_hash_lock()
598 } else if (data_vio != lock->agent) { in vdo_clean_failed_hash_lock()
603 lock->state = VDO_HASH_LOCK_BYPASSING; in vdo_clean_failed_hash_lock()
606 lock->update_advice = false; in vdo_clean_failed_hash_lock()
608 vdo_waitq_notify_all_waiters(&lock->waiters, abort_waiter, NULL); in vdo_clean_failed_hash_lock()
610 if (lock->duplicate_lock != NULL) { in vdo_clean_failed_hash_lock()
612 data_vio->duplicate = lock->duplicate; in vdo_clean_failed_hash_lock()
617 lock->agent = NULL; in vdo_clean_failed_hash_lock()
618 data_vio->is_duplicate = false; in vdo_clean_failed_hash_lock()
623 * finish_unlocking() - Handle the result of the agent for the lock releasing a read lock on
625 * @completion: The completion of the data_vio acting as the lock's agent.
632 struct hash_lock *lock = agent->hash_lock; in finish_unlocking() local
636 VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL, in finish_unlocking()
637 "must have released the duplicate lock for the hash lock"); in finish_unlocking()
639 if (!lock->verified) { in finish_unlocking()
641 * UNLOCKING -> WRITING transition: The lock we released was on an unverified in finish_unlocking()
642 * block, so it must have been a lock on advice we were verifying, not on a in finish_unlocking()
646 start_writing(lock, agent); in finish_unlocking()
651 * With the lock released, the verified duplicate block may already have changed and will in finish_unlocking()
652 * need to be re-verified if a waiter arrived. in finish_unlocking()
654 lock->verified = false; in finish_unlocking()
656 if (vdo_waitq_has_waiters(&lock->waiters)) { in finish_unlocking()
658 * UNLOCKING -> LOCKING transition: A new data_vio entered the hash lock while the in finish_unlocking()
659 * agent was releasing the PBN lock. The current agent exits and the waiter has to in finish_unlocking()
660 * re-lock and re-verify the duplicate location. in finish_unlocking()
662 * TODO: If we used the current agent to re-acquire the PBN lock we wouldn't need in finish_unlocking()
663 * to re-verify. in finish_unlocking()
665 agent = retire_lock_agent(lock); in finish_unlocking()
666 start_locking(lock, agent); in finish_unlocking()
671 * UNLOCKING -> BYPASSING transition: The agent is done with the lock and no other in finish_unlocking()
672 * data_vios reference it, so remove it from the lock map and return it to the pool. in finish_unlocking()
674 start_bypassing(lock, agent); in finish_unlocking()
678 * unlock_duplicate_pbn() - Release a read lock on the PBN of the block that may or may not have
680 * @completion: The completion of the data_vio acting as the lock's agent.
688 struct hash_lock *lock = agent->hash_lock; in unlock_duplicate_pbn() local
691 VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL, in unlock_duplicate_pbn()
692 "must have a duplicate lock to release"); in unlock_duplicate_pbn()
694 vdo_release_physical_zone_pbn_lock(agent->duplicate.zone, agent->duplicate.pbn, in unlock_duplicate_pbn()
695 vdo_forget(lock->duplicate_lock)); in unlock_duplicate_pbn()
696 if (lock->state == VDO_HASH_LOCK_BYPASSING) { in unlock_duplicate_pbn()
705 * start_unlocking() - Release a read lock on the PBN of the block that may or may not have
707 * @lock: The hash lock.
708 * @agent: The data_vio currently acting as the agent for the lock.
710 static void start_unlocking(struct hash_lock *lock, struct data_vio *agent) in start_unlocking() argument
712 lock->state = VDO_HASH_LOCK_UNLOCKING; in start_unlocking()
718 struct hash_zone *zone = context->zone; in release_context()
720 WRITE_ONCE(zone->active, zone->active - 1); in release_context()
721 list_move(&context->list_entry, &zone->available); in release_context()
726 struct dedupe_context *context = agent->dedupe_context; in process_update_result()
732 agent->dedupe_context = NULL; in process_update_result()
737 * finish_updating() - Process the result of a UDS update performed by the agent for the lock.
745 struct hash_lock *lock = agent->hash_lock; in finish_updating() local
755 lock->update_advice = false; in finish_updating()
757 if (vdo_waitq_has_waiters(&lock->waiters)) { in finish_updating()
759 * UPDATING -> DEDUPING transition: A new data_vio arrived during the UDS update. in finish_updating()
760 * Send it on the verified dedupe path. The agent is done with the lock, but the in finish_updating()
761 * lock may still need to use it to clean up after rollover. in finish_updating()
763 start_deduping(lock, agent, true); in finish_updating()
767 if (lock->duplicate_lock != NULL) { in finish_updating()
769 * UPDATING -> UNLOCKING transition: No one is waiting to dedupe, but we hold a in finish_updating()
770 * duplicate PBN lock, so go release it. in finish_updating()
772 start_unlocking(lock, agent); in finish_updating()
777 * UPDATING -> BYPASSING transition: No one is waiting to dedupe and there's no lock to in finish_updating()
780 start_bypassing(lock, agent); in finish_updating()
786 * start_updating() - Continue deduplication with the last step, updating UDS with the location of
788 * @lock: The hash lock.
789 * @agent: The data_vio currently acting as the agent for the lock.
791 static void start_updating(struct hash_lock *lock, struct data_vio *agent) in start_updating() argument
793 lock->state = VDO_HASH_LOCK_UPDATING; in start_updating()
795 VDO_ASSERT_LOG_ONLY(lock->verified, "new advice should have been verified"); in start_updating()
796 VDO_ASSERT_LOG_ONLY(lock->update_advice, "should only update advice if needed"); in start_updating()
798 agent->last_async_operation = VIO_ASYNC_OP_UPDATE_DEDUPE_INDEX; in start_updating()
804 * finish_deduping() - Handle a data_vio that has finished deduplicating against the block locked
805 * by the hash lock.
806 * @lock: The hash lock.
807 * @data_vio: The lock holder that has finished deduplicating.
809 * If there are other data_vios still sharing the lock, this will just release the data_vio's share
810 * of the lock and finish processing the data_vio. If this is the last data_vio holding the lock,
811 * this makes the data_vio the lock agent and uses it to advance the state of the lock so it can
814 static void finish_deduping(struct hash_lock *lock, struct data_vio *data_vio) in finish_deduping() argument
818 VDO_ASSERT_LOG_ONLY(lock->agent == NULL, "shouldn't have an agent in DEDUPING"); in finish_deduping()
819 VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters), in finish_deduping()
820 "shouldn't have any lock waiters in DEDUPING"); in finish_deduping()
822 /* Just release the lock reference if other data_vios are still deduping. */ in finish_deduping()
823 if (lock->reference_count > 1) { in finish_deduping()
828 /* The hash lock must have an agent for all other lock states. */ in finish_deduping()
829 lock->agent = agent; in finish_deduping()
830 if (lock->update_advice) { in finish_deduping()
832 * DEDUPING -> UPDATING transition: The location of the duplicate block changed in finish_deduping()
835 * was another change in location, but with only this data_vio using the hash lock, in finish_deduping()
838 start_updating(lock, agent); in finish_deduping()
841 * DEDUPING -> UNLOCKING transition: Release the PBN read lock on the duplicate in finish_deduping()
842 * location so the hash lock itself can be released (contingent on no new data_vios in finish_deduping()
843 * arriving in the lock before the agent returns). in finish_deduping()
845 start_unlocking(lock, agent); in finish_deduping()
850 * acquire_lock() - Get the lock for a record name.
852 * @hash: The hash to lock.
853 * @replace_lock: If non-NULL, the lock already registered for the hash which should be replaced by
854 * the new lock.
855 * @lock_ptr: A pointer to receive the hash lock.
857 * Gets the lock for the hash (record name) of the data in a data_vio, or if one does not exist (or
858 * if we are explicitly rolling over), initialize a new lock for the hash and register it in the
868 struct hash_lock *lock, *new_lock; in acquire_lock() local
872 * Borrow and prepare a lock from the pool so we don't have to do two int_map accesses in acquire_lock()
873 * in the common case of no lock contention. in acquire_lock()
875 result = VDO_ASSERT(!list_empty(&zone->lock_pool), in acquire_lock()
876 "never need to wait for a free hash lock"); in acquire_lock()
880 new_lock = list_entry(zone->lock_pool.prev, struct hash_lock, pool_node); in acquire_lock()
881 list_del_init(&new_lock->pool_node); in acquire_lock()
884 * Fill in the hash of the new lock so we can map it, since we have to use the hash as the in acquire_lock()
887 new_lock->hash = *hash; in acquire_lock()
889 result = vdo_int_map_put(zone->hash_lock_map, hash_lock_key(new_lock), in acquire_lock()
890 new_lock, (replace_lock != NULL), (void **) &lock); in acquire_lock()
897 /* On mismatch put the old lock back and return a severe error */ in acquire_lock()
898 VDO_ASSERT_LOG_ONLY(lock == replace_lock, in acquire_lock()
899 "old lock must have been in the lock map"); in acquire_lock()
901 VDO_ASSERT_LOG_ONLY(replace_lock->registered, in acquire_lock()
902 "old lock must have been marked registered"); in acquire_lock()
903 replace_lock->registered = false; in acquire_lock()
906 if (lock == replace_lock) { in acquire_lock()
907 lock = new_lock; in acquire_lock()
908 lock->registered = true; in acquire_lock()
910 /* There's already a lock for the hash, so we don't need the borrowed lock. */ in acquire_lock()
914 *lock_ptr = lock; in acquire_lock()
919 * enter_forked_lock() - Bind the data_vio to a new hash lock.
921 * Implements waiter_callback_fn. Binds the data_vio that was waiting to a new hash lock and waits
922 * on that lock.
934 * fork_hash_lock() - Fork a hash lock because it has run out of increments on the duplicate PBN.
935 * @old_lock: The hash lock to fork.
936 * @new_agent: The data_vio that will be the agent for the new lock.
938 * Transfers the new agent and any lock waiters to a new hash lock instance which takes the place
939 * of the old lock in the lock map. The old lock remains active, but will not update advice.
946 result = acquire_lock(new_agent->hash_zone, &new_agent->record_name, old_lock, in fork_hash_lock()
954 * Only one of the two locks should update UDS. The old lock is out of references, so it in fork_hash_lock()
957 old_lock->update_advice = false; in fork_hash_lock()
958 new_lock->update_advice = true; in fork_hash_lock()
961 new_lock->agent = new_agent; in fork_hash_lock()
963 vdo_waitq_notify_all_waiters(&old_lock->waiters, enter_forked_lock, new_lock); in fork_hash_lock()
965 new_agent->is_duplicate = false; in fork_hash_lock()
970 * launch_dedupe() - Reserve a reference count increment for a data_vio and launch it on the dedupe
972 * @lock: The hash lock.
973 * @data_vio: The data_vio to deduplicate using the hash lock.
974 * @has_claim: true if the data_vio already has claimed an increment from the duplicate lock.
976 * If no increments are available, this will roll over to a new hash lock and launch the data_vio
977 * as the writing agent for that lock.
979 static void launch_dedupe(struct hash_lock *lock, struct data_vio *data_vio, in launch_dedupe() argument
982 if (!has_claim && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) { in launch_dedupe()
983 /* Out of increments, so must roll over to a new lock. */ in launch_dedupe()
984 fork_hash_lock(lock, data_vio); in launch_dedupe()
988 /* Deduplicate against the lock's verified location. */ in launch_dedupe()
989 set_duplicate_location(data_vio, lock->duplicate); in launch_dedupe()
990 data_vio->new_mapped = data_vio->duplicate; in launch_dedupe()
991 update_metadata_for_data_vio_write(data_vio, lock->duplicate_lock); in launch_dedupe()
995 * start_deduping() - Enter the hash lock state where data_vios deduplicate in parallel against a
997 * @lock: The hash lock.
998 * @agent: The data_vio acting as the agent for the lock.
1002 * from the duplicate lock, ensuring the hash lock will still have a data_vio holding it.
1004 static void start_deduping(struct hash_lock *lock, struct data_vio *agent, in start_deduping() argument
1007 lock->state = VDO_HASH_LOCK_DEDUPING; in start_deduping()
1010 * We don't take the downgraded allocation lock from the agent unless we actually need to in start_deduping()
1013 if (lock->duplicate_lock == NULL) { in start_deduping()
1014 VDO_ASSERT_LOG_ONLY(!vdo_is_state_compressed(agent->new_mapped.state), in start_deduping()
1015 "compression must have shared a lock"); in start_deduping()
1021 VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(lock->duplicate_lock), in start_deduping()
1022 "duplicate_lock must be a PBN read lock"); in start_deduping()
1025 * This state is not like any of the other states. There is no designated agent--the agent in start_deduping()
1029 lock->agent = NULL; in start_deduping()
1032 * Launch the agent (if not already deduplicated) and as many lock waiters as we have in start_deduping()
1034 * be triggered and the remaining waiters will be transferred to the new lock. in start_deduping()
1037 launch_dedupe(lock, agent, true); in start_deduping()
1040 while (vdo_waitq_has_waiters(&lock->waiters)) in start_deduping()
1041 launch_dedupe(lock, dequeue_lock_waiter(lock), false); in start_deduping()
1045 * In the degenerate case where all the waiters rolled over to a new lock, this in start_deduping()
1046 * will continue to use the old agent to clean up this lock, and otherwise it just in start_deduping()
1047 * lets the agent exit the lock. in start_deduping()
1049 finish_deduping(lock, agent); in start_deduping()
1054 * increment_stat() - Increment a statistic counter in a non-atomic yet thread-safe manner.
1067 * finish_verifying() - Handle the result of the agent for the lock comparing its data to the
1076 struct hash_lock *lock = agent->hash_lock; in finish_verifying() local
1080 lock->verified = agent->is_duplicate; in finish_verifying()
1084 * not any re-verifications due to PBN lock releases. in finish_verifying()
1086 if (!lock->verify_counted) { in finish_verifying()
1087 lock->verify_counted = true; in finish_verifying()
1088 if (lock->verified) in finish_verifying()
1089 increment_stat(&agent->hash_zone->statistics.dedupe_advice_valid); in finish_verifying()
1091 increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale); in finish_verifying()
1098 if (lock->verified && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) { in finish_verifying()
1099 agent->is_duplicate = false; in finish_verifying()
1100 lock->verified = false; in finish_verifying()
1103 if (lock->verified) { in finish_verifying()
1105 * VERIFYING -> DEDUPING transition: The advice is for a true duplicate, so start in finish_verifying()
1108 start_deduping(lock, agent, false); in finish_verifying()
1111 * VERIFYING -> UNLOCKING transition: Either the verify failed or we'd try to in finish_verifying()
1113 * lock without an agent to release the PBN lock. In both cases, the data will have in finish_verifying()
1117 lock->update_advice = true; in finish_verifying()
1118 start_unlocking(lock, agent); in finish_verifying()
1138 agent->is_duplicate = blocks_equal(agent->vio.data, agent->scratch_block); in verify_callback()
1147 result = uncompress_data_vio(agent, agent->duplicate.state, in uncompress_and_verify()
1148 agent->scratch_block); in uncompress_and_verify()
1154 agent->is_duplicate = false; in uncompress_and_verify()
1160 struct data_vio *agent = vio_as_data_vio(bio->bi_private); in verify_endio()
1161 int result = blk_status_to_errno(bio->bi_status); in verify_endio()
1165 agent->is_duplicate = false; in verify_endio()
1170 if (vdo_is_state_compressed(agent->duplicate.state)) { in verify_endio()
1181 * start_verifying() - Begin the data verification phase.
1182 * @lock: The hash lock (must be LOCKING).
1185 * Continue the deduplication path for a hash lock by using the agent to read (and possibly
1191 static void start_verifying(struct hash_lock *lock, struct data_vio *agent) in start_verifying() argument
1194 struct vio *vio = &agent->vio; in start_verifying()
1195 char *buffer = (vdo_is_state_compressed(agent->duplicate.state) ? in start_verifying()
1196 (char *) agent->compression.block : in start_verifying()
1197 agent->scratch_block); in start_verifying()
1199 lock->state = VDO_HASH_LOCK_VERIFYING; in start_verifying()
1200 VDO_ASSERT_LOG_ONLY(!lock->verified, "hash lock only verifies advice once"); in start_verifying()
1202 agent->last_async_operation = VIO_ASYNC_OP_VERIFY_DUPLICATION; in start_verifying()
1204 agent->duplicate.pbn); in start_verifying()
1212 vdo_launch_completion_with_priority(&vio->completion, BIO_Q_VERIFY_PRIORITY); in start_verifying()
1216 * finish_locking() - Handle the result of the agent for the lock attempting to obtain a PBN read
1217 * lock on the candidate duplicate block.
1218 * @completion: The completion of the data_vio that attempted to get the read lock.
1225 struct hash_lock *lock = agent->hash_lock; in finish_locking() local
1229 if (!agent->is_duplicate) { in finish_locking()
1230 VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL, in finish_locking()
1233 * LOCKING -> WRITING transition: The advice block is being modified or has no in finish_locking()
1237 increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale); in finish_locking()
1238 lock->update_advice = true; in finish_locking()
1239 start_writing(lock, agent); in finish_locking()
1243 VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL, in finish_locking()
1246 if (!lock->verified) { in finish_locking()
1248 * LOCKING -> VERIFYING transition: Continue on the unverified dedupe path, reading in finish_locking()
1252 start_verifying(lock, agent); in finish_locking()
1256 if (!vdo_claim_pbn_lock_increment(lock->duplicate_lock)) { in finish_locking()
1258 * LOCKING -> UNLOCKING transition: The verified block was re-locked, but has no in finish_locking()
1259 * available increments left. Must first release the useless PBN read lock before in finish_locking()
1262 agent->is_duplicate = false; in finish_locking()
1263 lock->verified = false; in finish_locking()
1264 lock->update_advice = true; in finish_locking()
1265 start_unlocking(lock, agent); in finish_locking()
1270 * LOCKING -> DEDUPING transition: Continue on the verified dedupe path, deduplicating in finish_locking()
1273 start_deduping(lock, agent, false); in finish_locking()
1276 static bool acquire_provisional_reference(struct data_vio *agent, struct pbn_lock *lock, in acquire_provisional_reference() argument
1279 /* Ensure that the newly-locked block is referenced. */ in acquire_provisional_reference()
1280 struct vdo_slab *slab = vdo_get_slab(depot, agent->duplicate.pbn); in acquire_provisional_reference()
1281 int result = vdo_acquire_provisional_reference(slab, agent->duplicate.pbn, lock); in acquire_provisional_reference()
1288 agent->is_duplicate = false; in acquire_provisional_reference()
1289 vdo_release_physical_zone_pbn_lock(agent->duplicate.zone, in acquire_provisional_reference()
1290 agent->duplicate.pbn, lock); in acquire_provisional_reference()
1296 * lock_duplicate_pbn() - Acquire a read lock on the PBN of the block containing candidate
1298 * @completion: The completion of the data_vio attempting to acquire the physical block lock on
1299 * behalf of its hash lock.
1301 * If the PBN is already locked for writing, the lock attempt is abandoned and is_duplicate will be
1308 struct pbn_lock *lock; in lock_duplicate_pbn() local
1312 struct slab_depot *depot = vdo_from_data_vio(agent)->depot; in lock_duplicate_pbn()
1313 struct physical_zone *zone = agent->duplicate.zone; in lock_duplicate_pbn()
1323 increment_limit = vdo_get_increment_limit(depot, agent->duplicate.pbn); in lock_duplicate_pbn()
1329 agent->is_duplicate = false; in lock_duplicate_pbn()
1334 result = vdo_attempt_physical_zone_pbn_lock(zone, agent->duplicate.pbn, in lock_duplicate_pbn()
1335 VIO_READ_LOCK, &lock); in lock_duplicate_pbn()
1341 if (!vdo_is_pbn_read_lock(lock)) { in lock_duplicate_pbn()
1346 * data in the write lock holder. in lock_duplicate_pbn()
1353 * only way we'd be getting lock contention is if we've written the same in lock_duplicate_pbn()
1359 * smashed, and the write smashing it cannot contain our data--it would have to be in lock_duplicate_pbn()
1360 * writing on behalf of our hash lock, but that's impossible since we're the lock in lock_duplicate_pbn()
1363 * 3a) If the lock is held by a data_vio with different data, the advice is already in lock_duplicate_pbn()
1366 * 3b) If the lock is held by a data_vio that matches us, we may as well either in lock_duplicate_pbn()
1368 * potentially having many duplicates wait for the lock holder to write, journal, in lock_duplicate_pbn()
1369 * hash, and finally arrive in the hash lock. We lose a chance to avoid a UDS in lock_duplicate_pbn()
1377 agent->is_duplicate = false; in lock_duplicate_pbn()
1382 if (lock->holder_count == 0) { in lock_duplicate_pbn()
1383 if (!acquire_provisional_reference(agent, lock, depot)) in lock_duplicate_pbn()
1387 * The increment limit we grabbed earlier is still valid. The lock now holds the in lock_duplicate_pbn()
1389 * locks sharing this read lock. in lock_duplicate_pbn()
1391 lock->increment_limit = increment_limit; in lock_duplicate_pbn()
1395 * We've successfully acquired a read lock on behalf of the hash lock, so mark it as such. in lock_duplicate_pbn()
1397 set_duplicate_lock(agent->hash_lock, lock); in lock_duplicate_pbn()
1407 * start_locking() - Continue deduplication for a hash lock that has obtained valid advice of a
1409 * @lock: The hash lock (currently must be QUERYING).
1412 static void start_locking(struct hash_lock *lock, struct data_vio *agent) in start_locking() argument
1414 VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL, in start_locking()
1415 "must not acquire a duplicate lock when already holding it"); in start_locking()
1417 lock->state = VDO_HASH_LOCK_LOCKING; in start_locking()
1421 * accepting the advice, and don't explicitly change lock states (or use an agent-local in start_locking()
1424 agent->last_async_operation = VIO_ASYNC_OP_LOCK_DUPLICATE_PBN; in start_locking()
1429 * finish_writing() - Re-entry point for the lock agent after it has finished writing or
1431 * @lock: The hash lock, which must be in state WRITING.
1432 * @agent: The data_vio that wrote its data for the lock.
1434 * The agent will never need to dedupe against anything, so it's done with the lock, but the lock
1437 * If there are other lock holders, the agent will hand the job to one of them and exit, leaving
1438 * the lock to deduplicate against the just-written block. If there are no other lock holders, the
1439 * agent either exits (and later tears down the hash lock), or it remains the agent and updates
1442 static void finish_writing(struct hash_lock *lock, struct data_vio *agent) in finish_writing() argument
1448 lock->duplicate = agent->new_mapped; in finish_writing()
1449 lock->verified = true; in finish_writing()
1451 if (vdo_is_state_compressed(lock->duplicate.state) && lock->registered) { in finish_writing()
1456 lock->update_advice = true; in finish_writing()
1460 if (vdo_waitq_has_waiters(&lock->waiters)) { in finish_writing()
1462 * WRITING -> DEDUPING transition: an asynchronously-written block failed to in finish_writing()
1463 * compress, so the PBN lock on the written copy was already transferred. The agent in finish_writing()
1464 * is done with the lock, but the lock may still need to use it to clean up after in finish_writing()
1467 start_deduping(lock, agent, true); in finish_writing()
1473 * being able to release the hash lock (or just release it). in finish_writing()
1475 if (lock->update_advice) { in finish_writing()
1477 * WRITING -> UPDATING transition: There's no waiter and a UDS update is needed, so in finish_writing()
1481 start_updating(lock, agent); in finish_writing()
1482 } else if (lock->duplicate_lock != NULL) { in finish_writing()
1484 * WRITING -> UNLOCKING transition: There's no waiter and no update needed, but the in finish_writing()
1485 * compressed write gave us a shared duplicate lock that we must release. in finish_writing()
1487 set_duplicate_location(agent, lock->duplicate); in finish_writing()
1488 start_unlocking(lock, agent); in finish_writing()
1491 * WRITING -> BYPASSING transition: There's no waiter, no update needed, and no in finish_writing()
1492 * duplicate lock held, so both the agent and lock have no more work to do. The in finish_writing()
1493 * agent will release its allocation lock in cleanup. in finish_writing()
1495 start_bypassing(lock, agent); in finish_writing()
1500 * select_writing_agent() - Search through the lock waiters for a data_vio that has an allocation.
1501 * @lock: The hash lock to modify.
1506 static struct data_vio *select_writing_agent(struct hash_lock *lock) in select_writing_agent() argument
1514 * Move waiters to the temp queue one-by-one until we find an allocation. Not ideal to in select_writing_agent()
1517 while (((data_vio = dequeue_lock_waiter(lock)) != NULL) && in select_writing_agent()
1519 /* Use the lower-level enqueue since we're just moving waiters around. */ in select_writing_agent()
1520 vdo_waitq_enqueue_waiter(&temp_queue, &data_vio->waiter); in select_writing_agent()
1526 * arrived at the lock. in select_writing_agent()
1528 vdo_waitq_transfer_all_waiters(&lock->waiters, &temp_queue); in select_writing_agent()
1532 * first waiter since it was the first to reach the lock. in select_writing_agent()
1534 vdo_waitq_enqueue_waiter(&lock->waiters, &lock->agent->waiter); in select_writing_agent()
1535 lock->agent = data_vio; in select_writing_agent()
1538 data_vio = lock->agent; in select_writing_agent()
1541 /* Swap all the waiters back onto the lock's queue. */ in select_writing_agent()
1542 vdo_waitq_transfer_all_waiters(&temp_queue, &lock->waiters); in select_writing_agent()
1547 * start_writing() - Begin the non-duplicate write path.
1548 * @lock: The hash lock (currently must be QUERYING).
1549 * @agent: The data_vio currently acting as the agent for the lock.
1551 * Begins the non-duplicate write path for a hash lock that had no advice, selecting a data_vio
1555 static void start_writing(struct hash_lock *lock, struct data_vio *agent) in start_writing() argument
1557 lock->state = VDO_HASH_LOCK_WRITING; in start_writing()
1564 agent = select_writing_agent(lock); in start_writing()
1570 * that on some path there would be non-waiters still referencing the lock, in start_writing()
1583 if (vdo_waitq_has_waiters(&lock->waiters)) in start_writing()
1588 * return to the hash lock via vdo_continue_hash_lock() and call finish_writing(). in start_writing()
1599 const struct uds_request *request = &context->request; in decode_uds_advice()
1600 struct data_vio *data_vio = context->requestor; in decode_uds_advice()
1602 const struct uds_record_data *encoding = &request->old_metadata; in decode_uds_advice()
1604 struct zoned_pbn *advice = &data_vio->duplicate; in decode_uds_advice()
1608 if ((request->status != UDS_SUCCESS) || !request->found) in decode_uds_advice()
1611 version = encoding->data[offset++]; in decode_uds_advice()
1617 advice->state = encoding->data[offset++]; in decode_uds_advice()
1618 advice->pbn = get_unaligned_le64(&encoding->data[offset]); in decode_uds_advice()
1623 if ((advice->state == VDO_MAPPING_STATE_UNMAPPED) || (advice->pbn == VDO_ZERO_BLOCK)) { in decode_uds_advice()
1625 (unsigned long long) advice->pbn, advice->state, in decode_uds_advice()
1626 (unsigned long long) data_vio->logical.lbn); in decode_uds_advice()
1627 atomic64_inc(&vdo->stats.invalid_advice_pbn_count); in decode_uds_advice()
1631 result = vdo_get_physical_zone(vdo, advice->pbn, &advice->zone); in decode_uds_advice()
1632 if ((result != VDO_SUCCESS) || (advice->zone == NULL)) { in decode_uds_advice()
1634 (unsigned long long) advice->pbn, in decode_uds_advice()
1635 (unsigned long long) data_vio->logical.lbn); in decode_uds_advice()
1636 atomic64_inc(&vdo->stats.invalid_advice_pbn_count); in decode_uds_advice()
1645 struct dedupe_context *context = agent->dedupe_context; in process_query_result()
1651 agent->is_duplicate = decode_uds_advice(context); in process_query_result()
1652 agent->dedupe_context = NULL; in process_query_result()
1658 * finish_querying() - Process the result of a UDS query performed by the agent for the lock.
1666 struct hash_lock *lock = agent->hash_lock; in finish_querying() local
1672 if (agent->is_duplicate) { in finish_querying()
1673 lock->duplicate = agent->duplicate; in finish_querying()
1675 * QUERYING -> LOCKING transition: Valid advice was obtained from UDS. Use the in finish_querying()
1676 * QUERYING agent to start the hash lock on the unverified dedupe path, verifying in finish_querying()
1679 start_locking(lock, agent); in finish_querying()
1685 lock->update_advice = !data_vio_has_allocation(agent); in finish_querying()
1687 * QUERYING -> WRITING transition: There was no advice or the advice wasn't valid, in finish_querying()
1690 start_writing(lock, agent); in finish_querying()
1695 * start_querying() - Start deduplication for a hash lock.
1696 * @lock: The initialized hash lock.
1697 * @data_vio: The data_vio that has just obtained the new lock.
1699 * Starts deduplication for a hash lock that has finished initializing by making the data_vio that
1701 * query on behalf of the lock.
1703 static void start_querying(struct hash_lock *lock, struct data_vio *data_vio) in start_querying() argument
1705 lock->agent = data_vio; in start_querying()
1706 lock->state = VDO_HASH_LOCK_QUERYING; in start_querying()
1707 data_vio->last_async_operation = VIO_ASYNC_OP_CHECK_FOR_DUPLICATION; in start_querying()
1714 * report_bogus_lock_state() - Complain that a data_vio has entered a hash_lock that is in an
1717 * @lock: The hash lock.
1718 * @data_vio: The data_vio attempting to enter the lock.
1720 static void report_bogus_lock_state(struct hash_lock *lock, struct data_vio *data_vio) in report_bogus_lock_state() argument
1722 VDO_ASSERT_LOG_ONLY(false, "hash lock must not be in unimplemented state %s", in report_bogus_lock_state()
1723 get_hash_lock_state_name(lock->state)); in report_bogus_lock_state()
1728 * vdo_continue_hash_lock() - Continue the processing state after writing, compressing, or
1730 * @data_vio: The data_vio to continue processing in its hash lock.
1732 * Asynchronously continue processing a data_vio in its hash lock after it has finished writing,
1734 * lock, or update the UDS index, or simply release its share of the lock.
1741 struct hash_lock *lock = data_vio->hash_lock; in vdo_continue_hash_lock() local
1743 switch (lock->state) { in vdo_continue_hash_lock()
1745 VDO_ASSERT_LOG_ONLY(data_vio == lock->agent, in vdo_continue_hash_lock()
1746 "only the lock agent may continue the lock"); in vdo_continue_hash_lock()
1747 finish_writing(lock, data_vio); in vdo_continue_hash_lock()
1751 finish_deduping(lock, data_vio); in vdo_continue_hash_lock()
1755 /* This data_vio has finished the write path and the lock doesn't need it. */ in vdo_continue_hash_lock()
1765 /* A lock in this state should never be re-entered. */ in vdo_continue_hash_lock()
1766 report_bogus_lock_state(lock, data_vio); in vdo_continue_hash_lock()
1770 report_bogus_lock_state(lock, data_vio); in vdo_continue_hash_lock()
1775 * is_hash_collision() - Check to see if a hash collision has occurred.
1776 * @lock: The lock to check.
1777 * @candidate: The data_vio seeking to share the lock.
1779 * Check whether the data in data_vios sharing a lock is different than in a data_vio seeking to
1780 * share the lock, which should only be possible in the extremely unlikely case of a hash
1783 * Return: true if the given data_vio must not share the lock because it doesn't have the same data
1784 * as the lock holders.
1786 static bool is_hash_collision(struct hash_lock *lock, struct data_vio *candidate) in is_hash_collision() argument
1792 if (list_empty(&lock->duplicate_ring)) in is_hash_collision()
1795 lock_holder = list_first_entry(&lock->duplicate_ring, struct data_vio, in is_hash_collision()
1797 zone = candidate->hash_zone; in is_hash_collision()
1798 collides = !blocks_equal(lock_holder->vio.data, candidate->vio.data); in is_hash_collision()
1800 increment_stat(&zone->statistics.concurrent_hash_collisions); in is_hash_collision()
1802 increment_stat(&zone->statistics.concurrent_data_matches); in is_hash_collision()
1811 /* FIXME: BUG_ON() and/or enter read-only mode? */ in assert_hash_lock_preconditions()
1812 result = VDO_ASSERT(data_vio->hash_lock == NULL, in assert_hash_lock_preconditions()
1813 "must not already hold a hash lock"); in assert_hash_lock_preconditions()
1817 result = VDO_ASSERT(list_empty(&data_vio->hash_lock_entry), in assert_hash_lock_preconditions()
1818 "must not already be a member of a hash lock ring"); in assert_hash_lock_preconditions()
1822 return VDO_ASSERT(data_vio->recovery_sequence_number == 0, in assert_hash_lock_preconditions()
1823 "must not hold a recovery lock when getting a hash lock"); in assert_hash_lock_preconditions()
1827 * vdo_acquire_hash_lock() - Acquire or share a lock on a record name.
1828 * @data_vio: The data_vio acquiring a lock on its record name.
1830 * Acquire or share a lock on the hash (record name) of the data in a data_vio, updating the
1831 * data_vio to reference the lock. This must only be called in the correct thread for the zone. In
1833 * a lock reference.
1838 struct hash_lock *lock; in vdo_acquire_hash_lock() local
1849 result = acquire_lock(data_vio->hash_zone, &data_vio->record_name, NULL, &lock); in vdo_acquire_hash_lock()
1855 if (is_hash_collision(lock, data_vio)) { in vdo_acquire_hash_lock()
1866 set_hash_lock(data_vio, lock); in vdo_acquire_hash_lock()
1867 switch (lock->state) { in vdo_acquire_hash_lock()
1869 start_querying(lock, data_vio); in vdo_acquire_hash_lock()
1878 /* The lock is busy, and can't be shared yet. */ in vdo_acquire_hash_lock()
1879 wait_on_hash_lock(lock, data_vio); in vdo_acquire_hash_lock()
1883 /* We can't use this lock, so bypass optimization entirely. */ in vdo_acquire_hash_lock()
1889 launch_dedupe(lock, data_vio, false); in vdo_acquire_hash_lock()
1893 /* A lock in this state should not be acquired by new VIOs. */ in vdo_acquire_hash_lock()
1894 report_bogus_lock_state(lock, data_vio); in vdo_acquire_hash_lock()
1899 * vdo_release_hash_lock() - Release a data_vio's share of a hash lock, if held, and null out the
1901 * @data_vio: The data_vio releasing its hash lock.
1903 * If the data_vio is the only one holding the lock, this also releases any resources or locks used
1904 * by the hash lock (such as a PBN read lock on a block containing data with the same hash) and
1905 * returns the lock to the hash zone's lock pool.
1912 struct hash_lock *lock = data_vio->hash_lock; in vdo_release_hash_lock() local
1913 struct hash_zone *zone = data_vio->hash_zone; in vdo_release_hash_lock()
1915 if (lock == NULL) in vdo_release_hash_lock()
1920 if (lock->reference_count > 0) { in vdo_release_hash_lock()
1921 /* The lock is still in use by other data_vios. */ in vdo_release_hash_lock()
1925 lock_key = hash_lock_key(lock); in vdo_release_hash_lock()
1926 if (lock->registered) { in vdo_release_hash_lock()
1929 removed = vdo_int_map_remove(zone->hash_lock_map, lock_key); in vdo_release_hash_lock()
1930 VDO_ASSERT_LOG_ONLY(lock == removed, in vdo_release_hash_lock()
1931 "hash lock being released must have been mapped"); in vdo_release_hash_lock()
1933 VDO_ASSERT_LOG_ONLY(lock != vdo_int_map_get(zone->hash_lock_map, lock_key), in vdo_release_hash_lock()
1934 "unregistered hash lock must not be in the lock map"); in vdo_release_hash_lock()
1937 VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters), in vdo_release_hash_lock()
1938 "hash lock returned to zone must have no waiters"); in vdo_release_hash_lock()
1939 VDO_ASSERT_LOG_ONLY((lock->duplicate_lock == NULL), in vdo_release_hash_lock()
1940 "hash lock returned to zone must not reference a PBN lock"); in vdo_release_hash_lock()
1941 VDO_ASSERT_LOG_ONLY((lock->state == VDO_HASH_LOCK_BYPASSING), in vdo_release_hash_lock()
1942 "returned hash lock must not be in use with state %s", in vdo_release_hash_lock()
1943 get_hash_lock_state_name(lock->state)); in vdo_release_hash_lock()
1944 VDO_ASSERT_LOG_ONLY(list_empty(&lock->pool_node), in vdo_release_hash_lock()
1945 "hash lock returned to zone must not be in a pool ring"); in vdo_release_hash_lock()
1946 VDO_ASSERT_LOG_ONLY(list_empty(&lock->duplicate_ring), in vdo_release_hash_lock()
1947 "hash lock returned to zone must not reference DataVIOs"); in vdo_release_hash_lock()
1949 return_hash_lock_to_pool(zone, lock); in vdo_release_hash_lock()
1953 * transfer_allocation_lock() - Transfer a data_vio's downgraded allocation PBN lock to the
1954 * data_vio's hash lock, converting it to a duplicate PBN lock.
1955 * @data_vio: The data_vio holding the allocation lock to transfer.
1959 struct allocation *allocation = &data_vio->allocation; in transfer_allocation_lock()
1960 struct hash_lock *hash_lock = data_vio->hash_lock; in transfer_allocation_lock()
1962 VDO_ASSERT_LOG_ONLY(data_vio->new_mapped.pbn == allocation->pbn, in transfer_allocation_lock()
1963 "transferred lock must be for the block written"); in transfer_allocation_lock()
1965 allocation->pbn = VDO_ZERO_BLOCK; in transfer_allocation_lock()
1967 VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(allocation->lock), in transfer_allocation_lock()
1968 "must have downgraded the allocation lock before transfer"); in transfer_allocation_lock()
1970 hash_lock->duplicate = data_vio->new_mapped; in transfer_allocation_lock()
1971 data_vio->duplicate = data_vio->new_mapped; in transfer_allocation_lock()
1974 * Since the lock is being transferred, the holder count doesn't change (and isn't even in transfer_allocation_lock()
1977 hash_lock->duplicate_lock = vdo_forget(allocation->lock); in transfer_allocation_lock()
1981 * vdo_share_compressed_write_lock() - Make a data_vio's hash lock a shared holder of the PBN lock
1984 * @pbn_lock: The PBN lock on the compressed block.
1986 * If the lock is still a write lock (as it will be for the first share), it will be converted to a
1987 * read lock. This also reserves a reference count increment for the data_vio.
1995 "a duplicate PBN lock should not exist when writing"); in vdo_share_compressed_write_lock()
1996 VDO_ASSERT_LOG_ONLY(vdo_is_state_compressed(data_vio->new_mapped.state), in vdo_share_compressed_write_lock()
1997 "lock transfer must be for a compressed write"); in vdo_share_compressed_write_lock()
2000 /* First sharer downgrades the lock. */ in vdo_share_compressed_write_lock()
2005 * Get a share of the PBN lock, ensuring it cannot be released until after this data_vio in vdo_share_compressed_write_lock()
2008 data_vio->duplicate = data_vio->new_mapped; in vdo_share_compressed_write_lock()
2009 data_vio->hash_lock->duplicate = data_vio->new_mapped; in vdo_share_compressed_write_lock()
2010 set_duplicate_lock(data_vio->hash_lock, pbn_lock); in vdo_share_compressed_write_lock()
2030 vdo_register_allocating_thread(&thread->allocating_thread, NULL); in start_uds_queue()
2039 __must_hold(&zones->lock) in close_index()
2047 zones->index_state = IS_CHANGING; in close_index()
2048 /* Close the index session, while not holding the lock. */ in close_index()
2049 spin_unlock(&zones->lock); in close_index()
2050 result = uds_close_index(zones->index_session); in close_index()
2054 spin_lock(&zones->lock); in close_index()
2055 zones->index_state = IS_CLOSED; in close_index()
2056 zones->error_flag |= result != UDS_SUCCESS; in close_index()
2061 __must_hold(&zones->lock) in open_index()
2065 bool create_flag = zones->create_flag; in open_index()
2067 zones->create_flag = false; in open_index()
2072 zones->index_state = IS_CHANGING; in open_index()
2073 zones->error_flag = false; in open_index()
2075 /* Open the index session, while not holding the lock */ in open_index()
2076 spin_unlock(&zones->lock); in open_index()
2078 &zones->parameters, zones->index_session); in open_index()
2082 spin_lock(&zones->lock); in open_index()
2085 case -ENOENT: in open_index()
2090 zones->index_state = IS_CLOSED; in open_index()
2091 zones->create_flag = true; in open_index()
2098 zones->index_state = IS_OPENED; in open_index()
2100 zones->index_state = IS_CLOSED; in open_index()
2101 zones->index_target = IS_CLOSED; in open_index()
2102 zones->error_flag = true; in open_index()
2103 spin_unlock(&zones->lock); in open_index()
2105 spin_lock(&zones->lock); in open_index()
2117 spin_lock(&zones->lock); in change_dedupe_state()
2120 while (vdo_is_state_normal(&zones->state) && in change_dedupe_state()
2121 ((zones->index_state != zones->index_target) || zones->create_flag)) { in change_dedupe_state()
2122 if (zones->index_state == IS_OPENED) in change_dedupe_state()
2128 zones->changing = false; in change_dedupe_state()
2129 spin_unlock(&zones->lock); in change_dedupe_state()
2134 u64 start_time = context->submission_jiffies; in start_expiration_timer()
2137 if (!change_timer_state(context->zone, DEDUPE_QUERY_TIMER_IDLE, in start_expiration_timer()
2143 mod_timer(&context->zone->timer, end_time); in start_expiration_timer()
2147 * report_dedupe_timeouts() - Record and eventually report that some dedupe requests reached their
2154 atomic64_add(timeouts, &zones->timeouts); in report_dedupe_timeouts()
2155 spin_lock(&zones->lock); in report_dedupe_timeouts()
2156 if (__ratelimit(&zones->ratelimiter)) { in report_dedupe_timeouts()
2157 u64 unreported = atomic64_read(&zones->timeouts); in report_dedupe_timeouts()
2159 unreported -= zones->reported_timeouts; in report_dedupe_timeouts()
2162 zones->reported_timeouts += unreported; in report_dedupe_timeouts()
2164 spin_unlock(&zones->lock); in report_dedupe_timeouts()
2171 struct volume_geometry geometry = vdo->geometry; in initialize_index()
2186 ratelimit_default_init(&zones->ratelimiter); in initialize_index()
2187 ratelimit_set_flags(&zones->ratelimiter, RATELIMIT_MSG_ON_RELEASE); in initialize_index()
2188 uds_offset = ((vdo_get_index_region_start(geometry) - in initialize_index()
2190 zones->parameters = (struct uds_parameters) { in initialize_index()
2191 .bdev = vdo->device_config->owned_device->bdev, in initialize_index()
2199 result = uds_create_index_session(&zones->index_session); in initialize_index()
2203 result = vdo_make_thread(vdo, vdo->thread_config.dedupe_thread, &uds_queue_type, in initialize_index()
2206 uds_destroy_index_session(vdo_forget(zones->index_session)); in initialize_index()
2211 vdo_initialize_completion(&zones->completion, vdo, VDO_HASH_ZONES_COMPLETION); in initialize_index()
2212 vdo_set_completion_callback(&zones->completion, change_dedupe_state, in initialize_index()
2213 vdo->thread_config.dedupe_thread); in initialize_index()
2218 * finish_index_operation() - This is the UDS callback for index queries.
2232 continue_data_vio(context->requestor); in finish_index_operation()
2243 atomic_read(&context->state)); in finish_index_operation()
2246 vdo_funnel_queue_put(context->zone->timed_out_complete, &context->queue_entry); in finish_index_operation()
2250 * check_for_drain_complete() - Check whether this zone has drained.
2257 if (!vdo_is_state_draining(&zone->state)) in check_for_drain_complete()
2260 if ((atomic_read(&zone->timer_state) == DEDUPE_QUERY_TIMER_IDLE) || in check_for_drain_complete()
2263 del_timer_sync(&zone->timer); in check_for_drain_complete()
2266 * There is an in flight time-out, which must get processed before we can continue. in check_for_drain_complete()
2275 entry = vdo_funnel_queue_poll(zone->timed_out_complete); in check_for_drain_complete()
2280 atomic_set(&context->state, DEDUPE_CONTEXT_IDLE); in check_for_drain_complete()
2281 list_add(&context->list_entry, &zone->available); in check_for_drain_complete()
2286 WRITE_ONCE(zone->active, zone->active - recycled); in check_for_drain_complete()
2287 VDO_ASSERT_LOG_ONLY(READ_ONCE(zone->active) == 0, "all contexts inactive"); in check_for_drain_complete()
2288 vdo_finish_draining(&zone->state); in check_for_drain_complete()
2296 unsigned long cutoff = jiffies - timeout_jiffies; in timeout_index_operations_callback()
2299 atomic_set(&zone->timer_state, DEDUPE_QUERY_TIMER_IDLE); in timeout_index_operations_callback()
2300 list_for_each_entry_safe(context, tmp, &zone->pending, list_entry) { in timeout_index_operations_callback()
2301 if (cutoff <= context->submission_jiffies) { in timeout_index_operations_callback()
2325 list_del_init(&context->list_entry); in timeout_index_operations_callback()
2326 context->requestor->dedupe_context = NULL; in timeout_index_operations_callback()
2327 continue_data_vio(context->requestor); in timeout_index_operations_callback()
2332 report_dedupe_timeouts(completion->vdo->hash_zones, timed_out); in timeout_index_operations_callback()
2343 vdo_launch_completion(&zone->completion); in timeout_index_operations()
2351 struct hash_zone *zone = &zones->zones[zone_number]; in initialize_zone()
2353 result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->hash_lock_map); in initialize_zone()
2357 vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION); in initialize_zone()
2358 zone->zone_number = zone_number; in initialize_zone()
2359 zone->thread_id = vdo->thread_config.hash_zone_threads[zone_number]; in initialize_zone()
2360 vdo_initialize_completion(&zone->completion, vdo, VDO_HASH_ZONE_COMPLETION); in initialize_zone()
2361 vdo_set_completion_callback(&zone->completion, timeout_index_operations_callback, in initialize_zone()
2362 zone->thread_id); in initialize_zone()
2363 INIT_LIST_HEAD(&zone->lock_pool); in initialize_zone()
2365 &zone->lock_array); in initialize_zone()
2370 return_hash_lock_to_pool(zone, &zone->lock_array[i]); in initialize_zone()
2372 INIT_LIST_HEAD(&zone->available); in initialize_zone()
2373 INIT_LIST_HEAD(&zone->pending); in initialize_zone()
2374 result = vdo_make_funnel_queue(&zone->timed_out_complete); in initialize_zone()
2378 timer_setup(&zone->timer, timeout_index_operations, 0); in initialize_zone()
2381 struct dedupe_context *context = &zone->contexts[i]; in initialize_zone()
2383 context->zone = zone; in initialize_zone()
2384 context->request.callback = finish_index_operation; in initialize_zone()
2385 context->request.session = zones->index_session; in initialize_zone()
2386 list_add(&context->list_entry, &zone->available); in initialize_zone()
2389 return vdo_make_default_thread(vdo, zone->thread_id); in initialize_zone()
2392 /** get_thread_id_for_zone() - Implements vdo_zone_thread_getter_fn. */
2397 return zones->zones[zone_number].thread_id; in get_thread_id_for_zone()
2401 * vdo_make_hash_zones() - Create the hash zones.
2413 zone_count_t zone_count = vdo->thread_config.hash_zone_count; in vdo_make_hash_zones()
2429 vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NEW); in vdo_make_hash_zones()
2431 zones->zone_count = zone_count; in vdo_make_hash_zones()
2440 result = vdo_make_action_manager(zones->zone_count, get_thread_id_for_zone, in vdo_make_hash_zones()
2441 vdo->thread_config.admin_thread, zones, NULL, in vdo_make_hash_zones()
2442 vdo, &zones->manager); in vdo_make_hash_zones()
2457 uds_destroy_index_session(vdo_forget(zones->index_session)); in vdo_finish_dedupe_index()
2461 * vdo_free_hash_zones() - Free the hash zones.
2471 vdo_free(vdo_forget(zones->manager)); in vdo_free_hash_zones()
2473 for (i = 0; i < zones->zone_count; i++) { in vdo_free_hash_zones()
2474 struct hash_zone *zone = &zones->zones[i]; in vdo_free_hash_zones()
2476 vdo_free_funnel_queue(vdo_forget(zone->timed_out_complete)); in vdo_free_hash_zones()
2477 vdo_int_map_free(vdo_forget(zone->hash_lock_map)); in vdo_free_hash_zones()
2478 vdo_free(vdo_forget(zone->lock_array)); in vdo_free_hash_zones()
2481 if (zones->index_session != NULL) in vdo_free_hash_zones()
2484 ratelimit_state_exit(&zones->ratelimiter); in vdo_free_hash_zones()
2493 spin_lock(&zones->lock); in initiate_suspend_index()
2494 index_state = zones->index_state; in initiate_suspend_index()
2495 spin_unlock(&zones->lock); in initiate_suspend_index()
2498 bool save = vdo_is_state_saving(&zones->state); in initiate_suspend_index()
2501 result = uds_suspend_index_session(zones->index_session, save); in initiate_suspend_index()
2510 * suspend_index() - Suspend the UDS index prior to draining hash zones.
2518 vdo_start_draining(&zones->state, in suspend_index()
2519 vdo_get_current_manager_operation(zones->manager), completion, in suspend_index()
2524 * initiate_drain() - Initiate a drain.
2534 * drain_hash_zone() - Drain a hash zone.
2543 vdo_start_draining(&zones->zones[zone_number].state, in drain_hash_zone()
2544 vdo_get_current_manager_operation(zones->manager), parent, in drain_hash_zone()
2548 /** vdo_drain_hash_zones() - Drain all hash zones. */
2551 vdo_schedule_operation(zones->manager, parent->vdo->suspend_type, suspend_index, in vdo_drain_hash_zones()
2556 __must_hold(&zones->lock) in launch_dedupe_state_change()
2558 /* ASSERTION: We enter with the lock held. */ in launch_dedupe_state_change()
2559 if (zones->changing || !vdo_is_state_normal(&zones->state)) in launch_dedupe_state_change()
2563 if (zones->create_flag || (zones->index_state != zones->index_target)) { in launch_dedupe_state_change()
2564 zones->changing = true; in launch_dedupe_state_change()
2565 vdo_launch_completion(&zones->completion); in launch_dedupe_state_change()
2569 /* ASSERTION: We exit with the lock held. */ in launch_dedupe_state_change()
2573 * resume_index() - Resume the UDS index prior to resuming hash zones.
2580 struct device_config *config = parent->vdo->device_config; in resume_index()
2583 zones->parameters.bdev = config->owned_device->bdev; in resume_index()
2584 result = uds_resume_index_session(zones->index_session, zones->parameters.bdev); in resume_index()
2588 spin_lock(&zones->lock); in resume_index()
2589 vdo_resume_if_quiescent(&zones->state); in resume_index()
2591 if (config->deduplication) { in resume_index()
2592 zones->index_target = IS_OPENED; in resume_index()
2593 WRITE_ONCE(zones->dedupe_flag, true); in resume_index()
2595 zones->index_target = IS_CLOSED; in resume_index()
2599 spin_unlock(&zones->lock); in resume_index()
2605 * resume_hash_zone() - Resume a hash zone.
2612 struct hash_zone *zone = &(((struct hash_zones *) context)->zones[zone_number]); in resume_hash_zone()
2614 vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state)); in resume_hash_zone()
2618 * vdo_resume_hash_zones() - Resume a set of hash zones.
2624 if (vdo_is_read_only(parent->vdo)) { in vdo_resume_hash_zones()
2629 vdo_schedule_operation(zones->manager, VDO_ADMIN_STATE_RESUMING, resume_index, in vdo_resume_hash_zones()
2634 * get_hash_zone_statistics() - Add the statistics for this hash zone to the tally for all zones.
2641 const struct hash_lock_statistics *stats = &zone->statistics; in get_hash_zone_statistics()
2643 tally->dedupe_advice_valid += READ_ONCE(stats->dedupe_advice_valid); in get_hash_zone_statistics()
2644 tally->dedupe_advice_stale += READ_ONCE(stats->dedupe_advice_stale); in get_hash_zone_statistics()
2645 tally->concurrent_data_matches += READ_ONCE(stats->concurrent_data_matches); in get_hash_zone_statistics()
2646 tally->concurrent_hash_collisions += READ_ONCE(stats->concurrent_hash_collisions); in get_hash_zone_statistics()
2647 tally->curr_dedupe_queries += READ_ONCE(zone->active); in get_hash_zone_statistics()
2657 spin_lock(&zones->lock); in get_index_statistics()
2658 state = zones->index_state; in get_index_statistics()
2659 spin_unlock(&zones->lock); in get_index_statistics()
2664 result = uds_get_index_session_stats(zones->index_session, &index_stats); in get_index_statistics()
2670 stats->entries_indexed = index_stats.entries_indexed; in get_index_statistics()
2671 stats->posts_found = index_stats.posts_found; in get_index_statistics()
2672 stats->posts_not_found = index_stats.posts_not_found; in get_index_statistics()
2673 stats->queries_found = index_stats.queries_found; in get_index_statistics()
2674 stats->queries_not_found = index_stats.queries_not_found; in get_index_statistics()
2675 stats->updates_found = index_stats.updates_found; in get_index_statistics()
2676 stats->updates_not_found = index_stats.updates_not_found; in get_index_statistics()
2677 stats->entries_discarded = index_stats.entries_discarded; in get_index_statistics()
2681 * vdo_get_dedupe_statistics() - Tally the statistics from all the hash zones and the UDS index.
2684 * Return: The sum of the hash lock statistics from all hash zones plus the statistics from the UDS
2692 for (zone = 0; zone < zones->zone_count; zone++) in vdo_get_dedupe_statistics()
2693 get_hash_zone_statistics(&zones->zones[zone], &stats->hash_lock); in vdo_get_dedupe_statistics()
2695 get_index_statistics(zones, &stats->index); in vdo_get_dedupe_statistics()
2698 * zones->timeouts gives the number of timeouts, and dedupe_context_busy gives the number in vdo_get_dedupe_statistics()
2701 stats->dedupe_advice_timeouts = in vdo_get_dedupe_statistics()
2702 (atomic64_read(&zones->timeouts) + atomic64_read(&zones->dedupe_context_busy)); in vdo_get_dedupe_statistics()
2706 * vdo_select_hash_zone() - Select the hash zone responsible for locking a given record name.
2720 u32 hash = name->name[0]; in vdo_select_hash_zone()
2723 * Scale the 8-bit hash fragment to a zone index by treating it as a binary fraction and in vdo_select_hash_zone()
2725 * 2^8-1], then (hash * count / 2^8) should be uniformly distributed over [0 .. count-1]. in vdo_select_hash_zone()
2728 hash = (hash * zones->zone_count) >> 8; in vdo_select_hash_zone()
2729 return &zones->zones[hash]; in vdo_select_hash_zone()
2733 * dump_hash_lock() - Dump a compact description of hash_lock to the log if the lock is not on the
2735 * @lock: The hash lock to dump.
2737 static void dump_hash_lock(const struct hash_lock *lock) in dump_hash_lock() argument
2741 if (!list_empty(&lock->pool_node)) { in dump_hash_lock()
2742 /* This lock is on the free list. */ in dump_hash_lock()
2748 * unambiguous. 'U' indicates a lock not registered in the map. in dump_hash_lock()
2750 state = get_hash_lock_state_name(lock->state); in dump_hash_lock()
2752 lock, state, (lock->registered ? 'D' : 'U'), in dump_hash_lock()
2753 (unsigned long long) lock->duplicate.pbn, in dump_hash_lock()
2754 lock->duplicate.state, lock->reference_count, in dump_hash_lock()
2755 vdo_waitq_num_waiters(&lock->waiters), lock->agent); in dump_hash_lock()
2761 if (!vdo_is_state_normal(&zones->state)) in index_state_to_string()
2766 return zones->error_flag ? ERROR : CLOSED; in index_state_to_string()
2768 return zones->index_target == IS_OPENED ? OPENING : CLOSING; in index_state_to_string()
2770 return READ_ONCE(zones->dedupe_flag) ? ONLINE : OFFLINE; in index_state_to_string()
2777 * dump_hash_zone() - Dump information about a hash zone to the log for debugging.
2784 if (zone->hash_lock_map == NULL) { in dump_hash_zone()
2785 vdo_log_info("struct hash_zone %u: NULL map", zone->zone_number); in dump_hash_zone()
2790 zone->zone_number, vdo_int_map_size(zone->hash_lock_map)); in dump_hash_zone()
2792 dump_hash_lock(&zone->lock_array[i]); in dump_hash_zone()
2796 * vdo_dump_hash_zones() - Dump information about the hash zones to the log for debugging.
2804 spin_lock(&zones->lock); in vdo_dump_hash_zones()
2805 state = index_state_to_string(zones, zones->index_state); in vdo_dump_hash_zones()
2806 target = (zones->changing ? index_state_to_string(zones, zones->index_target) : NULL); in vdo_dump_hash_zones()
2807 spin_unlock(&zones->lock); in vdo_dump_hash_zones()
2813 for (zone = 0; zone < zones->zone_count; zone++) in vdo_dump_hash_zones()
2814 dump_hash_zone(&zones->zones[zone]); in vdo_dump_hash_zones()
2856 * acquire_context() - Acquire a dedupe context from a hash_zone if any are available.
2868 if (!list_empty(&zone->available)) { in acquire_context()
2869 WRITE_ONCE(zone->active, zone->active + 1); in acquire_context()
2870 context = list_first_entry(&zone->available, struct dedupe_context, in acquire_context()
2872 list_del_init(&context->list_entry); in acquire_context()
2876 entry = vdo_funnel_queue_poll(zone->timed_out_complete); in acquire_context()
2884 request->record_name = data_vio->record_name; in prepare_uds_request()
2885 request->type = operation; in prepare_uds_request()
2888 struct uds_record_data *encoding = &request->new_metadata; in prepare_uds_request()
2890 encoding->data[offset++] = UDS_ADVICE_VERSION; in prepare_uds_request()
2891 encoding->data[offset++] = data_vio->new_mapped.state; in prepare_uds_request()
2892 put_unaligned_le64(data_vio->new_mapped.pbn, &encoding->data[offset]); in prepare_uds_request()
2909 struct hash_zone *zone = data_vio->hash_zone; in query_index()
2913 if (!READ_ONCE(vdo->hash_zones->dedupe_flag)) { in query_index()
2920 atomic64_inc(&vdo->hash_zones->dedupe_context_busy); in query_index()
2925 data_vio->dedupe_context = context; in query_index()
2926 context->requestor = data_vio; in query_index()
2927 context->submission_jiffies = jiffies; in query_index()
2928 prepare_uds_request(&context->request, data_vio, operation); in query_index()
2929 atomic_set(&context->state, DEDUPE_CONTEXT_PENDING); in query_index()
2930 list_add_tail(&context->list_entry, &zone->pending); in query_index()
2932 result = uds_launch_request(&context->request); in query_index()
2934 context->request.status = result; in query_index()
2935 finish_index_operation(&context->request); in query_index()
2944 spin_lock(&zones->lock); in set_target_state()
2945 old_state = index_state_to_string(zones, zones->index_target); in set_target_state()
2947 WRITE_ONCE(zones->dedupe_flag, dedupe); in set_target_state()
2950 zones->create_flag = true; in set_target_state()
2952 zones->index_target = target; in set_target_state()
2954 new_state = index_state_to_string(zones, zones->index_target); in set_target_state()
2955 spin_unlock(&zones->lock); in set_target_state()
2965 spin_lock(&zones->lock); in vdo_get_dedupe_index_state_name()
2966 state = index_state_to_string(zones, zones->index_state); in vdo_get_dedupe_index_state_name()
2967 spin_unlock(&zones->lock); in vdo_get_dedupe_index_state_name()
2975 if (strcasecmp(name, "index-close") == 0) { in vdo_message_dedupe_index()
2978 } else if (strcasecmp(name, "index-create") == 0) { in vdo_message_dedupe_index()
2981 } else if (strcasecmp(name, "index-disable") == 0) { in vdo_message_dedupe_index()
2984 } else if (strcasecmp(name, "index-enable") == 0) { in vdo_message_dedupe_index()
2989 return -EINVAL; in vdo_message_dedupe_index()
2994 vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NORMAL_OPERATION); in vdo_set_dedupe_state_normal()