// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
#define IORING_TW_CAP_ENTRIES_VALUE	8

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqd->thread == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
}

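/*
 * Park the SQPOLL thread: flag it to pause, wake it so it notices the
 * request, and take sqd->lock. While the thread is parked, the caller may
 * safely update the sqd ctx list and related state; io_sq_thread_unpark()
 * drops the lock and lets the thread continue.
 */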
void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	WARN_ON_ONCE(data_race(sqd->thread) == current);

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
}

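/*
 * Ask the SQPOLL thread to exit and wait for it to do so. Must not be
 * called from the thread itself, and only once per io_sq_data.
 */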
void io_sq_thread_stop(struct io_sq_data *sqd)
{
	WARN_ON_ONCE(sqd->thread == current);
	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

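/*
 * Drop a reference on the io_sq_data; the final put stops the SQPOLL
 * thread and frees the structure.
 */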
void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

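/*
 * Detach a ring from its io_sq_data at teardown: remove the ctx from the
 * sqd list under park, recompute the shared idle timeout, and drop the
 * ctx's reference to the sqd.
 */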
void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

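/*
 * For IORING_SETUP_ATTACH_WQ: look up an existing SQPOLL backend through
 * the io_uring fd given in p->wq_fd and grab a reference to its
 * io_sq_data. Only rings owned by the same thread group may attach.
 */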
static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	struct fd f;

	f = fdget(p->wq_fd);
	if (!fd_file(f))
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(fd_file(f))) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}

	ctx_attach = fd_file(f)->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}
	if (sqd->task_tgid != current->tgid) {
		fdput(f);
		return ERR_PTR(-EPERM);
	}

	refcount_inc(&sqd->refs);
	fdput(f);
	return sqd;
}

static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

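/*
 * One submission pass for a single ctx: reap pending iopoll completions
 * and submit new SQEs under the ctx's credentials, capping the batch when
 * several rings share this thread so that no single ring starves the
 * others. Returns the number of SQEs submitted.
 */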
static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}

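/*
 * Handle park/stop requests and pending signals for the SQPOLL thread.
 * Called with sqd->lock held; drops and re-acquires it if there is an
 * event to handle. Returns true if the thread should exit its loop.
 */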
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		cond_resched();
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			goto out;
		max_entries -= count;
	}
	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
	if (task_work_pending(current))
		task_work_run();
	return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}

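/*
 * Fold the system time consumed since @start into sqd->work_time (in
 * microseconds), which is reported through fdinfo as the SQPOLL thread's
 * accumulated work time.
 */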
static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
{
	struct rusage end;

	getrusage(current, RUSAGE_SELF, &end);
	end.ru_stime.tv_sec -= start->ru_stime.tv_sec;
	end.ru_stime.tv_usec -= start->ru_stime.tv_usec;

	sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000;
}

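/*
 * Main loop of the SQPOLL kernel thread ("iou-sqp-<pid>"). It repeatedly
 * submits SQEs and runs task_work for every ctx attached to this
 * io_sq_data, busy-polls NAPI where enabled, and once the rings have been
 * idle for longer than sq_thread_idle it sets IORING_SQ_NEED_WAKEUP and
 * sleeps until woken by a submitter or a park/stop request.
 */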
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	struct rusage start;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN];
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring)
		goto err_out;

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force audit context to get setup, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		getrusage(current, RUSAGE_SELF, &start);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, cap_entries);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
			if (io_napi(ctx))
				io_napi_sqpoll_busy_poll(ctx);

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin) {
				io_sq_update_worktime(sqd, &start);
				timeout = jiffies + sqd->sq_thread_idle;
			}
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
						&ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
						&ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	sqd->thread = NULL;
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}

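/*
 * Called by a submitter when the SQ ring is full: sleep until the SQPOLL
 * thread has consumed entries and made room, or until a signal arrives.
 */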
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}

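/*
 * Set up SQPOLL offloading for a freshly created ring: either attach to an
 * existing io_sq_data (IORING_SETUP_ATTACH_WQ) or allocate a new one and
 * spawn its io_sq_thread, optionally pinned to p->sq_thread_cpu when
 * IORING_SETUP_SQ_AFF is set.
 *
 * Illustrative userspace sketch, not part of this file: names such as
 * ring_fd, QUEUE_DEPTH, sq_flags and to_submit are placeholders, syscall
 * wrappers (e.g. from liburing) are assumed, ring mmap and error handling
 * are omitted. The application requests SQPOLL at setup time and only
 * needs io_uring_enter() once the thread has gone idle and flagged
 * IORING_SQ_NEED_WAKEUP in the SQ ring flags:
 *
 *	struct io_uring_params p = { };
 *
 *	p.flags = IORING_SETUP_SQPOLL;
 *	p.sq_thread_idle = 2000;	// idle timeout, in milliseconds
 *	ring_fd = io_uring_setup(QUEUE_DEPTH, &p);
 *	...
 *	// after filling SQEs and publishing the new SQ tail, check the
 *	// (atomically loaded) SQ ring flags:
 *	if (*sq_flags & IORING_SQ_NEED_WAKEUP)
 *		io_uring_enter(ring_fd, to_submit, 0, IORING_ENTER_SQ_WAKEUP, NULL);
 */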
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		struct fd f;

		f = fdget(p->wq_fd);
		if (!fd_file(f))
			return -ENXIO;
		if (!io_is_uring_fops(fd_file(f))) {
			fdput(f);
			return -EINVAL;
		}
		fdput(f);
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			cpumask_var_t allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			ret = -ENOMEM;
			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
				goto err_sqpoll;
			ret = -EINVAL;
			cpuset_cpus_allowed(current, allowed_mask);
			if (!cpumask_test_cpu(cpu, allowed_mask)) {
				free_cpumask_var(allowed_mask);
				goto err_sqpoll;
			}
			free_cpumask_var(allowed_mask);
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		sqd->thread = tsk;
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}

	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}

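/*
 * Apply a CPU affinity mask to the io-wq workers owned by the SQPOLL
 * thread's task context. Fails with -EINVAL if the ring has no SQPOLL
 * backend or its thread has already exited.
 */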
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		if (sqd->thread)
			ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}