1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "funnel-requestqueue.h"
7 
8 #include <linux/atomic.h>
9 #include <linux/compiler.h>
10 #include <linux/wait.h>
11 
12 #include "funnel-queue.h"
13 #include "logger.h"
14 #include "memory-alloc.h"
15 #include "thread-utils.h"
16 
17 /*
18  * This queue will attempt to handle requests in reasonably sized batches instead of reacting
19  * immediately to each new request. The wait time between batches is dynamically adjusted up or
20  * down to try to balance responsiveness against wasted thread run time.
21  *
22  * If the wait time becomes long enough, the queue will become dormant and must be explicitly
23  * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
24  * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
25  * wakeup of the worker thread.
26  *
27  * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
28  * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
29  * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
30  * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
31  * queue's "next" field update isn't visible yet to make the entry accessible, its existence will
32  * kick the worker thread out of dormant mode and back into timer-based mode.
33  *
34  * Unbatched requests are used to communicate between different zone threads and will also cause
35  * the queue to awaken immediately.
36  */
37 
/*
 * Tuning constants for the worker thread's batching. Wait times are expressed in nanoseconds
 * (they are converted with ns_to_ktime() before use).
 */
enum {
	/* Time units, in nanoseconds */
	NANOSECOND = 1,
	MICROSECOND = NANOSECOND * 1000,
	MILLISECOND = MICROSECOND * 1000,

	/* The initial wait time, also restored when the worker leaves dormancy */
	DEFAULT_WAIT_TIME = MICROSECOND * 20,
	/* The wait time is never reduced below this value */
	MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
	/* Once the wait time grows to this value, the worker becomes dormant */
	MAXIMUM_WAIT_TIME = MILLISECOND,

	/* A batch smaller than this causes the wait time to be increased */
	MINIMUM_BATCH = 32,
	/* A batch larger than this causes the wait time to be decreased */
	MAXIMUM_BATCH = 64,
};
48 
/*
 * A request queue: two funnel queues feeding a single worker thread that hands each request to
 * the processor function.
 */
struct uds_request_queue {
	/* Wait queue the worker sleeps on; producers wake it via wake_up_worker() */
	struct wait_queue_head wait_head;
	/* Function the worker calls on each dequeued request */
	uds_request_queue_processor_fn processor;
	/* Queue of new incoming requests */
	struct funnel_queue *main_queue;
	/* Queue of old requests to retry; polled before the main queue */
	struct funnel_queue *retry_queue;
	/* The worker thread, filled in by vdo_create_thread() and joined on finish */
	struct thread *thread;
	/* True once the worker thread has been created successfully */
	bool started;
	/*
	 * When true, requests can be enqueued. Cleared (with WRITE_ONCE) by
	 * uds_request_queue_finish() to tell the worker to drain the queues and exit.
	 */
	bool running;
	/* A flag set when the worker is waiting without a timeout */
	atomic_t dormant;
};
67 
poll_queues(struct uds_request_queue * queue)68 static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
69 {
70 	struct funnel_queue_entry *entry;
71 
72 	entry = vdo_funnel_queue_poll(queue->retry_queue);
73 	if (entry != NULL)
74 		return container_of(entry, struct uds_request, queue_link);
75 
76 	entry = vdo_funnel_queue_poll(queue->main_queue);
77 	if (entry != NULL)
78 		return container_of(entry, struct uds_request, queue_link);
79 
80 	return NULL;
81 }
82 
are_queues_idle(struct uds_request_queue * queue)83 static inline bool are_queues_idle(struct uds_request_queue *queue)
84 {
85 	return vdo_is_funnel_queue_idle(queue->retry_queue) &&
86 	       vdo_is_funnel_queue_idle(queue->main_queue);
87 }
88 
89 /*
90  * Determine if there is a next request to process, and return it if there is. Also return flags
91  * indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether
92  * the thread did sleep before returning a new request.
93  */
dequeue_request(struct uds_request_queue * queue,struct uds_request ** request_ptr,bool * waited_ptr)94 static inline bool dequeue_request(struct uds_request_queue *queue,
95 				   struct uds_request **request_ptr, bool *waited_ptr)
96 {
97 	struct uds_request *request = poll_queues(queue);
98 
99 	if (request != NULL) {
100 		*request_ptr = request;
101 		return true;
102 	}
103 
104 	if (!READ_ONCE(queue->running)) {
105 		/* Wake the worker thread so it can exit. */
106 		*request_ptr = NULL;
107 		return true;
108 	}
109 
110 	*request_ptr = NULL;
111 	*waited_ptr = true;
112 	return false;
113 }
114 
wait_for_request(struct uds_request_queue * queue,bool dormant,unsigned long timeout,struct uds_request ** request,bool * waited)115 static void wait_for_request(struct uds_request_queue *queue, bool dormant,
116 			     unsigned long timeout, struct uds_request **request,
117 			     bool *waited)
118 {
119 	if (dormant) {
120 		wait_event_interruptible(queue->wait_head,
121 					 (dequeue_request(queue, request, waited) ||
122 					  !are_queues_idle(queue)));
123 		return;
124 	}
125 
126 	wait_event_interruptible_hrtimeout(queue->wait_head,
127 					   dequeue_request(queue, request, waited),
128 					   ns_to_ktime(timeout));
129 }
130 
request_queue_worker(void * arg)131 static void request_queue_worker(void *arg)
132 {
133 	struct uds_request_queue *queue = arg;
134 	struct uds_request *request = NULL;
135 	unsigned long time_batch = DEFAULT_WAIT_TIME;
136 	bool dormant = atomic_read(&queue->dormant);
137 	bool waited = false;
138 	long current_batch = 0;
139 
140 	for (;;) {
141 		wait_for_request(queue, dormant, time_batch, &request, &waited);
142 		if (likely(request != NULL)) {
143 			current_batch++;
144 			queue->processor(request);
145 		} else if (!READ_ONCE(queue->running)) {
146 			break;
147 		}
148 
149 		if (dormant) {
150 			/*
151 			 * The queue has been roused from dormancy. Clear the flag so enqueuers can
152 			 * stop broadcasting. No fence is needed for this transition.
153 			 */
154 			atomic_set(&queue->dormant, false);
155 			dormant = false;
156 			time_batch = DEFAULT_WAIT_TIME;
157 		} else if (waited) {
158 			/*
159 			 * We waited for this request to show up. Adjust the wait time to smooth
160 			 * out the batch size.
161 			 */
162 			if (current_batch < MINIMUM_BATCH) {
163 				/*
164 				 * If the last batch of requests was too small, increase the wait
165 				 * time.
166 				 */
167 				time_batch += time_batch / 4;
168 				if (time_batch >= MAXIMUM_WAIT_TIME) {
169 					atomic_set(&queue->dormant, true);
170 					dormant = true;
171 				}
172 			} else if (current_batch > MAXIMUM_BATCH) {
173 				/*
174 				 * If the last batch of requests was too large, decrease the wait
175 				 * time.
176 				 */
177 				time_batch -= time_batch / 4;
178 				if (time_batch < MINIMUM_WAIT_TIME)
179 					time_batch = MINIMUM_WAIT_TIME;
180 			}
181 			current_batch = 0;
182 		}
183 	}
184 
185 	/*
186 	 * Ensure that we process any remaining requests that were enqueued before trying to shut
187 	 * down. The corresponding write barrier is in uds_request_queue_finish().
188 	 */
189 	smp_rmb();
190 	while ((request = poll_queues(queue)) != NULL)
191 		queue->processor(request);
192 }
193 
/*
 * Allocate a request queue, create its two funnel queues, and start its worker thread.
 *
 * @queue_name: the name passed to vdo_create_thread() for the worker
 * @processor: the function the worker calls on each request
 * @queue_ptr: on success, receives the new queue; not written on failure
 *
 * Returns UDS_SUCCESS, or the error code from the step that failed. On failure, whatever was
 * set up so far is released through uds_request_queue_finish().
 */
int uds_make_request_queue(const char *queue_name,
			   uds_request_queue_processor_fn processor,
			   struct uds_request_queue **queue_ptr)
{
	struct uds_request_queue *new_queue;
	int status;

	status = vdo_allocate(1, struct uds_request_queue, __func__, &new_queue);
	if (status != VDO_SUCCESS)
		return status;

	new_queue->processor = processor;
	new_queue->running = true;
	atomic_set(&new_queue->dormant, false);
	init_waitqueue_head(&new_queue->wait_head);

	status = vdo_make_funnel_queue(&new_queue->main_queue);
	if (status != VDO_SUCCESS)
		goto fail;

	status = vdo_make_funnel_queue(&new_queue->retry_queue);
	if (status != VDO_SUCCESS)
		goto fail;

	status = vdo_create_thread(request_queue_worker, new_queue, queue_name,
				   &new_queue->thread);
	if (status != VDO_SUCCESS)
		goto fail;

	/* Only now is there a thread for uds_request_queue_finish() to join. */
	new_queue->started = true;
	*queue_ptr = new_queue;
	return UDS_SUCCESS;

fail:
	uds_request_queue_finish(new_queue);
	return status;
}
233 
wake_up_worker(struct uds_request_queue * queue)234 static inline void wake_up_worker(struct uds_request_queue *queue)
235 {
236 	if (wq_has_sleeper(&queue->wait_head))
237 		wake_up(&queue->wait_head);
238 }
239 
uds_request_queue_enqueue(struct uds_request_queue * queue,struct uds_request * request)240 void uds_request_queue_enqueue(struct uds_request_queue *queue,
241 			       struct uds_request *request)
242 {
243 	struct funnel_queue *sub_queue;
244 	bool unbatched = request->unbatched;
245 
246 	sub_queue = request->requeued ? queue->retry_queue : queue->main_queue;
247 	vdo_funnel_queue_put(sub_queue, &request->queue_link);
248 
249 	/*
250 	 * We must wake the worker thread when it is dormant. A read fence isn't needed here since
251 	 * we know the queue operation acts as one.
252 	 */
253 	if (atomic_read(&queue->dormant) || unbatched)
254 		wake_up_worker(queue);
255 }
256 
uds_request_queue_finish(struct uds_request_queue * queue)257 void uds_request_queue_finish(struct uds_request_queue *queue)
258 {
259 	if (queue == NULL)
260 		return;
261 
262 	/*
263 	 * This memory barrier ensures that any requests we queued will be seen. The point is that
264 	 * when dequeue_request() sees the following update to the running flag, it will also be
265 	 * able to see any change we made to a next field in the funnel queue entry. The
266 	 * corresponding read barrier is in request_queue_worker().
267 	 */
268 	smp_wmb();
269 	WRITE_ONCE(queue->running, false);
270 
271 	if (queue->started) {
272 		wake_up_worker(queue);
273 		vdo_join_threads(queue->thread);
274 	}
275 
276 	vdo_free_funnel_queue(queue->main_queue);
277 	vdo_free_funnel_queue(queue->retry_queue);
278 	vdo_free(queue);
279 }
280