// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 * Copyright (C) 2014 Fujitsu.  All rights reserved.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include <trace/events/btrfs.h>
#include "async-thread.h"

enum {
	WORK_DONE_BIT,
	WORK_ORDER_DONE_BIT,
};

#define NO_THRESHOLD (-1)
#define DFT_THRESHOLD (32)

struct btrfs_workqueue {
	struct workqueue_struct *normal_wq;

	/* File system this workqueue services */
	struct btrfs_fs_info *fs_info;

	/* List head pointing to ordered work list */
	struct list_head ordered_list;

	/* Spinlock for ordered_list */
	spinlock_t list_lock;

	/* Thresholding related variables */
	atomic_t pending;

	/* Upper limit of concurrently running workers */
	int limit_active;

	/* Current number of concurrently running workers */
	int current_active;

	/* Threshold to change current_active */
	int thresh;
	unsigned int count;
	spinlock_t thres_lock;
};

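/* Return the filesystem this workqueue services. */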
struct btrfs_fs_info * __pure btrfs_workqueue_owner(const struct btrfs_workqueue *wq)
{
	return wq->fs_info;
}

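/* Return the filesystem serviced by the workqueue @work was queued on. */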
struct btrfs_fs_info * __pure btrfs_work_owner(const struct btrfs_work *work)
{
	return work->wq->fs_info;
}

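/*
 * Return true if the workqueue looks congested, i.e. the number of pending
 * work items is well above the auto-tuning threshold.  Workqueues without a
 * threshold are never reported as congested.
 */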
bool btrfs_workqueue_normal_congested(const struct btrfs_workqueue *wq)
{
	/*
	 * We could compare wq->pending with num_online_cpus() to support the
	 * "thresh == NO_THRESHOLD" case, but it requires moving up the
	 * atomic_inc/dec in thresh_queue/exec_hook. Let's postpone it until
	 * someone needs support for that case.
	 */
	if (wq->thresh == NO_THRESHOLD)
		return false;

	return atomic_read(&wq->pending) > wq->thresh * 2;
}

static void btrfs_init_workqueue(struct btrfs_workqueue *wq,
				 struct btrfs_fs_info *fs_info)
{
	wq->fs_info = fs_info;
	atomic_set(&wq->pending, 0);
	INIT_LIST_HEAD(&wq->ordered_list);
	spin_lock_init(&wq->list_lock);
	spin_lock_init(&wq->thres_lock);
}

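/*
 * Allocate a workqueue whose effective max_active is scaled between 1 and
 * @limit_active based on the number of pending work items.  A @thresh of 0
 * selects the default threshold; any value below the default disables
 * thresholding and pins max_active at @limit_active.
 */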
struct btrfs_workqueue *btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info,
					      const char *name, unsigned int flags,
					      int limit_active, int thresh)
{
	struct btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);

	if (!ret)
		return NULL;

	btrfs_init_workqueue(ret, fs_info);

	ret->limit_active = limit_active;
	if (thresh == 0)
		thresh = DFT_THRESHOLD;
	/* For a low threshold, disabling thresholding is the better choice. */
	if (thresh < DFT_THRESHOLD) {
		ret->current_active = limit_active;
		ret->thresh = NO_THRESHOLD;
	} else {
		/*
		 * For a thresholded wq, let its concurrency grow on demand.
		 * Use the minimal max_active at alloc time to reduce resource
		 * usage.
		 */
		ret->current_active = 1;
		ret->thresh = thresh;
	}

	ret->normal_wq = alloc_workqueue("btrfs-%s", flags, ret->current_active,
					 name);
	if (!ret->normal_wq) {
		kfree(ret);
		return NULL;
	}

	trace_btrfs_workqueue_alloc(ret, name);
	return ret;
}

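/*
 * Allocate a workqueue that executes at most one work item at a time, in
 * queueing order.  No thresholding is done and max_active is fixed at 1.
 */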
struct btrfs_workqueue *btrfs_alloc_ordered_workqueue(
				struct btrfs_fs_info *fs_info, const char *name,
				unsigned int flags)
{
	struct btrfs_workqueue *ret;

	ret = kzalloc(sizeof(*ret), GFP_KERNEL);
	if (!ret)
		return NULL;

	btrfs_init_workqueue(ret, fs_info);

	/* Ordered workqueues don't allow @max_active adjustments. */
	ret->limit_active = 1;
	ret->current_active = 1;
	ret->thresh = NO_THRESHOLD;

	ret->normal_wq = alloc_ordered_workqueue("btrfs-%s", flags, name);
	if (!ret->normal_wq) {
		kfree(ret);
		return NULL;
	}

	trace_btrfs_workqueue_alloc(ret, name);
	return ret;
}

/*
 * Hook for the thresholding code, called from btrfs_queue_work().
 * This hook WILL be called in IRQ handler context, so
 * workqueue_set_max_active() MUST NOT be called from it.
 */
static inline void thresh_queue_hook(struct btrfs_workqueue *wq)
{
	if (wq->thresh == NO_THRESHOLD)
		return;
	atomic_inc(&wq->pending);
}

/*
 * Hook for the thresholding code, called before executing the work.
 * This hook runs in kthread context, so workqueue_set_max_active()
 * may be called here.
 */
static inline void thresh_exec_hook(struct btrfs_workqueue *wq)
{
	int new_current_active;
	long pending;
	int need_change = 0;

	if (wq->thresh == NO_THRESHOLD)
		return;

	atomic_dec(&wq->pending);
	spin_lock(&wq->thres_lock);
	/*
	 * Use wq->count to limit the calling frequency of
	 * workqueue_set_max_active().
	 */
	wq->count++;
	wq->count %= (wq->thresh / 4);
	if (!wq->count)
		goto out;
	new_current_active = wq->current_active;

	/*
	 * pending may change under us, but that's fine as we don't need an
	 * exact value to calculate new_current_active.
	 */
	pending = atomic_read(&wq->pending);
	if (pending > wq->thresh)
		new_current_active++;
	if (pending < wq->thresh / 2)
		new_current_active--;
	new_current_active = clamp_val(new_current_active, 1, wq->limit_active);
	if (new_current_active != wq->current_active) {
		need_change = 1;
		wq->current_active = new_current_active;
	}
out:
	spin_unlock(&wq->thres_lock);

	if (need_change)
		workqueue_set_max_active(wq->normal_wq, wq->current_active);
}

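/*
 * Run the ordered completion functions for the work items at the head of
 * wq->ordered_list whose ordinary work functions have already finished.
 * The item the current worker is executing (@self) is only freed once we
 * are completely done with it, see the comment in the body for why.
 */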
static void run_ordered_work(struct btrfs_workqueue *wq,
			     struct btrfs_work *self)
{
	struct list_head *list = &wq->ordered_list;
	struct btrfs_work *work;
	spinlock_t *lock = &wq->list_lock;
	unsigned long flags;
	bool free_self = false;

	while (1) {
		spin_lock_irqsave(lock, flags);
		if (list_empty(list))
			break;
		work = list_entry(list->next, struct btrfs_work,
				  ordered_list);
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;
		/*
		 * Orders all subsequent loads after reading WORK_DONE_BIT.
		 * Paired with the smp_mb__before_atomic() in btrfs_work_helper(),
		 * this guarantees that the ordered function sees all updates
		 * done by the ordinary work function.
		 */
		smp_rmb();

		/*
		 * We are going to call the ordered done function, but we leave
		 * the work item on the list as a barrier so that later work
		 * items that are done don't have their functions called before
		 * this one returns.
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;
		trace_btrfs_ordered_sched(work);
		spin_unlock_irqrestore(lock, flags);
		work->ordered_func(work, false);

		/* Now take the lock again and drop our item from the list. */
		spin_lock_irqsave(lock, flags);
		list_del(&work->ordered_list);
		spin_unlock_irqrestore(lock, flags);

		if (work == self) {
			/*
			 * This is the work item that the worker is currently
			 * executing.
			 *
			 * The kernel workqueue code guarantees non-reentrancy
			 * of work items. I.e., if a work item with the same
			 * address and work function is queued twice, the second
			 * execution is blocked until the first one finishes. A
			 * work item may be freed and recycled with the same
			 * work function; the workqueue code assumes that the
			 * original work item cannot depend on the recycled work
			 * item in that case (see find_worker_executing_work()).
			 *
			 * Note that different types of Btrfs work can depend on
			 * each other, and one type of work on one Btrfs
			 * filesystem may even depend on the same type of work
			 * on another Btrfs filesystem via, e.g., a loop device.
			 * Therefore, we must not allow the current work item to
			 * be recycled until we are really done, otherwise we
			 * break the above assumption and can deadlock.
			 */
			free_self = true;
		} else {
			/*
			 * We don't want to call the ordered free functions with
			 * the lock held.
			 */
			work->ordered_func(work, true);
			/* NB: work must not be dereferenced past this point. */
			trace_btrfs_all_work_done(wq->fs_info, work);
		}
	}
	spin_unlock_irqrestore(lock, flags);

	if (free_self) {
		self->ordered_func(self, true);
		/* NB: self must not be dereferenced past this point. */
		trace_btrfs_all_work_done(wq->fs_info, self);
	}
}

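/*
 * Worker function for every btrfs_work item: run the ordinary work function
 * and, if an ordered function was supplied, process the ordered list.
 */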
static void btrfs_work_helper(struct work_struct *normal_work)
{
	struct btrfs_work *work = container_of(normal_work, struct btrfs_work,
					       normal_work);
	struct btrfs_workqueue *wq = work->wq;
	int need_order = 0;

	/*
	 * We must not touch anything inside @work in the following cases:
	 * 1) after work->func() if it has no ordered_func(..., true) to free
	 *    the struct, since the struct is freed in work->func() itself;
	 * 2) after setting WORK_DONE_BIT, since the work may be freed by
	 *    another thread almost instantly.
	 * So save everything we need beforehand.
	 */
	if (work->ordered_func)
		need_order = 1;

	trace_btrfs_work_sched(work);
	thresh_exec_hook(wq);
	work->func(work);
	if (need_order) {
		/*
		 * Ensures all memory accesses done in the work function are
		 * ordered before setting WORK_DONE_BIT, so that the thread
		 * which is going to execute the ordered work sees them.
		 * Pairs with the smp_rmb() in run_ordered_work().
		 */
		smp_mb__before_atomic();
		set_bit(WORK_DONE_BIT, &work->flags);
		run_ordered_work(wq, work);
	} else {
		/* NB: work must not be dereferenced past this point. */
		trace_btrfs_all_work_done(wq->fs_info, work);
	}
}

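/*
 * Initialize a btrfs_work before handing it to btrfs_queue_work().  The
 * @ordered_func may be NULL if no ordered completion step is needed.
 *
 * A minimal usage sketch (caller-side names are illustrative only):
 *
 *	btrfs_init_work(&async->work, async_start_fn, async_done_fn);
 *	btrfs_queue_work(fs_info->workers, &async->work);
 */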
void btrfs_init_work(struct btrfs_work *work, btrfs_func_t func,
		     btrfs_ordered_func_t ordered_func)
{
	work->func = func;
	work->ordered_func = ordered_func;
	INIT_WORK(&work->normal_work, btrfs_work_helper);
	INIT_LIST_HEAD(&work->ordered_list);
	work->flags = 0;
}

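/*
 * Queue @work on @wq.  If the work has an ordered function it is also linked
 * onto the tail of the ordered list, so its ordered completion runs in
 * queueing order.
 */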
void btrfs_queue_work(struct btrfs_workqueue *wq, struct btrfs_work *work)
{
	unsigned long flags;

	work->wq = wq;
	thresh_queue_hook(wq);
	if (work->ordered_func) {
		spin_lock_irqsave(&wq->list_lock, flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, flags);
	}
	trace_btrfs_work_queued(work);
	queue_work(wq->normal_wq, &work->normal_work);
}

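/* Drain and destroy the workqueue.  Passing a NULL @wq is allowed. */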
void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
{
	if (!wq)
		return;
	destroy_workqueue(wq->normal_wq);
	trace_btrfs_workqueue_destroy(wq);
	kfree(wq);
}

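/*
 * Update the upper limit of concurrently active workers.  The limit is
 * enforced by thresh_exec_hook() when it recalculates the concurrency.
 */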
void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int limit_active)
{
	if (wq)
		wq->limit_active = limit_active;
}

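/* Wait for all work items currently queued on @wq to finish. */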
void btrfs_flush_workqueue(struct btrfs_workqueue *wq)
{
	flush_workqueue(wq->normal_wq);
}