// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

/**
 * DOC:
 *
 * Hash Locks:
 *
 * A hash_lock controls and coordinates writing, index access, and dedupe among groups of data_vios
 * concurrently writing identical blocks, allowing them to deduplicate not only against advice but
 * also against each other. This saves on index queries and allows those data_vios to concurrently
 * deduplicate against a single block instead of being serialized through a PBN read lock. Only one
 * index query is needed for each hash_lock, instead of one for every data_vio.
 *
 * Hash_locks are assigned to hash_zones by computing a modulus on the hash itself. Each hash_zone
 * has a single dedicated queue and thread for performing all operations on the hash_locks assigned
 * to that zone. The concurrency guarantees of this single-threaded model allow the code to omit
 * more fine-grained locking for the hash_lock structures.
 *
 * A hash_lock acts more like a state machine than a lock. Other than the starting and
 * ending states INITIALIZING and BYPASSING, every state represents and is held for the duration of
 * an asynchronous operation. All state transitions are performed on the thread of the hash_zone
 * containing the lock. An asynchronous operation is almost always performed upon entering a state,
 * and the callback from that operation triggers exiting the state and entering a new state.
 *
 * In all states except DEDUPING, there is a single data_vio, called the lock agent, performing the
 * asynchronous operations on behalf of the lock. The agent will change during the lifetime of the
 * lock if the lock is shared by more than one data_vio. data_vios waiting to deduplicate are kept
 * on a wait queue. Viewed a different way, the agent holds the lock exclusively until the lock
 * enters the DEDUPING state, at which point it becomes a shared lock that all the waiters (and any
 * new data_vios that arrive) use to share a PBN lock. In state DEDUPING, there is no agent. When
 * the last data_vio in the lock calls back in DEDUPING, it becomes the agent and the lock becomes
 * exclusive again. New data_vios that arrive in the lock will also go on the wait queue.
 *
 * The existence of lock waiters is a key factor controlling which state the lock transitions to
 * next. When the lock is new or has waiters, it will always try to reach DEDUPING, and when it
 * doesn't, it will try to clean up and exit.
 *
 * Deduping requires holding a PBN lock on a block that is known to contain data identical to the
 * data_vios in the lock, so the lock will send the agent to the duplicate zone to acquire the PBN
 * lock (LOCKING), to the kernel I/O threads to read and verify the data (VERIFYING), or to write a
 * new copy of the data to a full data block or a slot in a compressed block (WRITING).
 *
 * Cleaning up consists of updating the index when the data location is different from the initial
 * index query (UPDATING, triggered by stale advice, compression, and rollover), releasing the PBN
 * lock on the duplicate block (UNLOCKING), and if the agent is the last data_vio referencing the
 * lock, releasing the hash_lock itself back to the hash zone (BYPASSING).
 *
 * The shortest sequence of states is for non-concurrent writes of new data:
 *   INITIALIZING -> QUERYING -> WRITING -> BYPASSING
 * This sequence is short because no PBN read lock or index update is needed.
 *
 * The non-concurrent case that finds valid advice looks like this (endpoints elided):
 *   -> QUERYING -> LOCKING -> VERIFYING -> DEDUPING -> UNLOCKING ->
 * Or with stale advice (endpoints elided):
 *   -> QUERYING -> LOCKING -> VERIFYING -> UNLOCKING -> WRITING -> UPDATING ->
 *
 * When there are not enough reference count increments available on a PBN for a data_vio
 * to deduplicate, a new lock is forked and the excess waiters roll over to the new lock (which
 * goes directly to WRITING). The new lock takes the place of the old lock in the lock map so new
 * data_vios will be directed to it. The two locks will proceed independently, but only the new
 * lock will have the right to update the index (unless it also forks).
 *
 * Since rollover happens in a lock instance, once a valid data location has been selected, it will
 * not change. QUERYING and WRITING are only performed once per lock lifetime. All other
 * non-endpoint states can be re-entered.
 *
 * The function names in this module follow a convention referencing the states and transitions in
 * the state machine. For example, for the LOCKING state, there are start_locking() and
 * finish_locking() functions.  start_locking() is invoked by the finish function of the state (or
 * states) that transition to LOCKING. It performs the actual lock state change and must be invoked
 * on the hash zone thread.  finish_locking() is called by (or continued via callback from) the
 * code actually obtaining the lock. It does any bookkeeping or decision-making required and
 * invokes the appropriate start function of the state being transitioned to after LOCKING.
 *
 * ----------------------------------------------------------------------
 *
 * Index Queries:
 *
 * A query to the UDS index is handled asynchronously by the index's threads. When the query is
 * complete, a callback supplied with the query will be called from one of those threads. Under
 * heavy system load, the index may be slower to respond than is desirable for reasonable I/O
 * throughput. Since deduplication of writes is not necessary for correct operation of a VDO
 * device, it is acceptable to time out slow index queries and proceed to fulfill a write
 * request without deduplicating. However, because the uds_request struct itself is supplied by the
 * caller, we cannot simply reuse a uds_request object which we have chosen to time out. Hence,
 * each hash_zone maintains a pool of dedupe_contexts which each contain a uds_request along with a
 * reference to the data_vio on behalf of which they are performing a query.
 *
 * When a hash_lock needs to query the index, it attempts to acquire an unused dedupe_context from
 * its hash_zone's pool. If one is available, that context is prepared, associated with the
 * hash_lock's agent, added to the list of pending contexts, and then sent to the index. The
 * context's state will be transitioned from DEDUPE_CONTEXT_IDLE to DEDUPE_CONTEXT_PENDING. If all
 * goes well, the dedupe callback will be called by the index, which will change the context's state
 * to DEDUPE_CONTEXT_COMPLETE, and the associated data_vio will be enqueued to run back in the hash
 * zone where the query results will be processed and the context will be put back in the idle
 * state and returned to the hash_zone's available list.
 *
 * The first time an index query is launched from a given hash_zone, a timer is started. When the
 * timer fires, the hash_zone's completion is enqueued to run in the hash_zone where the zone's
 * pending list will be searched for any contexts in the pending state which have been running for
 * too long. Those contexts are transitioned to the DEDUPE_CONTEXT_TIMED_OUT state and moved to the
 * zone's timed_out list, where they won't be examined again if there is a subsequent time out. The
 * data_vios associated with timed out contexts are sent to continue processing their write
 * operation without deduplicating. The timer is also restarted.
 *
 * When the dedupe callback is run for a context which is in the timed out state, that context is
 * moved to the DEDUPE_CONTEXT_TIMED_OUT_COMPLETE state. No other action need be taken as the
 * associated data_vios have already been dispatched.
 *
 * If a hash_lock needs a dedupe context, and the available list is empty, the timed_out list will
 * be searched for any contexts which are timed out and complete. One of these will be used
 * immediately, and the rest will be returned to the available list and marked idle.
 */

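/*
 * Illustrative sketch only, not part of the driver: one way a record name
 * could be mapped to a hash_zone by "computing a modulus on the hash itself,"
 * as described in the DOC comment above. The helper name and the choice of
 * which hash byte to use are assumptions for the example; the real zone
 * selection logic lives elsewhere in the VDO code.
 */
static inline unsigned int example_select_hash_zone(const unsigned char *record_name,
						    unsigned int zone_count)
{
	/* Any stable function of the hash works; the modulus spreads locks across zones. */
	return record_name[0] % zone_count;
}
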
118  #include "dedupe.h"
119  
120  #include <linux/atomic.h>
121  #include <linux/jiffies.h>
122  #include <linux/kernel.h>
123  #include <linux/list.h>
124  #include <linux/ratelimit.h>
125  #include <linux/spinlock.h>
126  #include <linux/timer.h>
127  
128  #include "logger.h"
129  #include "memory-alloc.h"
130  #include "numeric.h"
131  #include "permassert.h"
132  #include "string-utils.h"
133  
134  #include "indexer.h"
135  
136  #include "action-manager.h"
137  #include "admin-state.h"
138  #include "completion.h"
139  #include "constants.h"
140  #include "data-vio.h"
141  #include "int-map.h"
142  #include "io-submitter.h"
143  #include "packer.h"
144  #include "physical-zone.h"
145  #include "slab-depot.h"
146  #include "statistics.h"
147  #include "types.h"
148  #include "vdo.h"
149  #include "wait-queue.h"
150  
151  #define DEDUPE_QUERY_TIMER_IDLE 0
152  #define DEDUPE_QUERY_TIMER_RUNNING 1
153  #define DEDUPE_QUERY_TIMER_FIRED 2
154  
155  enum dedupe_context_state {
156  	DEDUPE_CONTEXT_IDLE,
157  	DEDUPE_CONTEXT_PENDING,
158  	DEDUPE_CONTEXT_TIMED_OUT,
159  	DEDUPE_CONTEXT_COMPLETE,
160  	DEDUPE_CONTEXT_TIMED_OUT_COMPLETE,
161  };
162  
163  /* Possible index states: closed, opened, or transitioning between those two. */
164  enum index_state {
165  	IS_CLOSED,
166  	IS_CHANGING,
167  	IS_OPENED,
168  };
169  
170  static const char *CLOSED = "closed";
171  static const char *CLOSING = "closing";
172  static const char *ERROR = "error";
173  static const char *OFFLINE = "offline";
174  static const char *ONLINE = "online";
175  static const char *OPENING = "opening";
176  static const char *SUSPENDED = "suspended";
177  static const char *UNKNOWN = "unknown";
178  
179  /* Version 2 uses the kernel space UDS index and is limited to 16 bytes */
180  #define UDS_ADVICE_VERSION 2
181  /* version byte + state byte + 64-bit little-endian PBN */
182  #define UDS_ADVICE_SIZE (1 + 1 + sizeof(u64))
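
/*
 * Illustrative sketch only (hypothetical helper, not the driver's encoder):
 * packing advice into the UDS_ADVICE_SIZE layout described above, i.e. a
 * version byte, a mapping-state byte, and a little-endian 64-bit PBN. The
 * function name and signature are assumptions for the example.
 */
static inline void example_pack_advice(u8 encoding[UDS_ADVICE_SIZE], u8 mapping_state,
				       u64 pbn)
{
	encoding[0] = UDS_ADVICE_VERSION;
	encoding[1] = mapping_state;
	put_unaligned_le64(pbn, &encoding[2]);
}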
183  
184  enum hash_lock_state {
185  	/* State for locks that are not in use or are being initialized. */
186  	VDO_HASH_LOCK_INITIALIZING,
187  
188  	/* This is the sequence of states typically used on the non-dedupe path. */
189  	VDO_HASH_LOCK_QUERYING,
190  	VDO_HASH_LOCK_WRITING,
191  	VDO_HASH_LOCK_UPDATING,
192  
193  	/* The remaining states are typically used on the dedupe path in this order. */
194  	VDO_HASH_LOCK_LOCKING,
195  	VDO_HASH_LOCK_VERIFYING,
196  	VDO_HASH_LOCK_DEDUPING,
197  	VDO_HASH_LOCK_UNLOCKING,
198  
199  	/*
200  	 * Terminal state for locks returning to the pool. Must be last both because it's the final
201  	 * state, and also because it's used to count the states.
202  	 */
203  	VDO_HASH_LOCK_BYPASSING,
204  };
205  
206  static const char * const LOCK_STATE_NAMES[] = {
207  	[VDO_HASH_LOCK_BYPASSING] = "BYPASSING",
208  	[VDO_HASH_LOCK_DEDUPING] = "DEDUPING",
209  	[VDO_HASH_LOCK_INITIALIZING] = "INITIALIZING",
210  	[VDO_HASH_LOCK_LOCKING] = "LOCKING",
211  	[VDO_HASH_LOCK_QUERYING] = "QUERYING",
212  	[VDO_HASH_LOCK_UNLOCKING] = "UNLOCKING",
213  	[VDO_HASH_LOCK_UPDATING] = "UPDATING",
214  	[VDO_HASH_LOCK_VERIFYING] = "VERIFYING",
215  	[VDO_HASH_LOCK_WRITING] = "WRITING",
216  };
217  
218  struct hash_lock {
219  	/* The block hash covered by this lock */
220  	struct uds_record_name hash;
221  
222  	/* When the lock is unused, this list entry allows the lock to be pooled */
223  	struct list_head pool_node;
224  
225  	/*
226  	 * A list containing the data VIOs sharing this lock, all having the same record name and
227  	 * data block contents, linked by their hash_lock_node fields.
228  	 */
229  	struct list_head duplicate_ring;
230  
231  	/* The number of data_vios sharing this lock instance */
232  	data_vio_count_t reference_count;
233  
234  	/* The maximum value of reference_count in the lifetime of this lock */
235  	data_vio_count_t max_references;
236  
237  	/* The current state of this lock */
238  	enum hash_lock_state state;
239  
240  	/* True if the UDS index should be updated with new advice */
241  	bool update_advice;
242  
243  	/* True if the advice has been verified to be a true duplicate */
244  	bool verified;
245  
246  	/* True if the lock has already accounted for an initial verification */
247  	bool verify_counted;
248  
249  	/* True if this lock is registered in the lock map (cleared on rollover) */
250  	bool registered;
251  
252  	/*
253  	 * If verified is false, this is the location of a possible duplicate. If verified is true,
254  	 * it is the verified location of a true duplicate.
255  	 */
256  	struct zoned_pbn duplicate;
257  
258  	/* The PBN lock on the block containing the duplicate data */
259  	struct pbn_lock *duplicate_lock;
260  
261  	/* The data_vio designated to act on behalf of the lock */
262  	struct data_vio *agent;
263  
264  	/*
265  	 * Other data_vios with data identical to the agent who are currently waiting for the agent
266  	 * to get the information they all need to deduplicate--either against each other, or
267  	 * against an existing duplicate on disk.
268  	 */
269  	struct vdo_wait_queue waiters;
270  };
271  
272  #define LOCK_POOL_CAPACITY MAXIMUM_VDO_USER_VIOS
273  
274  struct hash_zones {
275  	struct action_manager *manager;
276  	struct uds_parameters parameters;
277  	struct uds_index_session *index_session;
278  	struct ratelimit_state ratelimiter;
279  	atomic64_t timeouts;
280  	atomic64_t dedupe_context_busy;
281  
282  	/* This spinlock protects the state fields and the starting of dedupe requests. */
283  	spinlock_t lock;
284  
285  	/* The fields in the next block are all protected by the lock */
286  	struct vdo_completion completion;
287  	enum index_state index_state;
288  	enum index_state index_target;
289  	struct admin_state state;
290  	bool changing;
291  	bool create_flag;
292  	bool dedupe_flag;
293  	bool error_flag;
294  	u64 reported_timeouts;
295  
296  	/* The number of zones */
297  	zone_count_t zone_count;
298  	/* The hash zones themselves */
299  	struct hash_zone zones[];
300  };
301  
302  /* These are in milliseconds. */
303  unsigned int vdo_dedupe_index_timeout_interval = 5000;
304  unsigned int vdo_dedupe_index_min_timer_interval = 100;
305  /* Same two variables, in jiffies for easier consumption. */
306  static u64 vdo_dedupe_index_timeout_jiffies;
307  static u64 vdo_dedupe_index_min_timer_jiffies;
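
/*
 * Illustrative sketch only (hypothetical helper): the module parameters above
 * are in milliseconds; the jiffies copies would be refreshed from them with
 * something like this before the timer code uses them. The helper name is an
 * assumption for the example.
 */
static inline void example_refresh_timeout_jiffies(void)
{
	vdo_dedupe_index_timeout_jiffies =
		msecs_to_jiffies(vdo_dedupe_index_timeout_interval);
	vdo_dedupe_index_min_timer_jiffies =
		msecs_to_jiffies(vdo_dedupe_index_min_timer_interval);
}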
308  
static inline struct hash_zone *as_hash_zone(struct vdo_completion *completion)
310  {
311  	vdo_assert_completion_type(completion, VDO_HASH_ZONE_COMPLETION);
312  	return container_of(completion, struct hash_zone, completion);
313  }
314  
static inline struct hash_zones *as_hash_zones(struct vdo_completion *completion)
316  {
317  	vdo_assert_completion_type(completion, VDO_HASH_ZONES_COMPLETION);
318  	return container_of(completion, struct hash_zones, completion);
319  }
320  
static inline void assert_in_hash_zone(struct hash_zone *zone, const char *name)
322  {
323  	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == zone->thread_id),
324  			    "%s called on hash zone thread", name);
325  }
326  
static inline bool change_context_state(struct dedupe_context *context, int old, int new)
328  {
329  	return (atomic_cmpxchg(&context->state, old, new) == old);
330  }
331  
static inline bool change_timer_state(struct hash_zone *zone, int old, int new)
333  {
334  	return (atomic_cmpxchg(&zone->timer_state, old, new) == old);
335  }
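
/*
 * Illustrative sketch only (hypothetical helper): the compare-and-swap
 * helpers above let exactly one of two racing paths claim a transition. For
 * example, the timeout path and the UDS callback race to move a context out
 * of the pending state; whichever caller sees the expected old state wins and
 * the loser simply backs off.
 */
static inline bool example_claim_timeout(struct dedupe_context *context)
{
	return change_context_state(context, DEDUPE_CONTEXT_PENDING,
				    DEDUPE_CONTEXT_TIMED_OUT);
}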
336  
337  /**
338   * return_hash_lock_to_pool() - (Re)initialize a hash lock and return it to its pool.
339   * @zone: The zone from which the lock was borrowed.
340   * @lock: The lock that is no longer in use.
341   */
static void return_hash_lock_to_pool(struct hash_zone *zone, struct hash_lock *lock)
343  {
344  	memset(lock, 0, sizeof(*lock));
345  	INIT_LIST_HEAD(&lock->pool_node);
346  	INIT_LIST_HEAD(&lock->duplicate_ring);
347  	vdo_waitq_init(&lock->waiters);
348  	list_add_tail(&lock->pool_node, &zone->lock_pool);
349  }
350  
351  /**
352   * vdo_get_duplicate_lock() - Get the PBN lock on the duplicate data location for a data_vio from
353   *                            the hash_lock the data_vio holds (if there is one).
354   * @data_vio: The data_vio to query.
355   *
356   * Return: The PBN lock on the data_vio's duplicate location.
357   */
struct pbn_lock *vdo_get_duplicate_lock(struct data_vio *data_vio)
359  {
360  	if (data_vio->hash_lock == NULL)
361  		return NULL;
362  
363  	return data_vio->hash_lock->duplicate_lock;
364  }
365  
366  /**
367   * hash_lock_key() - Return hash_lock's record name as a hash code.
368   * @lock: The hash lock.
369   *
370   * Return: The key to use for the int map.
371   */
static inline u64 hash_lock_key(struct hash_lock *lock)
373  {
374  	return get_unaligned_le64(&lock->hash.name);
375  }
376  
377  /**
378   * get_hash_lock_state_name() - Get the string representation of a hash lock state.
379   * @state: The hash lock state.
380   *
381   * Return: The short string representing the state
382   */
static const char *get_hash_lock_state_name(enum hash_lock_state state)
384  {
385  	/* Catch if a state has been added without updating the name array. */
386  	BUILD_BUG_ON((VDO_HASH_LOCK_BYPASSING + 1) != ARRAY_SIZE(LOCK_STATE_NAMES));
387  	return (state < ARRAY_SIZE(LOCK_STATE_NAMES)) ? LOCK_STATE_NAMES[state] : "INVALID";
388  }
389  
390  /**
391   * assert_hash_lock_agent() - Assert that a data_vio is the agent of its hash lock, and that this
392   *                            is being called in the hash zone.
393   * @data_vio: The data_vio expected to be the lock agent.
394   * @where: A string describing the function making the assertion.
395   */
static void assert_hash_lock_agent(struct data_vio *data_vio, const char *where)
397  {
398  	/* Not safe to access the agent field except from the hash zone. */
399  	assert_data_vio_in_hash_zone(data_vio);
400  	VDO_ASSERT_LOG_ONLY(data_vio == data_vio->hash_lock->agent,
401  			    "%s must be for the hash lock agent", where);
402  }
403  
404  /**
405   * set_duplicate_lock() - Set the duplicate lock held by a hash lock. May only be called in the
406   *                        physical zone of the PBN lock.
407   * @hash_lock: The hash lock to update.
408   * @pbn_lock: The PBN read lock to use as the duplicate lock.
409   */
static void set_duplicate_lock(struct hash_lock *hash_lock, struct pbn_lock *pbn_lock)
411  {
412  	VDO_ASSERT_LOG_ONLY((hash_lock->duplicate_lock == NULL),
413  			    "hash lock must not already hold a duplicate lock");
414  	pbn_lock->holder_count += 1;
415  	hash_lock->duplicate_lock = pbn_lock;
416  }
417  
418  /**
419   * dequeue_lock_waiter() - Remove the first data_vio from the lock's waitq and return it.
420   * @lock: The lock containing the wait queue.
421   *
422   * Return: The first (oldest) waiter in the queue, or NULL if the queue is empty.
423   */
static inline struct data_vio *dequeue_lock_waiter(struct hash_lock *lock)
425  {
426  	return vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
427  }
428  
429  /**
430   * set_hash_lock() - Set, change, or clear the hash lock a data_vio is using.
431   * @data_vio: The data_vio to update.
432   * @new_lock: The hash lock the data_vio is joining.
433   *
434   * Updates the hash lock (or locks) to reflect the change in membership.
435   */
static void set_hash_lock(struct data_vio *data_vio, struct hash_lock *new_lock)
437  {
438  	struct hash_lock *old_lock = data_vio->hash_lock;
439  
440  	if (old_lock != NULL) {
441  		VDO_ASSERT_LOG_ONLY(data_vio->hash_zone != NULL,
442  				    "must have a hash zone when holding a hash lock");
443  		VDO_ASSERT_LOG_ONLY(!list_empty(&data_vio->hash_lock_entry),
444  				    "must be on a hash lock ring when holding a hash lock");
445  		VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 0,
446  				    "hash lock reference must be counted");
447  
448  		if ((old_lock->state != VDO_HASH_LOCK_BYPASSING) &&
449  		    (old_lock->state != VDO_HASH_LOCK_UNLOCKING)) {
450  			/*
451  			 * If the reference count goes to zero in a non-terminal state, we're most
452  			 * likely leaking this lock.
453  			 */
454  			VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 1,
455  					    "hash locks should only become unreferenced in a terminal state, not state %s",
456  					    get_hash_lock_state_name(old_lock->state));
457  		}
458  
459  		list_del_init(&data_vio->hash_lock_entry);
460  		old_lock->reference_count -= 1;
461  
462  		data_vio->hash_lock = NULL;
463  	}
464  
465  	if (new_lock != NULL) {
466  		/*
467  		 * Keep all data_vios sharing the lock on a ring since they can complete in any
468  		 * order and we'll always need a pointer to one to compare data.
469  		 */
470  		list_move_tail(&data_vio->hash_lock_entry, &new_lock->duplicate_ring);
471  		new_lock->reference_count += 1;
472  		if (new_lock->max_references < new_lock->reference_count)
473  			new_lock->max_references = new_lock->reference_count;
474  
475  		data_vio->hash_lock = new_lock;
476  	}
477  }
478  
479  /* There are loops in the state diagram, so some forward decl's are needed. */
480  static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
481  			   bool agent_is_done);
482  static void start_locking(struct hash_lock *lock, struct data_vio *agent);
483  static void start_writing(struct hash_lock *lock, struct data_vio *agent);
484  static void unlock_duplicate_pbn(struct vdo_completion *completion);
485  static void transfer_allocation_lock(struct data_vio *data_vio);
486  
487  /**
488   * exit_hash_lock() - Bottleneck for data_vios that have written or deduplicated and that are no
489   *                    longer needed to be an agent for the hash lock.
490   * @data_vio: The data_vio to complete and send to be cleaned up.
491   */
static void exit_hash_lock(struct data_vio *data_vio)
493  {
494  	/* Release the hash lock now, saving a thread transition in cleanup. */
495  	vdo_release_hash_lock(data_vio);
496  
497  	/* Complete the data_vio and start the clean-up path to release any locks it still holds. */
498  	data_vio->vio.completion.callback = complete_data_vio;
499  
500  	continue_data_vio(data_vio);
501  }
502  
503  /**
504   * set_duplicate_location() - Set the location of the duplicate block for data_vio, updating the
505   *                            is_duplicate and duplicate fields from a zoned_pbn.
506   * @data_vio: The data_vio to modify.
507   * @source: The location of the duplicate.
508   */
static void set_duplicate_location(struct data_vio *data_vio,
510  				   const struct zoned_pbn source)
511  {
512  	data_vio->is_duplicate = (source.pbn != VDO_ZERO_BLOCK);
513  	data_vio->duplicate = source;
514  }
515  
516  /**
517   * retire_lock_agent() - Retire the active lock agent, replacing it with the first lock waiter, and
518   *                       make the retired agent exit the hash lock.
519   * @lock: The hash lock to update.
520   *
521   * Return: The new lock agent (which will be NULL if there was no waiter)
522   */
static struct data_vio *retire_lock_agent(struct hash_lock *lock)
524  {
525  	struct data_vio *old_agent = lock->agent;
526  	struct data_vio *new_agent = dequeue_lock_waiter(lock);
527  
528  	lock->agent = new_agent;
529  	exit_hash_lock(old_agent);
530  	if (new_agent != NULL)
531  		set_duplicate_location(new_agent, lock->duplicate);
532  	return new_agent;
533  }
534  
535  /**
536   * wait_on_hash_lock() - Add a data_vio to the lock's queue of waiters.
537   * @lock: The hash lock on which to wait.
538   * @data_vio: The data_vio to add to the queue.
539   */
static void wait_on_hash_lock(struct hash_lock *lock, struct data_vio *data_vio)
541  {
542  	vdo_waitq_enqueue_waiter(&lock->waiters, &data_vio->waiter);
543  
544  	/*
545  	 * Make sure the agent doesn't block indefinitely in the packer since it now has at least
546  	 * one other data_vio waiting on it.
547  	 */
548  	if ((lock->state != VDO_HASH_LOCK_WRITING) || !cancel_data_vio_compression(lock->agent))
549  		return;
550  
551  	/*
552  	 * Even though we're waiting, we also have to send ourselves as a one-way message to the
553  	 * packer to ensure the agent continues executing. This is safe because
	 * cancel_data_vio_compression() guarantees the agent won't continue executing until this
555  	 * message arrives in the packer, and because the wait queue link isn't used for sending
556  	 * the message.
557  	 */
558  	data_vio->compression.lock_holder = lock->agent;
559  	launch_data_vio_packer_callback(data_vio, vdo_remove_lock_holder_from_packer);
560  }
561  
562  /**
563   * abort_waiter() - waiter_callback_fn function that shunts waiters to write their blocks without
564   *                  optimization.
565   * @waiter: The data_vio's waiter link.
566   * @context: Not used.
567   */
static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
569  {
570  	write_data_vio(vdo_waiter_as_data_vio(waiter));
571  }
572  
573  /**
574   * start_bypassing() - Stop using the hash lock.
575   * @lock: The hash lock.
576   * @agent: The data_vio acting as the agent for the lock.
577   *
578   * Stops using the hash lock. This is the final transition for hash locks which did not get an
579   * error.
580   */
static void start_bypassing(struct hash_lock *lock, struct data_vio *agent)
582  {
583  	lock->state = VDO_HASH_LOCK_BYPASSING;
584  	exit_hash_lock(agent);
585  }
586  
void vdo_clean_failed_hash_lock(struct data_vio *data_vio)
588  {
589  	struct hash_lock *lock = data_vio->hash_lock;
590  
591  	if (lock->state == VDO_HASH_LOCK_BYPASSING) {
592  		exit_hash_lock(data_vio);
593  		return;
594  	}
595  
596  	if (lock->agent == NULL) {
597  		lock->agent = data_vio;
598  	} else if (data_vio != lock->agent) {
599  		exit_hash_lock(data_vio);
600  		return;
601  	}
602  
603  	lock->state = VDO_HASH_LOCK_BYPASSING;
604  
605  	/* Ensure we don't attempt to update advice when cleaning up. */
606  	lock->update_advice = false;
607  
608  	vdo_waitq_notify_all_waiters(&lock->waiters, abort_waiter, NULL);
609  
610  	if (lock->duplicate_lock != NULL) {
611  		/* The agent must reference the duplicate zone to launch it. */
612  		data_vio->duplicate = lock->duplicate;
613  		launch_data_vio_duplicate_zone_callback(data_vio, unlock_duplicate_pbn);
614  		return;
615  	}
616  
617  	lock->agent = NULL;
618  	data_vio->is_duplicate = false;
619  	exit_hash_lock(data_vio);
620  }
621  
622  /**
623   * finish_unlocking() - Handle the result of the agent for the lock releasing a read lock on
 *                      the duplicate candidate.
625   * @completion: The completion of the data_vio acting as the lock's agent.
626   *
627   * This continuation is registered in unlock_duplicate_pbn().
628   */
static void finish_unlocking(struct vdo_completion *completion)
630  {
631  	struct data_vio *agent = as_data_vio(completion);
632  	struct hash_lock *lock = agent->hash_lock;
633  
634  	assert_hash_lock_agent(agent, __func__);
635  
636  	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
637  			    "must have released the duplicate lock for the hash lock");
638  
639  	if (!lock->verified) {
640  		/*
641  		 * UNLOCKING -> WRITING transition: The lock we released was on an unverified
642  		 * block, so it must have been a lock on advice we were verifying, not on a
643  		 * location that was used for deduplication. Go write (or compress) the block to
644  		 * get a location to dedupe against.
645  		 */
646  		start_writing(lock, agent);
647  		return;
648  	}
649  
650  	/*
651  	 * With the lock released, the verified duplicate block may already have changed and will
652  	 * need to be re-verified if a waiter arrived.
653  	 */
654  	lock->verified = false;
655  
656  	if (vdo_waitq_has_waiters(&lock->waiters)) {
657  		/*
658  		 * UNLOCKING -> LOCKING transition: A new data_vio entered the hash lock while the
659  		 * agent was releasing the PBN lock. The current agent exits and the waiter has to
660  		 * re-lock and re-verify the duplicate location.
661  		 *
662  		 * TODO: If we used the current agent to re-acquire the PBN lock we wouldn't need
663  		 * to re-verify.
664  		 */
665  		agent = retire_lock_agent(lock);
666  		start_locking(lock, agent);
667  		return;
668  	}
669  
670  	/*
671  	 * UNLOCKING -> BYPASSING transition: The agent is done with the lock and no other
672  	 * data_vios reference it, so remove it from the lock map and return it to the pool.
673  	 */
674  	start_bypassing(lock, agent);
675  }
676  
677  /**
678   * unlock_duplicate_pbn() - Release a read lock on the PBN of the block that may or may not have
679   *                          contained duplicate data.
680   * @completion: The completion of the data_vio acting as the lock's agent.
681   *
682   * This continuation is launched by start_unlocking(), and calls back to finish_unlocking() on the
683   * hash zone thread.
684   */
static void unlock_duplicate_pbn(struct vdo_completion *completion)
686  {
687  	struct data_vio *agent = as_data_vio(completion);
688  	struct hash_lock *lock = agent->hash_lock;
689  
690  	assert_data_vio_in_duplicate_zone(agent);
691  	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
692  			    "must have a duplicate lock to release");
693  
694  	vdo_release_physical_zone_pbn_lock(agent->duplicate.zone, agent->duplicate.pbn,
695  					   vdo_forget(lock->duplicate_lock));
696  	if (lock->state == VDO_HASH_LOCK_BYPASSING) {
697  		complete_data_vio(completion);
698  		return;
699  	}
700  
701  	launch_data_vio_hash_zone_callback(agent, finish_unlocking);
702  }
703  
704  /**
705   * start_unlocking() - Release a read lock on the PBN of the block that may or may not have
706   *                     contained duplicate data.
707   * @lock: The hash lock.
708   * @agent: The data_vio currently acting as the agent for the lock.
709   */
static void start_unlocking(struct hash_lock *lock, struct data_vio *agent)
711  {
712  	lock->state = VDO_HASH_LOCK_UNLOCKING;
713  	launch_data_vio_duplicate_zone_callback(agent, unlock_duplicate_pbn);
714  }
715  
static void release_context(struct dedupe_context *context)
717  {
718  	struct hash_zone *zone = context->zone;
719  
720  	WRITE_ONCE(zone->active, zone->active - 1);
721  	list_move(&context->list_entry, &zone->available);
722  }
723  
static void process_update_result(struct data_vio *agent)
725  {
726  	struct dedupe_context *context = agent->dedupe_context;
727  
728  	if ((context == NULL) ||
729  	    !change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE))
730  		return;
731  
732  	agent->dedupe_context = NULL;
733  	release_context(context);
734  }
735  
736  /**
737   * finish_updating() - Process the result of a UDS update performed by the agent for the lock.
738   * @completion: The completion of the data_vio that performed the update
739   *
740   * This continuation is registered in start_querying().
741   */
static void finish_updating(struct vdo_completion *completion)
743  {
744  	struct data_vio *agent = as_data_vio(completion);
745  	struct hash_lock *lock = agent->hash_lock;
746  
747  	assert_hash_lock_agent(agent, __func__);
748  
749  	process_update_result(agent);
750  
751  	/*
752  	 * UDS was updated successfully, so don't update again unless the duplicate location
753  	 * changes due to rollover.
754  	 */
755  	lock->update_advice = false;
756  
757  	if (vdo_waitq_has_waiters(&lock->waiters)) {
758  		/*
759  		 * UPDATING -> DEDUPING transition: A new data_vio arrived during the UDS update.
760  		 * Send it on the verified dedupe path. The agent is done with the lock, but the
761  		 * lock may still need to use it to clean up after rollover.
762  		 */
763  		start_deduping(lock, agent, true);
764  		return;
765  	}
766  
767  	if (lock->duplicate_lock != NULL) {
768  		/*
769  		 * UPDATING -> UNLOCKING transition: No one is waiting to dedupe, but we hold a
770  		 * duplicate PBN lock, so go release it.
771  		 */
772  		start_unlocking(lock, agent);
773  		return;
774  	}
775  
776  	/*
777  	 * UPDATING -> BYPASSING transition: No one is waiting to dedupe and there's no lock to
778  	 * release.
779  	 */
780  	start_bypassing(lock, agent);
781  }
782  
783  static void query_index(struct data_vio *data_vio, enum uds_request_type operation);
784  
785  /**
786   * start_updating() - Continue deduplication with the last step, updating UDS with the location of
787   *                    the duplicate that should be returned as advice in the future.
788   * @lock: The hash lock.
789   * @agent: The data_vio currently acting as the agent for the lock.
790   */
static void start_updating(struct hash_lock *lock, struct data_vio *agent)
792  {
793  	lock->state = VDO_HASH_LOCK_UPDATING;
794  
795  	VDO_ASSERT_LOG_ONLY(lock->verified, "new advice should have been verified");
796  	VDO_ASSERT_LOG_ONLY(lock->update_advice, "should only update advice if needed");
797  
798  	agent->last_async_operation = VIO_ASYNC_OP_UPDATE_DEDUPE_INDEX;
799  	set_data_vio_hash_zone_callback(agent, finish_updating);
800  	query_index(agent, UDS_UPDATE);
801  }
802  
803  /**
804   * finish_deduping() - Handle a data_vio that has finished deduplicating against the block locked
805   *                     by the hash lock.
806   * @lock: The hash lock.
807   * @data_vio: The lock holder that has finished deduplicating.
808   *
809   * If there are other data_vios still sharing the lock, this will just release the data_vio's share
810   * of the lock and finish processing the data_vio. If this is the last data_vio holding the lock,
811   * this makes the data_vio the lock agent and uses it to advance the state of the lock so it can
812   * eventually be released.
813   */
static void finish_deduping(struct hash_lock *lock, struct data_vio *data_vio)
815  {
816  	struct data_vio *agent = data_vio;
817  
818  	VDO_ASSERT_LOG_ONLY(lock->agent == NULL, "shouldn't have an agent in DEDUPING");
819  	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
820  			    "shouldn't have any lock waiters in DEDUPING");
821  
822  	/* Just release the lock reference if other data_vios are still deduping. */
823  	if (lock->reference_count > 1) {
824  		exit_hash_lock(data_vio);
825  		return;
826  	}
827  
828  	/* The hash lock must have an agent for all other lock states. */
829  	lock->agent = agent;
830  	if (lock->update_advice) {
831  		/*
832  		 * DEDUPING -> UPDATING transition: The location of the duplicate block changed
833  		 * since the initial UDS query because of compression, rollover, or because the
834  		 * query agent didn't have an allocation. The UDS update was delayed in case there
835  		 * was another change in location, but with only this data_vio using the hash lock,
836  		 * it's time to update the advice.
837  		 */
838  		start_updating(lock, agent);
839  	} else {
840  		/*
841  		 * DEDUPING -> UNLOCKING transition: Release the PBN read lock on the duplicate
842  		 * location so the hash lock itself can be released (contingent on no new data_vios
843  		 * arriving in the lock before the agent returns).
844  		 */
845  		start_unlocking(lock, agent);
846  	}
847  }
848  
849  /**
850   * acquire_lock() - Get the lock for a record name.
851   * @zone: The zone responsible for the hash.
852   * @hash: The hash to lock.
853   * @replace_lock: If non-NULL, the lock already registered for the hash which should be replaced by
854   *                the new lock.
855   * @lock_ptr: A pointer to receive the hash lock.
856   *
 * Gets the lock for the hash (record name) of the data in a data_vio, or if one does not exist (or
 * if we are explicitly rolling over), initializes a new lock for the hash and registers it in the
859   * zone. This must only be called in the correct thread for the zone.
860   *
861   * Return: VDO_SUCCESS or an error code.
862   */
static int __must_check acquire_lock(struct hash_zone *zone,
864  				     const struct uds_record_name *hash,
865  				     struct hash_lock *replace_lock,
866  				     struct hash_lock **lock_ptr)
867  {
868  	struct hash_lock *lock, *new_lock;
869  	int result;
870  
871  	/*
872  	 * Borrow and prepare a lock from the pool so we don't have to do two int_map accesses
873  	 * in the common case of no lock contention.
874  	 */
875  	result = VDO_ASSERT(!list_empty(&zone->lock_pool),
876  			    "never need to wait for a free hash lock");
877  	if (result != VDO_SUCCESS)
878  		return result;
879  
880  	new_lock = list_entry(zone->lock_pool.prev, struct hash_lock, pool_node);
881  	list_del_init(&new_lock->pool_node);
882  
883  	/*
884  	 * Fill in the hash of the new lock so we can map it, since we have to use the hash as the
885  	 * map key.
886  	 */
887  	new_lock->hash = *hash;
888  
889  	result = vdo_int_map_put(zone->hash_lock_map, hash_lock_key(new_lock),
890  				 new_lock, (replace_lock != NULL), (void **) &lock);
891  	if (result != VDO_SUCCESS) {
892  		return_hash_lock_to_pool(zone, vdo_forget(new_lock));
893  		return result;
894  	}
895  
896  	if (replace_lock != NULL) {
897  		/* On mismatch put the old lock back and return a severe error */
898  		VDO_ASSERT_LOG_ONLY(lock == replace_lock,
899  				    "old lock must have been in the lock map");
900  		/* TODO: Check earlier and bail out? */
901  		VDO_ASSERT_LOG_ONLY(replace_lock->registered,
902  				    "old lock must have been marked registered");
903  		replace_lock->registered = false;
904  	}
905  
906  	if (lock == replace_lock) {
907  		lock = new_lock;
908  		lock->registered = true;
909  	} else {
910  		/* There's already a lock for the hash, so we don't need the borrowed lock. */
911  		return_hash_lock_to_pool(zone, vdo_forget(new_lock));
912  	}
913  
914  	*lock_ptr = lock;
915  	return VDO_SUCCESS;
916  }
917  
918  /**
919   * enter_forked_lock() - Bind the data_vio to a new hash lock.
920   *
921   * Implements waiter_callback_fn. Binds the data_vio that was waiting to a new hash lock and waits
922   * on that lock.
923   */
static void enter_forked_lock(struct vdo_waiter *waiter, void *context)
925  {
926  	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
927  	struct hash_lock *new_lock = context;
928  
929  	set_hash_lock(data_vio, new_lock);
930  	wait_on_hash_lock(new_lock, data_vio);
931  }
932  
933  /**
934   * fork_hash_lock() - Fork a hash lock because it has run out of increments on the duplicate PBN.
935   * @old_lock: The hash lock to fork.
936   * @new_agent: The data_vio that will be the agent for the new lock.
937   *
938   * Transfers the new agent and any lock waiters to a new hash lock instance which takes the place
939   * of the old lock in the lock map. The old lock remains active, but will not update advice.
940   */
static void fork_hash_lock(struct hash_lock *old_lock, struct data_vio *new_agent)
942  {
943  	struct hash_lock *new_lock;
944  	int result;
945  
946  	result = acquire_lock(new_agent->hash_zone, &new_agent->record_name, old_lock,
947  			      &new_lock);
948  	if (result != VDO_SUCCESS) {
949  		continue_data_vio_with_error(new_agent, result);
950  		return;
951  	}
952  
953  	/*
954  	 * Only one of the two locks should update UDS. The old lock is out of references, so it
955  	 * would be poor dedupe advice in the short term.
956  	 */
957  	old_lock->update_advice = false;
958  	new_lock->update_advice = true;
959  
960  	set_hash_lock(new_agent, new_lock);
961  	new_lock->agent = new_agent;
962  
963  	vdo_waitq_notify_all_waiters(&old_lock->waiters, enter_forked_lock, new_lock);
964  
965  	new_agent->is_duplicate = false;
966  	start_writing(new_lock, new_agent);
967  }
968  
969  /**
970   * launch_dedupe() - Reserve a reference count increment for a data_vio and launch it on the dedupe
971   *                   path.
972   * @lock: The hash lock.
973   * @data_vio: The data_vio to deduplicate using the hash lock.
974   * @has_claim: true if the data_vio already has claimed an increment from the duplicate lock.
975   *
976   * If no increments are available, this will roll over to a new hash lock and launch the data_vio
977   * as the writing agent for that lock.
978   */
static void launch_dedupe(struct hash_lock *lock, struct data_vio *data_vio,
980  			  bool has_claim)
981  {
982  	if (!has_claim && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
983  		/* Out of increments, so must roll over to a new lock. */
984  		fork_hash_lock(lock, data_vio);
985  		return;
986  	}
987  
988  	/* Deduplicate against the lock's verified location. */
989  	set_duplicate_location(data_vio, lock->duplicate);
990  	data_vio->new_mapped = data_vio->duplicate;
991  	update_metadata_for_data_vio_write(data_vio, lock->duplicate_lock);
992  }
993  
994  /**
995   * start_deduping() - Enter the hash lock state where data_vios deduplicate in parallel against a
996   *                    true copy of their data on disk.
997   * @lock: The hash lock.
998   * @agent: The data_vio acting as the agent for the lock.
999   * @agent_is_done: true only if the agent has already written or deduplicated against its data.
1000   *
1001   * If the agent itself needs to deduplicate, an increment for it must already have been claimed
1002   * from the duplicate lock, ensuring the hash lock will still have a data_vio holding it.
1003   */
static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
1005  			   bool agent_is_done)
1006  {
1007  	lock->state = VDO_HASH_LOCK_DEDUPING;
1008  
1009  	/*
1010  	 * We don't take the downgraded allocation lock from the agent unless we actually need to
1011  	 * deduplicate against it.
1012  	 */
1013  	if (lock->duplicate_lock == NULL) {
1014  		VDO_ASSERT_LOG_ONLY(!vdo_is_state_compressed(agent->new_mapped.state),
1015  				    "compression must have shared a lock");
1016  		VDO_ASSERT_LOG_ONLY(agent_is_done,
1017  				    "agent must have written the new duplicate");
1018  		transfer_allocation_lock(agent);
1019  	}
1020  
1021  	VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(lock->duplicate_lock),
1022  			    "duplicate_lock must be a PBN read lock");
1023  
1024  	/*
1025  	 * This state is not like any of the other states. There is no designated agent--the agent
1026  	 * transitioning to this state and all the waiters will be launched to deduplicate in
1027  	 * parallel.
1028  	 */
1029  	lock->agent = NULL;
1030  
1031  	/*
1032  	 * Launch the agent (if not already deduplicated) and as many lock waiters as we have
1033  	 * available increments for on the dedupe path. If we run out of increments, rollover will
1034  	 * be triggered and the remaining waiters will be transferred to the new lock.
1035  	 */
1036  	if (!agent_is_done) {
1037  		launch_dedupe(lock, agent, true);
1038  		agent = NULL;
1039  	}
1040  	while (vdo_waitq_has_waiters(&lock->waiters))
1041  		launch_dedupe(lock, dequeue_lock_waiter(lock), false);
1042  
1043  	if (agent_is_done) {
1044  		/*
1045  		 * In the degenerate case where all the waiters rolled over to a new lock, this
1046  		 * will continue to use the old agent to clean up this lock, and otherwise it just
1047  		 * lets the agent exit the lock.
1048  		 */
1049  		finish_deduping(lock, agent);
1050  	}
1051  }
1052  
1053  /**
1054   * increment_stat() - Increment a statistic counter in a non-atomic yet thread-safe manner.
1055   * @stat: The statistic field to increment.
1056   */
static inline void increment_stat(u64 *stat)
1058  {
1059  	/*
1060  	 * Must only be mutated on the hash zone thread. Prevents any compiler shenanigans from
1061  	 * affecting other threads reading stats.
1062  	 */
1063  	WRITE_ONCE(*stat, *stat + 1);
1064  }
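
/*
 * Illustrative sketch only (hypothetical helper): readers on other threads
 * would pair the WRITE_ONCE() in increment_stat() with READ_ONCE() so they
 * always see a complete, if possibly slightly stale, value.
 */
static inline u64 example_read_stat(u64 *stat)
{
	return READ_ONCE(*stat);
}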
1065  
1066  /**
1067   * finish_verifying() - Handle the result of the agent for the lock comparing its data to the
1068   *                      duplicate candidate.
1069   * @completion: The completion of the data_vio used to verify dedupe
1070   *
1071   * This continuation is registered in start_verifying().
1072   */
static void finish_verifying(struct vdo_completion *completion)
1074  {
1075  	struct data_vio *agent = as_data_vio(completion);
1076  	struct hash_lock *lock = agent->hash_lock;
1077  
1078  	assert_hash_lock_agent(agent, __func__);
1079  
1080  	lock->verified = agent->is_duplicate;
1081  
1082  	/*
1083  	 * Only count the result of the initial verification of the advice as valid or stale, and
1084  	 * not any re-verifications due to PBN lock releases.
1085  	 */
1086  	if (!lock->verify_counted) {
1087  		lock->verify_counted = true;
1088  		if (lock->verified)
1089  			increment_stat(&agent->hash_zone->statistics.dedupe_advice_valid);
1090  		else
1091  			increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1092  	}
1093  
1094  	/*
1095  	 * Even if the block is a verified duplicate, we can't start to deduplicate unless we can
1096  	 * claim a reference count increment for the agent.
1097  	 */
1098  	if (lock->verified && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1099  		agent->is_duplicate = false;
1100  		lock->verified = false;
1101  	}
1102  
1103  	if (lock->verified) {
1104  		/*
1105  		 * VERIFYING -> DEDUPING transition: The advice is for a true duplicate, so start
1106  		 * deduplicating against it, if references are available.
1107  		 */
1108  		start_deduping(lock, agent, false);
1109  	} else {
1110  		/*
1111  		 * VERIFYING -> UNLOCKING transition: Either the verify failed or we'd try to
1112  		 * dedupe and roll over immediately, which would fail because it would leave the
1113  		 * lock without an agent to release the PBN lock. In both cases, the data will have
1114  		 * to be written or compressed, but first the advice PBN must be unlocked by the
1115  		 * VERIFYING agent.
1116  		 */
1117  		lock->update_advice = true;
1118  		start_unlocking(lock, agent);
1119  	}
1120  }
1121  
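/*
 * Compare two data blocks one u64 at a time. This assumes VDO_BLOCK_SIZE is a
 * multiple of sizeof(u64) and that both buffers are suitably aligned for u64
 * loads, which is expected to hold for the block-sized buffers used here. The
 * result is equivalent to memcmp(block1, block2, VDO_BLOCK_SIZE) == 0.
 */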
static bool blocks_equal(char *block1, char *block2)
1123  {
1124  	int i;
1125  
1126  	for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
1127  		if (*((u64 *) &block1[i]) != *((u64 *) &block2[i]))
1128  			return false;
1129  	}
1130  
1131  	return true;
1132  }
1133  
static void verify_callback(struct vdo_completion *completion)
1135  {
1136  	struct data_vio *agent = as_data_vio(completion);
1137  
1138  	agent->is_duplicate = blocks_equal(agent->vio.data, agent->scratch_block);
1139  	launch_data_vio_hash_zone_callback(agent, finish_verifying);
1140  }
1141  
static void uncompress_and_verify(struct vdo_completion *completion)
1143  {
1144  	struct data_vio *agent = as_data_vio(completion);
1145  	int result;
1146  
1147  	result = uncompress_data_vio(agent, agent->duplicate.state,
1148  				     agent->scratch_block);
1149  	if (result == VDO_SUCCESS) {
1150  		verify_callback(completion);
1151  		return;
1152  	}
1153  
1154  	agent->is_duplicate = false;
1155  	launch_data_vio_hash_zone_callback(agent, finish_verifying);
1156  }
1157  
static void verify_endio(struct bio *bio)
1159  {
1160  	struct data_vio *agent = vio_as_data_vio(bio->bi_private);
1161  	int result = blk_status_to_errno(bio->bi_status);
1162  
1163  	vdo_count_completed_bios(bio);
1164  	if (result != VDO_SUCCESS) {
1165  		agent->is_duplicate = false;
1166  		launch_data_vio_hash_zone_callback(agent, finish_verifying);
1167  		return;
1168  	}
1169  
1170  	if (vdo_is_state_compressed(agent->duplicate.state)) {
1171  		launch_data_vio_cpu_callback(agent, uncompress_and_verify,
1172  					     CPU_Q_COMPRESS_BLOCK_PRIORITY);
1173  		return;
1174  	}
1175  
1176  	launch_data_vio_cpu_callback(agent, verify_callback,
1177  				     CPU_Q_COMPLETE_READ_PRIORITY);
1178  }
1179  
1180  /**
1181   * start_verifying() - Begin the data verification phase.
1182   * @lock: The hash lock (must be LOCKING).
1183   * @agent: The data_vio to use to read and compare candidate data.
1184   *
1185   * Continue the deduplication path for a hash lock by using the agent to read (and possibly
1186   * decompress) the data at the candidate duplicate location, comparing it to the data in the agent
1187   * to verify that the candidate is identical to all the data_vios sharing the hash. If so, it can
1188   * be deduplicated against, otherwise a data_vio allocation will have to be written to and used for
1189   * dedupe.
1190   */
static void start_verifying(struct hash_lock *lock, struct data_vio *agent)
1192  {
1193  	int result;
1194  	struct vio *vio = &agent->vio;
1195  	char *buffer = (vdo_is_state_compressed(agent->duplicate.state) ?
1196  			(char *) agent->compression.block :
1197  			agent->scratch_block);
1198  
1199  	lock->state = VDO_HASH_LOCK_VERIFYING;
1200  	VDO_ASSERT_LOG_ONLY(!lock->verified, "hash lock only verifies advice once");
1201  
1202  	agent->last_async_operation = VIO_ASYNC_OP_VERIFY_DUPLICATION;
1203  	result = vio_reset_bio(vio, buffer, verify_endio, REQ_OP_READ,
1204  			       agent->duplicate.pbn);
1205  	if (result != VDO_SUCCESS) {
1206  		set_data_vio_hash_zone_callback(agent, finish_verifying);
1207  		continue_data_vio_with_error(agent, result);
1208  		return;
1209  	}
1210  
1211  	set_data_vio_bio_zone_callback(agent, vdo_submit_vio);
1212  	vdo_launch_completion_with_priority(&vio->completion, BIO_Q_VERIFY_PRIORITY);
1213  }
1214  
1215  /**
1216   * finish_locking() - Handle the result of the agent for the lock attempting to obtain a PBN read
1217   *                    lock on the candidate duplicate block.
1218   * @completion: The completion of the data_vio that attempted to get the read lock.
1219   *
1220   * This continuation is registered in lock_duplicate_pbn().
1221   */
static void finish_locking(struct vdo_completion *completion)
1223  {
1224  	struct data_vio *agent = as_data_vio(completion);
1225  	struct hash_lock *lock = agent->hash_lock;
1226  
1227  	assert_hash_lock_agent(agent, __func__);
1228  
1229  	if (!agent->is_duplicate) {
1230  		VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1231  				    "must not hold duplicate_lock if not flagged as a duplicate");
1232  		/*
1233  		 * LOCKING -> WRITING transition: The advice block is being modified or has no
1234  		 * available references, so try to write or compress the data, remembering to
1235  		 * update UDS later with the new advice.
1236  		 */
1237  		increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1238  		lock->update_advice = true;
1239  		start_writing(lock, agent);
1240  		return;
1241  	}
1242  
1243  	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
1244  			    "must hold duplicate_lock if flagged as a duplicate");
1245  
1246  	if (!lock->verified) {
1247  		/*
1248  		 * LOCKING -> VERIFYING transition: Continue on the unverified dedupe path, reading
1249  		 * the candidate duplicate and comparing it to the agent's data to decide whether
1250  		 * it is a true duplicate or stale advice.
1251  		 */
1252  		start_verifying(lock, agent);
1253  		return;
1254  	}
1255  
1256  	if (!vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1257  		/*
1258  		 * LOCKING -> UNLOCKING transition: The verified block was re-locked, but has no
1259  		 * available increments left. Must first release the useless PBN read lock before
1260  		 * rolling over to a new copy of the block.
1261  		 */
1262  		agent->is_duplicate = false;
1263  		lock->verified = false;
1264  		lock->update_advice = true;
1265  		start_unlocking(lock, agent);
1266  		return;
1267  	}
1268  
1269  	/*
1270  	 * LOCKING -> DEDUPING transition: Continue on the verified dedupe path, deduplicating
1271  	 * against a location that was previously verified or written to.
1272  	 */
1273  	start_deduping(lock, agent, false);
1274  }
1275  
static bool acquire_provisional_reference(struct data_vio *agent, struct pbn_lock *lock,
1277  					  struct slab_depot *depot)
1278  {
1279  	/* Ensure that the newly-locked block is referenced. */
1280  	struct vdo_slab *slab = vdo_get_slab(depot, agent->duplicate.pbn);
1281  	int result = vdo_acquire_provisional_reference(slab, agent->duplicate.pbn, lock);
1282  
1283  	if (result == VDO_SUCCESS)
1284  		return true;
1285  
1286  	vdo_log_warning_strerror(result,
1287  				 "Error acquiring provisional reference for dedupe candidate; aborting dedupe");
1288  	agent->is_duplicate = false;
1289  	vdo_release_physical_zone_pbn_lock(agent->duplicate.zone,
1290  					   agent->duplicate.pbn, lock);
1291  	continue_data_vio_with_error(agent, result);
1292  	return false;
1293  }
1294  
1295  /**
1296   * lock_duplicate_pbn() - Acquire a read lock on the PBN of the block containing candidate
1297   *                        duplicate data (compressed or uncompressed).
1298   * @completion: The completion of the data_vio attempting to acquire the physical block lock on
1299   *              behalf of its hash lock.
1300   *
1301   * If the PBN is already locked for writing, the lock attempt is abandoned and is_duplicate will be
1302   * cleared before calling back. This continuation is launched from start_locking(), and calls back
1303   * to finish_locking() on the hash zone thread.
1304   */
static void lock_duplicate_pbn(struct vdo_completion *completion)
1306  {
1307  	unsigned int increment_limit;
1308  	struct pbn_lock *lock;
1309  	int result;
1310  
1311  	struct data_vio *agent = as_data_vio(completion);
1312  	struct slab_depot *depot = vdo_from_data_vio(agent)->depot;
1313  	struct physical_zone *zone = agent->duplicate.zone;
1314  
1315  	assert_data_vio_in_duplicate_zone(agent);
1316  
1317  	set_data_vio_hash_zone_callback(agent, finish_locking);
1318  
1319  	/*
1320  	 * While in the zone that owns it, find out how many additional references can be made to
1321  	 * the block if it turns out to truly be a duplicate.
1322  	 */
1323  	increment_limit = vdo_get_increment_limit(depot, agent->duplicate.pbn);
1324  	if (increment_limit == 0) {
1325  		/*
1326  		 * We could deduplicate against it later if a reference happened to be released
1327  		 * during verification, but it's probably better to bail out now.
1328  		 */
1329  		agent->is_duplicate = false;
1330  		continue_data_vio(agent);
1331  		return;
1332  	}
1333  
1334  	result = vdo_attempt_physical_zone_pbn_lock(zone, agent->duplicate.pbn,
1335  						    VIO_READ_LOCK, &lock);
1336  	if (result != VDO_SUCCESS) {
1337  		continue_data_vio_with_error(agent, result);
1338  		return;
1339  	}
1340  
1341  	if (!vdo_is_pbn_read_lock(lock)) {
1342  		/*
1343  		 * There are three cases of write locks: uncompressed data block writes, compressed
1344  		 * (packed) block writes, and block map page writes. In all three cases, we give up
1345  		 * on trying to verify the advice and don't bother to try to deduplicate against the
1346  		 * data in the write lock holder.
1347  		 *
1348  		 * 1) We don't ever want to try to deduplicate against a block map page.
1349  		 *
1350  		 * 2a) It's very unlikely we'd deduplicate against an entire packed block, both
1351  		 * because the chance of matching it is slim, and because we record advice not for it,
1352  		 * but for the uncompressed representation of all the fragments it contains. The
1353  		 * only way we'd be getting lock contention is if we've written the same
1354  		 * representation coincidentally before, had it become unreferenced, and it just
1355  		 * happened to be packed together from compressed writes when we go to verify the
1356  		 * lucky advice. Giving up is a minuscule loss of potential dedupe.
1357  		 *
1358  		 * 2b) If the advice is for a slot of a compressed block, it's about to get
1359  		 * smashed, and the write smashing it cannot contain our data--it would have to be
1360  		 * writing on behalf of our hash lock, but that's impossible since we're the lock
1361  		 * agent.
1362  		 *
1363  		 * 3a) If the lock is held by a data_vio with different data, the advice is already
1364  		 * stale or is about to become stale.
1365  		 *
1366  		 * 3b) If the lock is held by a data_vio that matches us, we may as well either
1367  		 * write it ourselves (or reference the copy we already wrote) instead of
1368  		 * potentially having many duplicates wait for the lock holder to write, journal,
1369  		 * hash, and finally arrive in the hash lock. We lose a chance to avoid a UDS
1370  		 * update in the very rare case of advice for a free block that just happened to be
1371  		 * allocated to a data_vio with the same hash. There's also a chance to save on a
1372  		 * block write, at the cost of a block verify. Saving on a full block compare in
1373  		 * all stale advice cases almost certainly outweighs saving a UDS update and
1374  		 * trading a write for a read in a lucky case where advice would have been saved
1375  		 * from becoming stale.
1376  		 */
1377  		agent->is_duplicate = false;
1378  		continue_data_vio(agent);
1379  		return;
1380  	}
1381  
1382  	if (lock->holder_count == 0) {
1383  		if (!acquire_provisional_reference(agent, lock, depot))
1384  			return;
1385  
1386  		/*
1387  		 * The increment limit we grabbed earlier is still valid. The lock now holds the
1388  		 * rights to acquire all those references. Those rights will be claimed by hash
1389  		 * locks sharing this read lock.
1390  		 */
1391  		lock->increment_limit = increment_limit;
1392  	}
1393  
1394  	/*
1395  	 * We've successfully acquired a read lock on behalf of the hash lock, so mark it as such.
1396  	 */
1397  	set_duplicate_lock(agent->hash_lock, lock);
1398  
1399  	/*
1400  	 * TODO: Optimization: We could directly launch the block verify, then switch to a hash
1401  	 * thread.
1402  	 */
1403  	continue_data_vio(agent);
1404  }
1405  
1406  /**
1407   * start_locking() - Continue deduplication for a hash lock that has obtained valid advice of a
1408   *                   potential duplicate through its agent.
1409   * @lock: The hash lock (currently must be QUERYING).
1410   * @agent: The data_vio bearing the dedupe advice.
1411   */
1412  static void start_locking(struct hash_lock *lock, struct data_vio *agent)
1413  {
1414  	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1415  			    "must not acquire a duplicate lock when already holding it");
1416  
1417  	lock->state = VDO_HASH_LOCK_LOCKING;
1418  
1419  	/*
1420  	 * TODO: Optimization: If we arrange to continue on the duplicate zone thread when
1421  	 * accepting the advice, and don't explicitly change lock states (or use an agent-local
1422  	 * state, or an atomic), we can avoid a thread transition here.
1423  	 */
1424  	agent->last_async_operation = VIO_ASYNC_OP_LOCK_DUPLICATE_PBN;
1425  	launch_data_vio_duplicate_zone_callback(agent, lock_duplicate_pbn);
1426  }
1427  
1428  /**
1429   * finish_writing() - Re-entry point for the lock agent after it has finished writing or
1430   *                    compressing its copy of the data block.
1431   * @lock: The hash lock, which must be in state WRITING.
1432   * @agent: The data_vio that wrote its data for the lock.
1433   *
1434   * The agent will never need to dedupe against anything, so it's done with the lock, but the lock
1435   * may not be finished with it, as a UDS update might still be needed.
1436   *
1437   * If there are other lock holders, the agent will hand the job to one of them and exit, leaving
1438   * the lock to deduplicate against the just-written block. If there are no other lock holders, the
1439   * agent either exits (and later tears down the hash lock), or it remains the agent and updates
1440   * UDS.
1441   */
1442  static void finish_writing(struct hash_lock *lock, struct data_vio *agent)
1443  {
1444  	/*
1445  	 * Dedupe against the data block or compressed block slot the agent wrote. Since we know
1446  	 * the write succeeded, there's no need to verify it.
1447  	 */
1448  	lock->duplicate = agent->new_mapped;
1449  	lock->verified = true;
1450  
1451  	if (vdo_is_state_compressed(lock->duplicate.state) && lock->registered) {
1452  		/*
1453  		 * Compression means the location we gave in the UDS query is not the location
1454  		 * we're using to deduplicate.
1455  		 */
1456  		lock->update_advice = true;
1457  	}
1458  
1459  	/* If there are any waiters, we need to start deduping them. */
1460  	if (vdo_waitq_has_waiters(&lock->waiters)) {
1461  		/*
1462  		 * WRITING -> DEDUPING transition: an asynchronously-written block failed to
1463  		 * compress, so the PBN lock on the written copy was already transferred. The agent
1464  		 * is done with the lock, but the lock may still need to use it to clean up after
1465  		 * rollover.
1466  		 */
1467  		start_deduping(lock, agent, true);
1468  		return;
1469  	}
1470  
1471  	/*
1472  	 * There are no waiters and the agent has successfully written, so take a step towards
1473  	 * being able to release the hash lock (or just release it).
1474  	 */
1475  	if (lock->update_advice) {
1476  		/*
1477  		 * WRITING -> UPDATING transition: There's no waiter and a UDS update is needed, so
1478  		 * retain the WRITING agent and use it to launch the update. This happens on
1479  		 * compression, rollover, or the QUERYING agent not having an allocation.
1480  		 */
1481  		start_updating(lock, agent);
1482  	} else if (lock->duplicate_lock != NULL) {
1483  		/*
1484  		 * WRITING -> UNLOCKING transition: There's no waiter and no update needed, but the
1485  		 * compressed write gave us a shared duplicate lock that we must release.
1486  		 */
1487  		set_duplicate_location(agent, lock->duplicate);
1488  		start_unlocking(lock, agent);
1489  	} else {
1490  		/*
1491  		 * WRITING -> BYPASSING transition: There's no waiter, no update needed, and no
1492  		 * duplicate lock held, so both the agent and lock have no more work to do. The
1493  		 * agent will release its allocation lock in cleanup.
1494  		 */
1495  		start_bypassing(lock, agent);
1496  	}
1497  }
1498  
1499  /**
1500   * select_writing_agent() - Search through the lock waiters for a data_vio that has an allocation.
1501   * @lock: The hash lock to modify.
1502   *
1503   * If an allocation is found, swap agents, put the old agent at the head of the wait queue, then
1504   * return the new agent. Otherwise, just return the current agent.
1505   */
1506  static struct data_vio *select_writing_agent(struct hash_lock *lock)
1507  {
1508  	struct vdo_wait_queue temp_queue;
1509  	struct data_vio *data_vio;
1510  
1511  	vdo_waitq_init(&temp_queue);
1512  
1513  	/*
1514  	 * Move waiters to the temp queue one-by-one until we find an allocation. Not ideal to
1515  	 * search, but it only happens when nearly out of space.
1516  	 */
1517  	while (((data_vio = dequeue_lock_waiter(lock)) != NULL) &&
1518  	       !data_vio_has_allocation(data_vio)) {
1519  		/* Use the lower-level enqueue since we're just moving waiters around. */
1520  		vdo_waitq_enqueue_waiter(&temp_queue, &data_vio->waiter);
1521  	}
1522  
1523  	if (data_vio != NULL) {
1524  		/*
1525  		 * Move the rest of the waiters over to the temp queue, preserving the order they
1526  		 * arrived at the lock.
1527  		 */
1528  		vdo_waitq_transfer_all_waiters(&lock->waiters, &temp_queue);
1529  
1530  		/*
1531  		 * The current agent is being replaced and will have to wait to dedupe; make it the
1532  		 * first waiter since it was the first to reach the lock.
1533  		 */
1534  		vdo_waitq_enqueue_waiter(&lock->waiters, &lock->agent->waiter);
1535  		lock->agent = data_vio;
1536  	} else {
1537  		/* No one has an allocation, so keep the current agent. */
1538  		data_vio = lock->agent;
1539  	}
1540  
1541  	/* Swap all the waiters back onto the lock's queue. */
1542  	vdo_waitq_transfer_all_waiters(&temp_queue, &lock->waiters);
1543  	return data_vio;
1544  }
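
/*
 * Worked example for select_writing_agent() above (hypothetical data_vios, not taken from any
 * real workload): suppose agent X has no allocation and the waiters are [A, B, C, D] in arrival
 * order, where only C has an allocation. A and B are moved to the temp queue, C is dequeued and
 * becomes the new agent, D is transferred to the temp queue behind them, X is enqueued as the
 * first waiter, and the temp queue is transferred back, leaving agent C and waiters [X, A, B, D].
 */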
1545  
1546  /**
1547   * start_writing() - Begin the non-duplicate write path.
1548   * @lock: The hash lock (currently must be QUERYING).
1549   * @agent: The data_vio currently acting as the agent for the lock.
1550   *
1551   * Begins the non-duplicate write path for a hash lock that had no advice, selecting a data_vio
1552   * with an allocation as a new agent, if necessary, then resuming the agent on the data_vio write
1553   * path.
1554   */
1555  static void start_writing(struct hash_lock *lock, struct data_vio *agent)
1556  {
1557  	lock->state = VDO_HASH_LOCK_WRITING;
1558  
1559  	/*
1560  	 * The agent might not have received an allocation and so can't be used for writing, but
1561  	 * it's entirely possible that one of the waiters did.
1562  	 */
1563  	if (!data_vio_has_allocation(agent)) {
1564  		agent = select_writing_agent(lock);
1565  		/* If none of the waiters had an allocation, the writes all have to fail. */
1566  		if (!data_vio_has_allocation(agent)) {
1567  			/*
1568  			 * TODO: Should we keep a variant of BYPASSING that causes new arrivals to
1569  			 * fail immediately if they don't have an allocation? It might be possible
1570  			 * that on some path there would be non-waiters still referencing the lock,
1571  			 * so it would remain in the map as everything is currently spelled, even
1572  			 * if the agent and all waiters release.
1573  			 */
1574  			continue_data_vio_with_error(agent, VDO_NO_SPACE);
1575  			return;
1576  		}
1577  	}
1578  
1579  	/*
1580  	 * If the agent compresses, it might wait indefinitely in the packer, which would be bad if
1581  	 * there are any other data_vios waiting.
1582  	 */
1583  	if (vdo_waitq_has_waiters(&lock->waiters))
1584  		cancel_data_vio_compression(agent);
1585  
1586  	/*
1587  	 * Send the agent down the data_vio compress/pack/write path. If it succeeds, it will
1588  	 * return to the hash lock via vdo_continue_hash_lock() and call finish_writing().
1589  	 */
1590  	launch_compress_data_vio(agent);
1591  }
1592  
1593  /*
1594   * Decode VDO duplicate advice from the old_metadata field of a UDS request.
1595   * Returns true if valid advice was found and decoded.
1596   */
1597  static bool decode_uds_advice(struct dedupe_context *context)
1598  {
1599  	const struct uds_request *request = &context->request;
1600  	struct data_vio *data_vio = context->requestor;
1601  	size_t offset = 0;
1602  	const struct uds_record_data *encoding = &request->old_metadata;
1603  	struct vdo *vdo = vdo_from_data_vio(data_vio);
1604  	struct zoned_pbn *advice = &data_vio->duplicate;
1605  	u8 version;
1606  	int result;
1607  
1608  	if ((request->status != UDS_SUCCESS) || !request->found)
1609  		return false;
1610  
1611  	version = encoding->data[offset++];
1612  	if (version != UDS_ADVICE_VERSION) {
1613  		vdo_log_error("invalid UDS advice version code %u", version);
1614  		return false;
1615  	}
1616  
1617  	advice->state = encoding->data[offset++];
1618  	advice->pbn = get_unaligned_le64(&encoding->data[offset]);
1619  	offset += sizeof(u64);
1620  	BUG_ON(offset != UDS_ADVICE_SIZE);
1621  
1622  	/* Don't use advice that's clearly meaningless. */
1623  	if ((advice->state == VDO_MAPPING_STATE_UNMAPPED) || (advice->pbn == VDO_ZERO_BLOCK)) {
1624  		vdo_log_debug("Invalid advice from deduplication server: pbn %llu, state %u. Giving up on deduplication of logical block %llu",
1625  			      (unsigned long long) advice->pbn, advice->state,
1626  			      (unsigned long long) data_vio->logical.lbn);
1627  		atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1628  		return false;
1629  	}
1630  
1631  	result = vdo_get_physical_zone(vdo, advice->pbn, &advice->zone);
1632  	if ((result != VDO_SUCCESS) || (advice->zone == NULL)) {
1633  		vdo_log_debug("Invalid physical block number from deduplication server: %llu, giving up on deduplication of logical block %llu",
1634  			      (unsigned long long) advice->pbn,
1635  			      (unsigned long long) data_vio->logical.lbn);
1636  		atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1637  		return false;
1638  	}
1639  
1640  	return true;
1641  }
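
/*
 * For reference, the payload decoded above is one version byte, one mapping-state byte, and a
 * little-endian u64 PBN, which together fill UDS_ADVICE_SIZE. A minimal encoding sketch of the
 * same layout (illustrative only; the actual encoder in this module may differ in detail), given
 * a struct uds_record_data *encoding to fill and the agent's new_mapped location, would be:
 *
 *	size_t offset = 0;
 *
 *	encoding->data[offset++] = UDS_ADVICE_VERSION;
 *	encoding->data[offset++] = data_vio->new_mapped.state;
 *	put_unaligned_le64(data_vio->new_mapped.pbn, &encoding->data[offset]);
 *	offset += sizeof(u64);
 *	BUG_ON(offset != UDS_ADVICE_SIZE);
 */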
1642  
1643  static void process_query_result(struct data_vio *agent)
1644  {
1645  	struct dedupe_context *context = agent->dedupe_context;
1646  
1647  	if (context == NULL)
1648  		return;
1649  
1650  	if (change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE)) {
1651  		agent->is_duplicate = decode_uds_advice(context);
1652  		agent->dedupe_context = NULL;
1653  		release_context(context);
1654  	}
1655  }
1656  
1657  /**
1658   * finish_querying() - Process the result of a UDS query performed by the agent for the lock.
1659   * @completion: The completion of the data_vio that performed the query.
1660   *
1661   * This continuation is registered in start_querying().
1662   */
1663  static void finish_querying(struct vdo_completion *completion)
1664  {
1665  	struct data_vio *agent = as_data_vio(completion);
1666  	struct hash_lock *lock = agent->hash_lock;
1667  
1668  	assert_hash_lock_agent(agent, __func__);
1669  
1670  	process_query_result(agent);
1671  
1672  	if (agent->is_duplicate) {
1673  		lock->duplicate = agent->duplicate;
1674  		/*
1675  		 * QUERYING -> LOCKING transition: Valid advice was obtained from UDS. Use the
1676  		 * QUERYING agent to start the hash lock on the unverified dedupe path, verifying
1677  		 * that the advice can be used.
1678  		 */
1679  		start_locking(lock, agent);
1680  	} else {
1681  		/*
1682  		 * The agent will be used as the duplicate if it has an allocation; if it does, that
1683  		 * location was posted to UDS, so no update will be needed.
1684  		 */
1685  		lock->update_advice = !data_vio_has_allocation(agent);
1686  		/*
1687  		 * QUERYING -> WRITING transition: There was no advice or the advice wasn't valid,
1688  		 * so try to write or compress the data.
1689  		 */
1690  		start_writing(lock, agent);
1691  	}
1692  }
1693  
1694  /**
1695   * start_querying() - Start deduplication for a hash lock.
1696   * @lock: The initialized hash lock.
1697   * @data_vio: The data_vio that has just obtained the new lock.
1698   *
1699   * Starts deduplication for a hash lock that has finished initializing by making the data_vio that
1700   * requested it the agent, entering the QUERYING state, and using the agent to perform the UDS
1701   * query on behalf of the lock.
1702   */
1703  static void start_querying(struct hash_lock *lock, struct data_vio *data_vio)
1704  {
1705  	lock->agent = data_vio;
1706  	lock->state = VDO_HASH_LOCK_QUERYING;
1707  	data_vio->last_async_operation = VIO_ASYNC_OP_CHECK_FOR_DUPLICATION;
1708  	set_data_vio_hash_zone_callback(data_vio, finish_querying);
1709  	query_index(data_vio,
1710  		    (data_vio_has_allocation(data_vio) ? UDS_POST : UDS_QUERY));
1711  }
1712  
1713  /**
1714   * report_bogus_lock_state() - Complain that a data_vio has entered a hash_lock that is in an
1715   *                             unimplemented or unusable state and continue the data_vio with an
1716   *                             error.
1717   * @lock: The hash lock.
1718   * @data_vio: The data_vio attempting to enter the lock.
1719   */
1720  static void report_bogus_lock_state(struct hash_lock *lock, struct data_vio *data_vio)
1721  {
1722  	VDO_ASSERT_LOG_ONLY(false, "hash lock must not be in unimplemented state %s",
1723  			    get_hash_lock_state_name(lock->state));
1724  	continue_data_vio_with_error(data_vio, VDO_LOCK_ERROR);
1725  }
1726  
1727  /**
1728   * vdo_continue_hash_lock() - Continue the processing state after writing, compressing, or
1729   *                            deduplicating.
1730   * @completion: The completion of the data_vio to continue processing in its hash lock.
1731   *
1732   * Asynchronously continue processing a data_vio in its hash lock after it has finished writing,
1733   * compressing, or deduplicating, so it can share the result with any data_vios waiting in the hash
1734   * lock, or update the UDS index, or simply release its share of the lock.
1735   *
1736   * Context: This must only be called in the correct thread for the hash zone.
1737   */
1738  void vdo_continue_hash_lock(struct vdo_completion *completion)
1739  {
1740  	struct data_vio *data_vio = as_data_vio(completion);
1741  	struct hash_lock *lock = data_vio->hash_lock;
1742  
1743  	switch (lock->state) {
1744  	case VDO_HASH_LOCK_WRITING:
1745  		VDO_ASSERT_LOG_ONLY(data_vio == lock->agent,
1746  				    "only the lock agent may continue the lock");
1747  		finish_writing(lock, data_vio);
1748  		break;
1749  
1750  	case VDO_HASH_LOCK_DEDUPING:
1751  		finish_deduping(lock, data_vio);
1752  		break;
1753  
1754  	case VDO_HASH_LOCK_BYPASSING:
1755  		/* This data_vio has finished the write path and the lock doesn't need it. */
1756  		exit_hash_lock(data_vio);
1757  		break;
1758  
1759  	case VDO_HASH_LOCK_INITIALIZING:
1760  	case VDO_HASH_LOCK_QUERYING:
1761  	case VDO_HASH_LOCK_UPDATING:
1762  	case VDO_HASH_LOCK_LOCKING:
1763  	case VDO_HASH_LOCK_VERIFYING:
1764  	case VDO_HASH_LOCK_UNLOCKING:
1765  		/* A lock in this state should never be re-entered. */
1766  		report_bogus_lock_state(lock, data_vio);
1767  		break;
1768  
1769  	default:
1770  		report_bogus_lock_state(lock, data_vio);
1771  	}
1772  }
1773  
1774  /**
1775   * is_hash_collision() - Check to see if a hash collision has occurred.
1776   * @lock: The lock to check.
1777   * @candidate: The data_vio seeking to share the lock.
1778   *
1779   * Check whether the data in data_vios sharing a lock differs from the data in a data_vio seeking to
1780   * share the lock, which should only be possible in the extremely unlikely case of a hash
1781   * collision.
1782   *
1783   * Return: true if the given data_vio must not share the lock because it doesn't have the same data
1784   *         as the lock holders.
1785   */
1786  static bool is_hash_collision(struct hash_lock *lock, struct data_vio *candidate)
1787  {
1788  	struct data_vio *lock_holder;
1789  	struct hash_zone *zone;
1790  	bool collides;
1791  
1792  	if (list_empty(&lock->duplicate_ring))
1793  		return false;
1794  
1795  	lock_holder = list_first_entry(&lock->duplicate_ring, struct data_vio,
1796  				       hash_lock_entry);
1797  	zone = candidate->hash_zone;
1798  	collides = !blocks_equal(lock_holder->vio.data, candidate->vio.data);
1799  	if (collides)
1800  		increment_stat(&zone->statistics.concurrent_hash_collisions);
1801  	else
1802  		increment_stat(&zone->statistics.concurrent_data_matches);
1803  
1804  	return collides;
1805  }
1806  
1807  static inline int assert_hash_lock_preconditions(const struct data_vio *data_vio)
1808  {
1809  	int result;
1810  
1811  	/* FIXME: BUG_ON() and/or enter read-only mode? */
1812  	result = VDO_ASSERT(data_vio->hash_lock == NULL,
1813  			    "must not already hold a hash lock");
1814  	if (result != VDO_SUCCESS)
1815  		return result;
1816  
1817  	result = VDO_ASSERT(list_empty(&data_vio->hash_lock_entry),
1818  			    "must not already be a member of a hash lock ring");
1819  	if (result != VDO_SUCCESS)
1820  		return result;
1821  
1822  	return VDO_ASSERT(data_vio->recovery_sequence_number == 0,
1823  			  "must not hold a recovery lock when getting a hash lock");
1824  }
1825  
1826  /**
1827   * vdo_acquire_hash_lock() - Acquire or share a lock on a record name.
1828   * @completion: The completion of the data_vio acquiring a lock on its record name.
1829   *
1830   * Acquire or share a lock on the hash (record name) of the data in a data_vio, updating the
1831   * data_vio to reference the lock. This must only be called in the correct thread for the zone. In
1832   * the unlikely case of a hash collision, this function will succeed, but the data_vio will not get
1833   * a lock reference.
1834   */
1835  void vdo_acquire_hash_lock(struct vdo_completion *completion)
1836  {
1837  	struct data_vio *data_vio = as_data_vio(completion);
1838  	struct hash_lock *lock;
1839  	int result;
1840  
1841  	assert_data_vio_in_hash_zone(data_vio);
1842  
1843  	result = assert_hash_lock_preconditions(data_vio);
1844  	if (result != VDO_SUCCESS) {
1845  		continue_data_vio_with_error(data_vio, result);
1846  		return;
1847  	}
1848  
1849  	result = acquire_lock(data_vio->hash_zone, &data_vio->record_name, NULL, &lock);
1850  	if (result != VDO_SUCCESS) {
1851  		continue_data_vio_with_error(data_vio, result);
1852  		return;
1853  	}
1854  
1855  	if (is_hash_collision(lock, data_vio)) {
1856  		/*
1857  		 * Hash collisions are extremely unlikely, but the bogus dedupe would be a data
1858  		 * corruption. Bypass optimization entirely. We can't compress a data_vio without
1859  		 * a hash_lock as the compressed write depends on the hash_lock to manage the
1860  		 * references for the compressed block.
1861  		 */
1862  		write_data_vio(data_vio);
1863  		return;
1864  	}
1865  
1866  	set_hash_lock(data_vio, lock);
1867  	switch (lock->state) {
1868  	case VDO_HASH_LOCK_INITIALIZING:
1869  		start_querying(lock, data_vio);
1870  		return;
1871  
1872  	case VDO_HASH_LOCK_QUERYING:
1873  	case VDO_HASH_LOCK_WRITING:
1874  	case VDO_HASH_LOCK_UPDATING:
1875  	case VDO_HASH_LOCK_LOCKING:
1876  	case VDO_HASH_LOCK_VERIFYING:
1877  	case VDO_HASH_LOCK_UNLOCKING:
1878  		/* The lock is busy, and can't be shared yet. */
1879  		wait_on_hash_lock(lock, data_vio);
1880  		return;
1881  
1882  	case VDO_HASH_LOCK_BYPASSING:
1883  		/* We can't use this lock, so bypass optimization entirely. */
1884  		vdo_release_hash_lock(data_vio);
1885  		write_data_vio(data_vio);
1886  		return;
1887  
1888  	case VDO_HASH_LOCK_DEDUPING:
1889  		launch_dedupe(lock, data_vio, false);
1890  		return;
1891  
1892  	default:
1893  		/* A lock in this state should not be acquired by new VIOs. */
1894  		report_bogus_lock_state(lock, data_vio);
1895  	}
1896  }
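
/*
 * Illustrative usage sketch: a data_vio on some other thread reaches vdo_acquire_hash_lock() by
 * launching itself onto its hash zone thread; assuming a launch helper named by the same pattern
 * as the per-zone callback helpers used elsewhere in this file, that would look roughly like:
 *
 *	launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock);
 */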
1897  
1898  /**
1899   * vdo_release_hash_lock() - Release a data_vio's share of a hash lock, if held, and null out the
1900   *                           data_vio's reference to it.
1901   * @data_vio: The data_vio releasing its hash lock.
1902   *
1903   * If the data_vio is the only one holding the lock, this also releases any resources or locks used
1904   * by the hash lock (such as a PBN read lock on a block containing data with the same hash) and
1905   * returns the lock to the hash zone's lock pool.
1906   *
1907   * Context: This must only be called in the correct thread for the hash zone.
1908   */
1909  void vdo_release_hash_lock(struct data_vio *data_vio)
1910  {
1911  	u64 lock_key;
1912  	struct hash_lock *lock = data_vio->hash_lock;
1913  	struct hash_zone *zone = data_vio->hash_zone;
1914  
1915  	if (lock == NULL)
1916  		return;
1917  
1918  	set_hash_lock(data_vio, NULL);
1919  
1920  	if (lock->reference_count > 0) {
1921  		/* The lock is still in use by other data_vios. */
1922  		return;
1923  	}
1924  
1925  	lock_key = hash_lock_key(lock);
1926  	if (lock->registered) {
1927  		struct hash_lock *removed;
1928  
1929  		removed = vdo_int_map_remove(zone->hash_lock_map, lock_key);
1930  		VDO_ASSERT_LOG_ONLY(lock == removed,
1931  				    "hash lock being released must have been mapped");
1932  	} else {
1933  		VDO_ASSERT_LOG_ONLY(lock != vdo_int_map_get(zone->hash_lock_map, lock_key),
1934  				    "unregistered hash lock must not be in the lock map");
1935  	}
1936  
1937  	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
1938  			    "hash lock returned to zone must have no waiters");
1939  	VDO_ASSERT_LOG_ONLY((lock->duplicate_lock == NULL),
1940  			    "hash lock returned to zone must not reference a PBN lock");
1941  	VDO_ASSERT_LOG_ONLY((lock->state == VDO_HASH_LOCK_BYPASSING),
1942  			    "returned hash lock must not be in use with state %s",
1943  			    get_hash_lock_state_name(lock->state));
1944  	VDO_ASSERT_LOG_ONLY(list_empty(&lock->pool_node),
1945  			    "hash lock returned to zone must not be in a pool ring");
1946  	VDO_ASSERT_LOG_ONLY(list_empty(&lock->duplicate_ring),
1947  			    "hash lock returned to zone must not reference DataVIOs");
1948  
1949  	return_hash_lock_to_pool(zone, lock);
1950  }
1951  
1952  /**
1953   * transfer_allocation_lock() - Transfer a data_vio's downgraded allocation PBN lock to the
1954   *                              data_vio's hash lock, converting it to a duplicate PBN lock.
1955   * @data_vio: The data_vio holding the allocation lock to transfer.
1956   */
1957  static void transfer_allocation_lock(struct data_vio *data_vio)
1958  {
1959  	struct allocation *allocation = &data_vio->allocation;
1960  	struct hash_lock *hash_lock = data_vio->hash_lock;
1961  
1962  	VDO_ASSERT_LOG_ONLY(data_vio->new_mapped.pbn == allocation->pbn,
1963  			    "transferred lock must be for the block written");
1964  
1965  	allocation->pbn = VDO_ZERO_BLOCK;
1966  
1967  	VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(allocation->lock),
1968  			    "must have downgraded the allocation lock before transfer");
1969  
1970  	hash_lock->duplicate = data_vio->new_mapped;
1971  	data_vio->duplicate = data_vio->new_mapped;
1972  
1973  	/*
1974  	 * Since the lock is being transferred, the holder count doesn't change (and isn't even
1975  	 * safe to examine on this thread).
1976  	 */
1977  	hash_lock->duplicate_lock = vdo_forget(allocation->lock);
1978  }
1979  
1980  /**
1981   * vdo_share_compressed_write_lock() - Make a data_vio's hash lock a shared holder of the PBN lock
1982   *                                     on the compressed block to which its data was just written.
1983   * @data_vio: The data_vio which was just compressed.
1984   * @pbn_lock: The PBN lock on the compressed block.
1985   *
1986   * If the lock is still a write lock (as it will be for the first share), it will be converted to a
1987   * read lock. This also reserves a reference count increment for the data_vio.
1988   */
1989  void vdo_share_compressed_write_lock(struct data_vio *data_vio,
1990  				     struct pbn_lock *pbn_lock)
1991  {
1992  	bool claimed;
1993  
1994  	VDO_ASSERT_LOG_ONLY(vdo_get_duplicate_lock(data_vio) == NULL,
1995  			    "a duplicate PBN lock should not exist when writing");
1996  	VDO_ASSERT_LOG_ONLY(vdo_is_state_compressed(data_vio->new_mapped.state),
1997  			    "lock transfer must be for a compressed write");
1998  	assert_data_vio_in_new_mapped_zone(data_vio);
1999  
2000  	/* First sharer downgrades the lock. */
2001  	if (!vdo_is_pbn_read_lock(pbn_lock))
2002  		vdo_downgrade_pbn_write_lock(pbn_lock, true);
2003  
2004  	/*
2005  	 * Get a share of the PBN lock, ensuring it cannot be released until after this data_vio
2006  	 * has had a chance to journal a reference.
2007  	 */
2008  	data_vio->duplicate = data_vio->new_mapped;
2009  	data_vio->hash_lock->duplicate = data_vio->new_mapped;
2010  	set_duplicate_lock(data_vio->hash_lock, pbn_lock);
2011  
2012  	/*
2013  	 * Claim a reference for this data_vio. Necessary since another hash_lock might start
2014  	 * deduplicating against it before we increment the reference count.
2015  	 */
2016  	claimed = vdo_claim_pbn_lock_increment(pbn_lock);
2017  	VDO_ASSERT_LOG_ONLY(claimed, "impossible to fail to claim an initial increment");
2018  }
2019  
2020  static void start_uds_queue(void *ptr)
2021  {
2022  	/*
2023  	 * Allow the UDS dedupe worker thread to do memory allocations. It will only do allocations
2024  	 * during the UDS calls that open or close an index, but those allocations can safely sleep
2025  	 * while reserving a large amount of memory. We could use an allocations_allowed boolean
2026  	 * (like the base threads do), but it would be an unnecessary embellishment.
2027  	 */
2028  	struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());
2029  
2030  	vdo_register_allocating_thread(&thread->allocating_thread, NULL);
2031  }
2032  
2033  static void finish_uds_queue(void *ptr __always_unused)
2034  {
2035  	vdo_unregister_allocating_thread();
2036  }
2037  
2038  static void close_index(struct hash_zones *zones)
2039  	__must_hold(&zones->lock)
2040  {
2041  	int result;
2042  
2043  	/*
2044  	 * Change the index state so that get_index_statistics() will not try to use the index
2045  	 * session we are closing.
2046  	 */
2047  	zones->index_state = IS_CHANGING;
2048  	/* Close the index session, while not holding the lock. */
2049  	spin_unlock(&zones->lock);
2050  	result = uds_close_index(zones->index_session);
2051  
2052  	if (result != UDS_SUCCESS)
2053  		vdo_log_error_strerror(result, "Error closing index");
2054  	spin_lock(&zones->lock);
2055  	zones->index_state = IS_CLOSED;
2056  	zones->error_flag |= result != UDS_SUCCESS;
2057  	/* ASSERTION: We leave in IS_CLOSED state. */
2058  }
2059  
2060  static void open_index(struct hash_zones *zones)
2061  	__must_hold(&zones->lock)
2062  {
2063  	/* ASSERTION: We enter in IS_CLOSED state. */
2064  	int result;
2065  	bool create_flag = zones->create_flag;
2066  
2067  	zones->create_flag = false;
2068  	/*
2069  	 * Change the index state so that it will be reported to the outside world as
2070  	 * "opening".
2071  	 */
2072  	zones->index_state = IS_CHANGING;
2073  	zones->error_flag = false;
2074  
2075  	/* Open the index session, while not holding the lock. */
2076  	spin_unlock(&zones->lock);
2077  	result = uds_open_index(create_flag ? UDS_CREATE : UDS_LOAD,
2078  				&zones->parameters, zones->index_session);
2079  	if (result != UDS_SUCCESS)
2080  		vdo_log_error_strerror(result, "Error opening index");
2081  
2082  	spin_lock(&zones->lock);
2083  	if (!create_flag) {
2084  		switch (result) {
2085  		case -ENOENT:
2086  			/*
2087  			 * Either there is no index, or there is no way we can recover the index.
2088  			 * We will be called again and try to create a new index.
2089  			 */
2090  			zones->index_state = IS_CLOSED;
2091  			zones->create_flag = true;
2092  			return;
2093  		default:
2094  			break;
2095  		}
2096  	}
2097  	if (result == UDS_SUCCESS) {
2098  		zones->index_state = IS_OPENED;
2099  	} else {
2100  		zones->index_state = IS_CLOSED;
2101  		zones->index_target = IS_CLOSED;
2102  		zones->error_flag = true;
2103  		spin_unlock(&zones->lock);
2104  		vdo_log_info("Setting UDS index target state to error");
2105  		spin_lock(&zones->lock);
2106  	}
2107  	/*
2108  	 * ASSERTION: On success, we leave in IS_OPENED state.
2109  	 * ASSERTION: On failure, we leave in IS_CLOSED state.
2110  	 */
2111  }
2112  
2113  static void change_dedupe_state(struct vdo_completion *completion)
2114  {
2115  	struct hash_zones *zones = as_hash_zones(completion);
2116  
2117  	spin_lock(&zones->lock);
2118  
2119  	/* Loop until the index is in the target state and the create flag is clear. */
2120  	while (vdo_is_state_normal(&zones->state) &&
2121  	       ((zones->index_state != zones->index_target) || zones->create_flag)) {
2122  		if (zones->index_state == IS_OPENED)
2123  			close_index(zones);
2124  		else
2125  			open_index(zones);
2126  	}
2127  
2128  	zones->changing = false;
2129  	spin_unlock(&zones->lock);
2130  }
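
/*
 * Sketch of how change_dedupe_state() converges, with illustrative values: if index_state is
 * IS_CLOSED, index_target is IS_OPENED, and create_flag is set, the loop above calls open_index()
 * once with UDS_CREATE; on success index_state becomes IS_OPENED, which matches the target, so
 * the loop exits and zones->changing is cleared.
 */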
2131  
2132  static void start_expiration_timer(struct dedupe_context *context)
2133  {
2134  	u64 start_time = context->submission_jiffies;
2135  	u64 end_time;
2136  
2137  	if (!change_timer_state(context->zone, DEDUPE_QUERY_TIMER_IDLE,
2138  				DEDUPE_QUERY_TIMER_RUNNING))
2139  		return;
2140  
2141  	end_time = max(start_time + vdo_dedupe_index_timeout_jiffies,
2142  		       jiffies + vdo_dedupe_index_min_timer_jiffies);
2143  	mod_timer(&context->zone->timer, end_time);
2144  }
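
/*
 * Illustrative timing example for start_expiration_timer(), using made-up jiffy values: if a
 * request was submitted at jiffy 1000, the index timeout is 5000 jiffies, the minimum timer
 * interval is 100 jiffies, and the timer is being started at jiffy 7000, then the timer is set
 * for max(1000 + 5000, 7000 + 100) = 7100, so even an already-overdue request waits at least the
 * minimum interval before it is expired.
 */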
2145  
2146  /**
2147   * report_dedupe_timeouts() - Record and eventually report that some dedupe requests reached their
2148   *                            expiration time without getting answers, so we timed them out.
2149   * @zones: the hash zones.
2150   * @timeouts: the number of newly timed out requests.
2151   */
2152  static void report_dedupe_timeouts(struct hash_zones *zones, unsigned int timeouts)
2153  {
2154  	atomic64_add(timeouts, &zones->timeouts);
2155  	spin_lock(&zones->lock);
2156  	if (__ratelimit(&zones->ratelimiter)) {
2157  		u64 unreported = atomic64_read(&zones->timeouts);
2158  
2159  		unreported -= zones->reported_timeouts;
2160  		vdo_log_debug("UDS index timeout on %llu requests",
2161  			      (unsigned long long) unreported);
2162  		zones->reported_timeouts += unreported;
2163  	}
2164  	spin_unlock(&zones->lock);
2165  }
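
/*
 * Illustrative example for report_dedupe_timeouts(), with made-up counts: if zones->timeouts has
 * reached 250 and zones->reported_timeouts is 200, a call that gets through the ratelimiter logs
 * "UDS index timeout on 50 requests" and advances reported_timeouts to 250. Ratelimited calls
 * merely accumulate the count, to be reported on a later pass.
 */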
2166  
2167  static int initialize_index(struct vdo *vdo, struct hash_zones *zones)
2168  {
2169  	int result;
2170  	off_t uds_offset;
2171  	struct volume_geometry geometry = vdo->geometry;
2172  	static const struct vdo_work_queue_type uds_queue_type = {
2173  		.start = start_uds_queue,
2174  		.finish = finish_uds_queue,
2175  		.max_priority = UDS_Q_MAX_PRIORITY,
2176  		.default_priority = UDS_Q_PRIORITY,
2177  	};
2178  
2179  	vdo_set_dedupe_index_timeout_interval(vdo_dedupe_index_timeout_interval);
2180  	vdo_set_dedupe_index_min_timer_interval(vdo_dedupe_index_min_timer_interval);
2181  
2182  	/*
2183  	 * Since we will save up the timeouts that would have been reported but were ratelimited,
2184  	 * we don't need to report ratelimiting.
2185  	 */
2186  	ratelimit_default_init(&zones->ratelimiter);
2187  	ratelimit_set_flags(&zones->ratelimiter, RATELIMIT_MSG_ON_RELEASE);
2188  	uds_offset = ((vdo_get_index_region_start(geometry) -
2189  		       geometry.bio_offset) * VDO_BLOCK_SIZE);
2190  	zones->parameters = (struct uds_parameters) {
2191  		.bdev = vdo->device_config->owned_device->bdev,
2192  		.offset = uds_offset,
2193  		.size = (vdo_get_index_region_size(geometry) * VDO_BLOCK_SIZE),
2194  		.memory_size = geometry.index_config.mem,
2195  		.sparse = geometry.index_config.sparse,
2196  		.nonce = (u64) geometry.nonce,
2197  	};
2198  
2199  	result = uds_create_index_session(&zones->index_session);
2200  	if (result != UDS_SUCCESS)
2201  		return result;
2202  
2203  	result = vdo_make_thread(vdo, vdo->thread_config.dedupe_thread, &uds_queue_type,
2204  				 1, NULL);
2205  	if (result != VDO_SUCCESS) {
2206  		uds_destroy_index_session(vdo_forget(zones->index_session));
2207  		vdo_log_error("UDS index queue initialization failed (%d)", result);
2208  		return result;
2209  	}
2210  
2211  	vdo_initialize_completion(&zones->completion, vdo, VDO_HASH_ZONES_COMPLETION);
2212  	vdo_set_completion_callback(&zones->completion, change_dedupe_state,
2213  				    vdo->thread_config.dedupe_thread);
2214  	return VDO_SUCCESS;
2215  }
2216  
2217  /**
2218   * finish_index_operation() - This is the UDS callback for index queries.
2219   * @request: The uds request which has just completed.
2220   */
2221  static void finish_index_operation(struct uds_request *request)
2222  {
2223  	struct dedupe_context *context = container_of(request, struct dedupe_context,
2224  						      request);
2225  
2226  	if (change_context_state(context, DEDUPE_CONTEXT_PENDING,
2227  				 DEDUPE_CONTEXT_COMPLETE)) {
2228  		/*
2229  		 * This query has not timed out, so send its data_vio back to its hash zone to
2230  		 * process the results.
2231  		 */
2232  		continue_data_vio(context->requestor);
2233  		return;
2234  	}
2235  
2236  	/*
2237  	 * This query has timed out, so try to mark it complete and hence eligible for reuse. Its
2238  	 * data_vio has already moved on.
2239  	 */
2240  	if (!change_context_state(context, DEDUPE_CONTEXT_TIMED_OUT,
2241  				  DEDUPE_CONTEXT_TIMED_OUT_COMPLETE)) {
2242  		VDO_ASSERT_LOG_ONLY(false, "uds request was timed out (state %d)",
2243  				    atomic_read(&context->state));
2244  	}
2245  
2246  	vdo_funnel_queue_put(context->zone->timed_out_complete, &context->queue_entry);
2247  }
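
/*
 * Sketch of the dedupe_context transitions visible in finish_index_operation() and in the
 * timeout path below (each arrow is an atomic change_context_state() attempt):
 *
 *	DEDUPE_CONTEXT_PENDING -> DEDUPE_CONTEXT_COMPLETE (index answered in time)
 *	DEDUPE_CONTEXT_PENDING -> DEDUPE_CONTEXT_TIMED_OUT (timeout fired first)
 *	DEDUPE_CONTEXT_TIMED_OUT -> DEDUPE_CONTEXT_TIMED_OUT_COMPLETE (late answer, queued for reuse)
 *	DEDUPE_CONTEXT_COMPLETE or DEDUPE_CONTEXT_TIMED_OUT_COMPLETE -> DEDUPE_CONTEXT_IDLE (released)
 */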
2248  
2249  /**
2250   * check_for_drain_complete() - Check whether this zone has drained.
2251   * @zone: The zone to check.
2252   */
2253  static void check_for_drain_complete(struct hash_zone *zone)
2254  {
2255  	data_vio_count_t recycled = 0;
2256  
2257  	if (!vdo_is_state_draining(&zone->state))
2258  		return;
2259  
2260  	if ((atomic_read(&zone->timer_state) == DEDUPE_QUERY_TIMER_IDLE) ||
2261  	    change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2262  			       DEDUPE_QUERY_TIMER_IDLE)) {
2263  		del_timer_sync(&zone->timer);
2264  	} else {
2265  		/*
2266  		 * There is an in-flight timeout, which must be processed before we can continue.
2267  		 */
2268  		return;
2269  	}
2270  
2271  	for (;;) {
2272  		struct dedupe_context *context;
2273  		struct funnel_queue_entry *entry;
2274  
2275  		entry = vdo_funnel_queue_poll(zone->timed_out_complete);
2276  		if (entry == NULL)
2277  			break;
2278  
2279  		context = container_of(entry, struct dedupe_context, queue_entry);
2280  		atomic_set(&context->state, DEDUPE_CONTEXT_IDLE);
2281  		list_add(&context->list_entry, &zone->available);
2282  		recycled++;
2283  	}
2284  
2285  	if (recycled > 0)
2286  		WRITE_ONCE(zone->active, zone->active - recycled);
2287  	VDO_ASSERT_LOG_ONLY(READ_ONCE(zone->active) == 0, "all contexts inactive");
2288  	vdo_finish_draining(&zone->state);
2289  }
2290  
2291  static void timeout_index_operations_callback(struct vdo_completion *completion)
2292  {
2293  	struct dedupe_context *context, *tmp;
2294  	struct hash_zone *zone = as_hash_zone(completion);
2295  	u64 timeout_jiffies = msecs_to_jiffies(vdo_dedupe_index_timeout_interval);
2296  	unsigned long cutoff = jiffies - timeout_jiffies;
2297  	unsigned int timed_out = 0;
2298  
2299  	atomic_set(&zone->timer_state, DEDUPE_QUERY_TIMER_IDLE);
2300  	list_for_each_entry_safe(context, tmp, &zone->pending, list_entry) {
2301  		if (cutoff <= context->submission_jiffies) {
2302  			/*
2303  			 * We have reached the oldest query which has not timed out yet, so restart
2304  			 * the timer.
2305  			 */
2306  			start_expiration_timer(context);
2307  			break;
2308  		}
2309  
2310  		if (!change_context_state(context, DEDUPE_CONTEXT_PENDING,
2311  					  DEDUPE_CONTEXT_TIMED_OUT)) {
2312  			/*
2313  			 * This context completed between the time the timeout fired and now. We
2314  			 * can treat it as a successful query; its requestor is already enqueued
2315  			 * to process it.
2316  			 */
2317  			continue;
2318  		}
2319  
2320  		/*
2321  		 * Remove this context from the pending list so we won't look at it again on a
2322  		 * subsequent timeout. Once the index completes it, it will be reused. Meanwhile,
2323  		 * send its requestor on its way.
2324  		 */
2325  		list_del_init(&context->list_entry);
2326  		context->requestor->dedupe_context = NULL;
2327  		continue_data_vio(context->requestor);
2328  		timed_out++;
2329  	}
2330  
2331  	if (timed_out > 0)
2332  		report_dedupe_timeouts(completion->vdo->hash_zones, timed_out);
2333  
2334  	check_for_drain_complete(zone);
2335  }
2336  
2337  static void timeout_index_operations(struct timer_list *t)
2338  {
2339  	struct hash_zone *zone = from_timer(zone, t, timer);
2340  
2341  	if (change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2342  			       DEDUPE_QUERY_TIMER_FIRED))
2343  		vdo_launch_completion(&zone->completion);
2344  }
2345  
2346  static int __must_check initialize_zone(struct vdo *vdo, struct hash_zones *zones,
2347  					zone_count_t zone_number)
2348  {
2349  	int result;
2350  	data_vio_count_t i;
2351  	struct hash_zone *zone = &zones->zones[zone_number];
2352  
2353  	result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->hash_lock_map);
2354  	if (result != VDO_SUCCESS)
2355  		return result;
2356  
2357  	vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2358  	zone->zone_number = zone_number;
2359  	zone->thread_id = vdo->thread_config.hash_zone_threads[zone_number];
2360  	vdo_initialize_completion(&zone->completion, vdo, VDO_HASH_ZONE_COMPLETION);
2361  	vdo_set_completion_callback(&zone->completion, timeout_index_operations_callback,
2362  				    zone->thread_id);
2363  	INIT_LIST_HEAD(&zone->lock_pool);
2364  	result = vdo_allocate(LOCK_POOL_CAPACITY, struct hash_lock, "hash_lock array",
2365  			      &zone->lock_array);
2366  	if (result != VDO_SUCCESS)
2367  		return result;
2368  
2369  	for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2370  		return_hash_lock_to_pool(zone, &zone->lock_array[i]);
2371  
2372  	INIT_LIST_HEAD(&zone->available);
2373  	INIT_LIST_HEAD(&zone->pending);
2374  	result = vdo_make_funnel_queue(&zone->timed_out_complete);
2375  	if (result != VDO_SUCCESS)
2376  		return result;
2377  
2378  	timer_setup(&zone->timer, timeout_index_operations, 0);
2379  
2380  	for (i = 0; i < MAXIMUM_VDO_USER_VIOS; i++) {
2381  		struct dedupe_context *context = &zone->contexts[i];
2382  
2383  		context->zone = zone;
2384  		context->request.callback = finish_index_operation;
2385  		context->request.session = zones->index_session;
2386  		list_add(&context->list_entry, &zone->available);
2387  	}
2388  
2389  	return vdo_make_default_thread(vdo, zone->thread_id);
2390  }
2391  
2392  /** get_thread_id_for_zone() - Implements vdo_zone_thread_getter_fn. */
2393  static thread_id_t get_thread_id_for_zone(void *context, zone_count_t zone_number)
2394  {
2395  	struct hash_zones *zones = context;
2396  
2397  	return zones->zones[zone_number].thread_id;
2398  }
2399  
2400  /**
2401   * vdo_make_hash_zones() - Create the hash zones.
2402   *
2403   * @vdo: The vdo to which the zones will belong.
2404   * @zones_ptr: A pointer to hold the zones.
2405   *
2406   * Return: VDO_SUCCESS or an error code.
2407   */
2408  int vdo_make_hash_zones(struct vdo *vdo, struct hash_zones **zones_ptr)
2409  {
2410  	int result;
2411  	struct hash_zones *zones;
2412  	zone_count_t z;
2413  	zone_count_t zone_count = vdo->thread_config.hash_zone_count;
2414  
2415  	if (zone_count == 0)
2416  		return VDO_SUCCESS;
2417  
2418  	result = vdo_allocate_extended(struct hash_zones, zone_count, struct hash_zone,
2419  				       __func__, &zones);
2420  	if (result != VDO_SUCCESS)
2421  		return result;
2422  
2423  	result = initialize_index(vdo, zones);
2424  	if (result != VDO_SUCCESS) {
2425  		vdo_free(zones);
2426  		return result;
2427  	}
2428  
2429  	vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NEW);
2430  
2431  	zones->zone_count = zone_count;
2432  	for (z = 0; z < zone_count; z++) {
2433  		result = initialize_zone(vdo, zones, z);
2434  		if (result != VDO_SUCCESS) {
2435  			vdo_free_hash_zones(zones);
2436  			return result;
2437  		}
2438  	}
2439  
2440  	result = vdo_make_action_manager(zones->zone_count, get_thread_id_for_zone,
2441  					 vdo->thread_config.admin_thread, zones, NULL,
2442  					 vdo, &zones->manager);
2443  	if (result != VDO_SUCCESS) {
2444  		vdo_free_hash_zones(zones);
2445  		return result;
2446  	}
2447  
2448  	*zones_ptr = zones;
2449  	return VDO_SUCCESS;
2450  }
2451  
2452  void vdo_finish_dedupe_index(struct hash_zones *zones)
2453  {
2454  	if (zones == NULL)
2455  		return;
2456  
2457  	uds_destroy_index_session(vdo_forget(zones->index_session));
2458  }
2459  
2460  /**
2461   * vdo_free_hash_zones() - Free the hash zones.
2462   * @zones: The zones to free.
2463   */
2464  void vdo_free_hash_zones(struct hash_zones *zones)
2465  {
2466  	zone_count_t i;
2467  
2468  	if (zones == NULL)
2469  		return;
2470  
2471  	vdo_free(vdo_forget(zones->manager));
2472  
2473  	for (i = 0; i < zones->zone_count; i++) {
2474  		struct hash_zone *zone = &zones->zones[i];
2475  
2476  		vdo_free_funnel_queue(vdo_forget(zone->timed_out_complete));
2477  		vdo_int_map_free(vdo_forget(zone->hash_lock_map));
2478  		vdo_free(vdo_forget(zone->lock_array));
2479  	}
2480  
2481  	if (zones->index_session != NULL)
2482  		vdo_finish_dedupe_index(zones);
2483  
2484  	ratelimit_state_exit(&zones->ratelimiter);
2485  	vdo_free(zones);
2486  }
2487  
2488  static void initiate_suspend_index(struct admin_state *state)
2489  {
2490  	struct hash_zones *zones = container_of(state, struct hash_zones, state);
2491  	enum index_state index_state;
2492  
2493  	spin_lock(&zones->lock);
2494  	index_state = zones->index_state;
2495  	spin_unlock(&zones->lock);
2496  
2497  	if (index_state != IS_CLOSED) {
2498  		bool save = vdo_is_state_saving(&zones->state);
2499  		int result;
2500  
2501  		result = uds_suspend_index_session(zones->index_session, save);
2502  		if (result != UDS_SUCCESS)
2503  			vdo_log_error_strerror(result, "Error suspending dedupe index");
2504  	}
2505  
2506  	vdo_finish_draining(state);
2507  }
2508  
2509  /**
2510   * suspend_index() - Suspend the UDS index prior to draining hash zones.
2511   *
2512   * Implements vdo_action_preamble_fn
2513   */
2514  static void suspend_index(void *context, struct vdo_completion *completion)
2515  {
2516  	struct hash_zones *zones = context;
2517  
2518  	vdo_start_draining(&zones->state,
2519  			   vdo_get_current_manager_operation(zones->manager), completion,
2520  			   initiate_suspend_index);
2521  }
2522  
2523  /**
2524   * initiate_drain() - Initiate a drain.
2525   *
2526   * Implements vdo_admin_initiator_fn.
2527   */
2528  static void initiate_drain(struct admin_state *state)
2529  {
2530  	check_for_drain_complete(container_of(state, struct hash_zone, state));
2531  }
2532  
2533  /**
2534   * drain_hash_zone() - Drain a hash zone.
2535   *
2536   * Implements vdo_zone_action_fn.
2537   */
2538  static void drain_hash_zone(void *context, zone_count_t zone_number,
2539  			    struct vdo_completion *parent)
2540  {
2541  	struct hash_zones *zones = context;
2542  
2543  	vdo_start_draining(&zones->zones[zone_number].state,
2544  			   vdo_get_current_manager_operation(zones->manager), parent,
2545  			   initiate_drain);
2546  }
2547  
2548  /** vdo_drain_hash_zones() - Drain all hash zones. */
2549  void vdo_drain_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2550  {
2551  	vdo_schedule_operation(zones->manager, parent->vdo->suspend_type, suspend_index,
2552  			       drain_hash_zone, NULL, parent);
2553  }
2554  
2555  static void launch_dedupe_state_change(struct hash_zones *zones)
2556  	__must_hold(&zones->lock)
2557  {
2558  	/* ASSERTION: We enter with the lock held. */
2559  	if (zones->changing || !vdo_is_state_normal(&zones->state))
2560  		/* Either a change is already in progress, or changes are not allowed. */
2561  		return;
2562  
2563  	if (zones->create_flag || (zones->index_state != zones->index_target)) {
2564  		zones->changing = true;
2565  		vdo_launch_completion(&zones->completion);
2566  		return;
2567  	}
2568  
2569  	/* ASSERTION: We exit with the lock held. */
2570  }
2571  
2572  /**
2573   * resume_index() - Resume the UDS index prior to resuming hash zones.
2574   *
2575   * Implements vdo_action_preamble_fn
2576   */
2577  static void resume_index(void *context, struct vdo_completion *parent)
2578  {
2579  	struct hash_zones *zones = context;
2580  	struct device_config *config = parent->vdo->device_config;
2581  	int result;
2582  
2583  	zones->parameters.bdev = config->owned_device->bdev;
2584  	result = uds_resume_index_session(zones->index_session, zones->parameters.bdev);
2585  	if (result != UDS_SUCCESS)
2586  		vdo_log_error_strerror(result, "Error resuming dedupe index");
2587  
2588  	spin_lock(&zones->lock);
2589  	vdo_resume_if_quiescent(&zones->state);
2590  
2591  	if (config->deduplication) {
2592  		zones->index_target = IS_OPENED;
2593  		WRITE_ONCE(zones->dedupe_flag, true);
2594  	} else {
2595  		zones->index_target = IS_CLOSED;
2596  	}
2597  
2598  	launch_dedupe_state_change(zones);
2599  	spin_unlock(&zones->lock);
2600  
2601  	vdo_finish_completion(parent);
2602  }
2603  
2604  /**
2605   * resume_hash_zone() - Resume a hash zone.
2606   *
2607   * Implements vdo_zone_action_fn.
2608   */
2609  static void resume_hash_zone(void *context, zone_count_t zone_number,
2610  			     struct vdo_completion *parent)
2611  {
2612  	struct hash_zone *zone = &(((struct hash_zones *) context)->zones[zone_number]);
2613  
2614  	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
2615  }
2616  
2617  /**
2618   * vdo_resume_hash_zones() - Resume a set of hash zones.
2619   * @zones: The hash zones to resume.
2620   * @parent: The object to notify when the zones have resumed.
2621   */
2622  void vdo_resume_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2623  {
2624  	if (vdo_is_read_only(parent->vdo)) {
2625  		vdo_launch_completion(parent);
2626  		return;
2627  	}
2628  
2629  	vdo_schedule_operation(zones->manager, VDO_ADMIN_STATE_RESUMING, resume_index,
2630  			       resume_hash_zone, NULL, parent);
2631  }
2632  
2633  /**
2634   * get_hash_zone_statistics() - Add the statistics for this hash zone to the tally for all zones.
2635   * @zone: The hash zone to query.
2636   * @tally: The running tally to which this zone's statistics are added.
2637   */
2638  static void get_hash_zone_statistics(const struct hash_zone *zone,
2639  				     struct hash_lock_statistics *tally)
2640  {
2641  	const struct hash_lock_statistics *stats = &zone->statistics;
2642  
2643  	tally->dedupe_advice_valid += READ_ONCE(stats->dedupe_advice_valid);
2644  	tally->dedupe_advice_stale += READ_ONCE(stats->dedupe_advice_stale);
2645  	tally->concurrent_data_matches += READ_ONCE(stats->concurrent_data_matches);
2646  	tally->concurrent_hash_collisions += READ_ONCE(stats->concurrent_hash_collisions);
2647  	tally->curr_dedupe_queries += READ_ONCE(zone->active);
2648  }
2649  
2650  static void get_index_statistics(struct hash_zones *zones,
2651  				 struct index_statistics *stats)
2652  {
2653  	enum index_state state;
2654  	struct uds_index_stats index_stats;
2655  	int result;
2656  
2657  	spin_lock(&zones->lock);
2658  	state = zones->index_state;
2659  	spin_unlock(&zones->lock);
2660  
2661  	if (state != IS_OPENED)
2662  		return;
2663  
2664  	result = uds_get_index_session_stats(zones->index_session, &index_stats);
2665  	if (result != UDS_SUCCESS) {
2666  		vdo_log_error_strerror(result, "Error reading index stats");
2667  		return;
2668  	}
2669  
2670  	stats->entries_indexed = index_stats.entries_indexed;
2671  	stats->posts_found = index_stats.posts_found;
2672  	stats->posts_not_found = index_stats.posts_not_found;
2673  	stats->queries_found = index_stats.queries_found;
2674  	stats->queries_not_found = index_stats.queries_not_found;
2675  	stats->updates_found = index_stats.updates_found;
2676  	stats->updates_not_found = index_stats.updates_not_found;
2677  	stats->entries_discarded = index_stats.entries_discarded;
2678  }
2679  
2680  /**
2681   * vdo_get_dedupe_statistics() - Tally the statistics from all the hash zones and the UDS index.
2682   * @zones: The hash zones to query.
2683   * @stats: The statistics structure to fill with the sum of the hash lock statistics from all
2684   *         hash zones plus the statistics from the UDS index.
2686   */
2687  void vdo_get_dedupe_statistics(struct hash_zones *zones, struct vdo_statistics *stats)
2689  {
2690  	zone_count_t zone;
2691  
2692  	for (zone = 0; zone < zones->zone_count; zone++)
2693  		get_hash_zone_statistics(&zones->zones[zone], &stats->hash_lock);
2694  
2695  	get_index_statistics(zones, &stats->index);
2696  
2697  	/*
2698  	 * zones->timeouts gives the number of timeouts, and dedupe_context_busy gives the number
2699  	 * of queries not made because of earlier timeouts.
2700  	 */
2701  	stats->dedupe_advice_timeouts =
2702  		(atomic64_read(&zones->timeouts) + atomic64_read(&zones->dedupe_context_busy));
2703  }
2704  
2705  /**
2706   * vdo_select_hash_zone() - Select the hash zone responsible for locking a given record name.
2707   * @zones: The hash_zones from which to select.
2708   * @name: The record name.
2709   *
2710   * Return: The hash zone responsible for the record name.
2711   */
2712  struct hash_zone *vdo_select_hash_zone(struct hash_zones *zones,
2713  				       const struct uds_record_name *name)
2714  {
2715  	/*
2716  	 * Use a fragment of the record name as a hash code. Eight bits of hash should suffice
2717  	 * since the number of hash zones is small.
2718  	 * TODO: Verify that the first byte is independent enough.
2719  	 */
2720  	u32 hash = name->name[0];
2721  
2722  	/*
2723  	 * Scale the 8-bit hash fragment to a zone index by treating it as a binary fraction and
2724  	 * multiplying that by the zone count. If the hash is uniformly distributed over [0 ..
2725  	 * 2^8-1], then (hash * count / 2^8) should be uniformly distributed over [0 .. count-1].
2726  	 * The multiply and shift is much faster than a divide (modulus) on X86 CPUs.
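      	 * For example, with 4 zones, hashes 0..63 map to zone 0, 64..127 to zone 1, and so on.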
2727  	 */
2728  	hash = (hash * zones->zone_count) >> 8;
2729  	return &zones->zones[hash];
2730  }
2731  
2732  /**
2733   * dump_hash_lock() - Dump a compact description of hash_lock to the log if the lock is not on the
2734   *                    free list.
2735   * @lock: The hash lock to dump.
2736   */
2737  static void dump_hash_lock(const struct hash_lock *lock)
2738  {
2739  	const char *state;
2740  
2741  	if (!list_empty(&lock->pool_node)) {
2742  		/* This lock is on the free list. */
2743  		return;
2744  	}
2745  
2746  	/*
2747  	 * Necessarily cryptic since we can log a lot of these. The first three characters of the
2748  	 * state name are unambiguous. 'U' indicates a lock not registered in the map.
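      	 * The fields are: lock address, state, registered flag ('D'/'U'), duplicate PBN and state,
      	 * reference count (rc), waiter count (wc), and agent (agt).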
2749  	 */
2750  	state = get_hash_lock_state_name(lock->state);
2751  	vdo_log_info("  hl %px: %3.3s %c%llu/%u rc=%u wc=%zu agt=%px",
2752  		     lock, state, (lock->registered ? 'D' : 'U'),
2753  		     (unsigned long long) lock->duplicate.pbn,
2754  		     lock->duplicate.state, lock->reference_count,
2755  		     vdo_waitq_num_waiters(&lock->waiters), lock->agent);
2756  }
2757  
2758  static const char *index_state_to_string(struct hash_zones *zones,
2759  					 enum index_state state)
2760  {
2761  	if (!vdo_is_state_normal(&zones->state))
2762  		return SUSPENDED;
2763  
2764  	switch (state) {
2765  	case IS_CLOSED:
2766  		return zones->error_flag ? ERROR : CLOSED;
2767  	case IS_CHANGING:
2768  		return zones->index_target == IS_OPENED ? OPENING : CLOSING;
2769  	case IS_OPENED:
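      		/* The dedupe flag is set via WRITE_ONCE() in set_target_state(), hence READ_ONCE() here. */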
2770  		return READ_ONCE(zones->dedupe_flag) ? ONLINE : OFFLINE;
2771  	default:
2772  		return UNKNOWN;
2773  	}
2774  }
2775  
2776  /**
2777   * dump_hash_zone() - Dump information about a hash zone to the log for debugging.
2778   * @zone: The zone to dump.
2779   */
2780  static void dump_hash_zone(const struct hash_zone *zone)
2781  {
2782  	data_vio_count_t i;
2783  
2784  	if (zone->hash_lock_map == NULL) {
2785  		vdo_log_info("struct hash_zone %u: NULL map", zone->zone_number);
2786  		return;
2787  	}
2788  
2789  	vdo_log_info("struct hash_zone %u: mapSize=%zu",
2790  		     zone->zone_number, vdo_int_map_size(zone->hash_lock_map));
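      	/* dump_hash_lock() skips any lock that is currently on the free list. */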
2791  	for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2792  		dump_hash_lock(&zone->lock_array[i]);
2793  }
2794  
2795  /**
2796   * vdo_dump_hash_zones() - Dump information about the hash zones to the log for debugging.
2797   * @zones: The zones to dump.
2798   */
2799  void vdo_dump_hash_zones(struct hash_zones *zones)
2800  {
2801  	const char *state, *target;
2802  	zone_count_t zone;
2803  
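      	/* Sample the index state under the lock; do the logging outside the critical section. */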
2804  	spin_lock(&zones->lock);
2805  	state = index_state_to_string(zones, zones->index_state);
2806  	target = (zones->changing ? index_state_to_string(zones, zones->index_target) : NULL);
2807  	spin_unlock(&zones->lock);
2808  
2809  	vdo_log_info("UDS index: state: %s", state);
2810  	if (target != NULL)
2811  		vdo_log_info("UDS index: changing to state: %s", target);
2812  
2813  	for (zone = 0; zone < zones->zone_count; zone++)
2814  		dump_hash_zone(&zones->zones[zone]);
2815  }
2816  
2817  void vdo_set_dedupe_index_timeout_interval(unsigned int value)
2818  {
2819  	u64 alb_jiffies;
2820  
2821  	/* Arbitrary maximum value is two minutes */
2822  	if (value > 120000)
2823  		value = 120000;
2824  	/* Arbitrary minimum value is 2 jiffies */
2825  	alb_jiffies = msecs_to_jiffies(value);
2826  
2827  	if (alb_jiffies < 2) {
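      		/* Clamp to the minimum and recompute the millisecond value to match. */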
2828  		alb_jiffies = 2;
2829  		value = jiffies_to_msecs(alb_jiffies);
2830  	}
2831  	vdo_dedupe_index_timeout_interval = value;
2832  	vdo_dedupe_index_timeout_jiffies = alb_jiffies;
2833  }
2834  
2835  void vdo_set_dedupe_index_min_timer_interval(unsigned int value)
2836  {
2837  	u64 min_jiffies;
2838  
2839  	/* Arbitrary maximum value is one second */
2840  	if (value > 1000)
2841  		value = 1000;
2842  
2843  	/* Arbitrary minimum value is 2 jiffies */
2844  	min_jiffies = msecs_to_jiffies(value);
2845  
2846  	if (min_jiffies < 2) {
2847  		min_jiffies = 2;
2848  		value = jiffies_to_msecs(min_jiffies);
2849  	}
2850  
2851  	vdo_dedupe_index_min_timer_interval = value;
2852  	vdo_dedupe_index_min_timer_jiffies = min_jiffies;
2853  }
2854  
2855  /**
2856   * acquire_context() - Acquire a dedupe context from a hash_zone if any are available.
2857   * @zone: The hash zone.
2858   *
2859   * Return: A dedupe_context, or NULL if none are available.
2860   */
2861  static struct dedupe_context * __must_check acquire_context(struct hash_zone *zone)
2862  {
2863  	struct dedupe_context *context;
2864  	struct funnel_queue_entry *entry;
2865  
2866  	assert_in_hash_zone(zone, __func__);
2867  
2868  	if (!list_empty(&zone->available)) {
2869  		WRITE_ONCE(zone->active, zone->active + 1);
2870  		context = list_first_entry(&zone->available, struct dedupe_context,
2871  					   list_entry);
2872  		list_del_init(&context->list_entry);
2873  		return context;
2874  	}
2875  
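      	/* No idle contexts; try to reclaim one whose timed-out request has since completed. */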
2876  	entry = vdo_funnel_queue_poll(zone->timed_out_complete);
2877  	return ((entry == NULL) ?
2878  		NULL : container_of(entry, struct dedupe_context, queue_entry));
2879  }
2880  
2881  static void prepare_uds_request(struct uds_request *request, struct data_vio *data_vio,
2882  				enum uds_request_type operation)
2883  {
2884  	request->record_name = data_vio->record_name;
2885  	request->type = operation;
2886  	if ((operation == UDS_POST) || (operation == UDS_UPDATE)) {
2887  		size_t offset = 0;
2888  		struct uds_record_data *encoding = &request->new_metadata;
2889  
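      		/* The advice is a version byte, the mapping state, and the little-endian PBN. */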
2890  		encoding->data[offset++] = UDS_ADVICE_VERSION;
2891  		encoding->data[offset++] = data_vio->new_mapped.state;
2892  		put_unaligned_le64(data_vio->new_mapped.pbn, &encoding->data[offset]);
2893  		offset += sizeof(u64);
2894  		BUG_ON(offset != UDS_ADVICE_SIZE);
2895  	}
2896  }
2897  
2898  /*
2899   * The index operation will inquire about data_vio.record_name, providing (if the operation is
2900   * appropriate) advice from the data_vio's new_mapped fields. The advice found in the index (or
2901   * NULL if none) will be returned via receive_data_vio_dedupe_advice(). dedupe_context.status is
2902   * set to the return status code of any asynchronous index processing.
2903   */
2904  static void query_index(struct data_vio *data_vio, enum uds_request_type operation)
2905  {
2906  	int result;
2907  	struct dedupe_context *context;
2908  	struct vdo *vdo = vdo_from_data_vio(data_vio);
2909  	struct hash_zone *zone = data_vio->hash_zone;
2910  
2911  	assert_data_vio_in_hash_zone(data_vio);
2912  
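      	/* If deduplication is disabled, skip the index query and just continue the data_vio. */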
2913  	if (!READ_ONCE(vdo->hash_zones->dedupe_flag)) {
2914  		continue_data_vio(data_vio);
2915  		return;
2916  	}
2917  
2918  	context = acquire_context(zone);
2919  	if (context == NULL) {
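      		/* No context is available; count the skipped query and continue without advice. */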
2920  		atomic64_inc(&vdo->hash_zones->dedupe_context_busy);
2921  		continue_data_vio(data_vio);
2922  		return;
2923  	}
2924  
2925  	data_vio->dedupe_context = context;
2926  	context->requestor = data_vio;
2927  	context->submission_jiffies = jiffies;
2928  	prepare_uds_request(&context->request, data_vio, operation);
2929  	atomic_set(&context->state, DEDUPE_CONTEXT_PENDING);
2930  	list_add_tail(&context->list_entry, &zone->pending);
2931  	start_expiration_timer(context);
2932  	result = uds_launch_request(&context->request);
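      	/* If the request could not be launched, complete it immediately with the error status. */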
2933  	if (result != UDS_SUCCESS) {
2934  		context->request.status = result;
2935  		finish_index_operation(&context->request);
2936  	}
2937  }
2938  
2939  static void set_target_state(struct hash_zones *zones, enum index_state target,
2940  			     bool change_dedupe, bool dedupe, bool set_create)
2941  {
2942  	const char *old_state, *new_state;
2943  
2944  	spin_lock(&zones->lock);
2945  	old_state = index_state_to_string(zones, zones->index_target);
2946  	if (change_dedupe)
2947  		WRITE_ONCE(zones->dedupe_flag, dedupe);
2948  
2949  	if (set_create)
2950  		zones->create_flag = true;
2951  
2952  	zones->index_target = target;
2953  	launch_dedupe_state_change(zones);
2954  	new_state = index_state_to_string(zones, zones->index_target);
2955  	spin_unlock(&zones->lock);
2956  
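      	/* The state names are fixed strings, so pointer comparison suffices to detect a change. */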
2957  	if (old_state != new_state)
2958  		vdo_log_info("Setting UDS index target state to %s", new_state);
2959  }
2960  
2961  const char *vdo_get_dedupe_index_state_name(struct hash_zones *zones)
2962  {
2963  	const char *state;
2964  
2965  	spin_lock(&zones->lock);
2966  	state = index_state_to_string(zones, zones->index_state);
2967  	spin_unlock(&zones->lock);
2968  
2969  	return state;
2970  }
2971  
2972  /* Handle a dmsetup message relevant to the index. */
2973  int vdo_message_dedupe_index(struct hash_zones *zones, const char *name)
2974  {
2975  	if (strcasecmp(name, "index-close") == 0) {
2976  		set_target_state(zones, IS_CLOSED, false, false, false);
2977  		return 0;
2978  	} else if (strcasecmp(name, "index-create") == 0) {
2979  		set_target_state(zones, IS_OPENED, false, false, true);
2980  		return 0;
2981  	} else if (strcasecmp(name, "index-disable") == 0) {
2982  		set_target_state(zones, IS_OPENED, true, false, false);
2983  		return 0;
2984  	} else if (strcasecmp(name, "index-enable") == 0) {
2985  		set_target_state(zones, IS_OPENED, true, true, false);
2986  		return 0;
2987  	}
2988  
2989  	return -EINVAL;
2990  }
2991  
2992  void vdo_set_dedupe_state_normal(struct hash_zones *zones)
2993  {
2994  	vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2995  }
2996  
2997  /* If create_flag, create a new index without first attempting to load an existing index. */
2998  void vdo_start_dedupe_index(struct hash_zones *zones, bool create_flag)
2999  {
3000  	set_target_state(zones, IS_OPENED, true, true, create_flag);
3001  }
3002