FreeRDP
Loading...
Searching...
No Matches
MessageQueue.c
1
20#include <winpr/config.h>
21
22#include <winpr/crt.h>
23#include <winpr/sysinfo.h>
24#include <winpr/assert.h>
25
26#include <winpr/collections.h>
27
28struct s_wMessageQueue
29{
30 size_t head;
31 size_t tail;
32 size_t size;
33 size_t capacity;
34 BOOL closed;
35 wMessage* array;
37 HANDLE event;
38
39 wObject object;
40};
41
51wObject* MessageQueue_Object(wMessageQueue* queue)
52{
53 WINPR_ASSERT(queue);
54 return &queue->object;
55}
56
61HANDLE MessageQueue_Event(wMessageQueue* queue)
62{
63 WINPR_ASSERT(queue);
64 return queue->event;
65}
66
71size_t MessageQueue_Size(wMessageQueue* queue)
72{
73 WINPR_ASSERT(queue);
74 EnterCriticalSection(&queue->lock);
75 const size_t ret = queue->size;
76 LeaveCriticalSection(&queue->lock);
77 return ret;
78}
79
84BOOL MessageQueue_Wait(wMessageQueue* queue)
85{
86 BOOL status = FALSE;
87
88 WINPR_ASSERT(queue);
89 if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0)
90 status = TRUE;
91
92 return status;
93}
94
95static BOOL MessageQueue_EnsureCapacity(wMessageQueue* queue, size_t count)
96{
97 WINPR_ASSERT(queue);
98
99 if (queue->size + count >= queue->capacity)
100 {
101 wMessage* new_arr = NULL;
102 size_t old_capacity = queue->capacity;
103 size_t new_capacity = queue->capacity * 2;
104
105 if (new_capacity < queue->size + count)
106 new_capacity = queue->size + count;
107
108 new_arr = (wMessage*)realloc(queue->array, sizeof(wMessage) * new_capacity);
109 if (!new_arr)
110 return FALSE;
111 queue->array = new_arr;
112 queue->capacity = new_capacity;
113 ZeroMemory(&(queue->array[old_capacity]), (new_capacity - old_capacity) * sizeof(wMessage));
114
115 /* rearrange wrapped entries */
116 if (queue->tail <= queue->head)
117 {
118 CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage));
119 queue->tail += old_capacity;
120 }
121 }
122
123 return TRUE;
124}
125
126BOOL MessageQueue_Dispatch(wMessageQueue* queue, const wMessage* message)
127{
128 wMessage* dst = NULL;
129 BOOL ret = FALSE;
130 WINPR_ASSERT(queue);
131
132 if (!message)
133 return FALSE;
134
135 WINPR_ASSERT(queue);
136 EnterCriticalSection(&queue->lock);
137
138 if (queue->closed)
139 goto out;
140
141 if (!MessageQueue_EnsureCapacity(queue, 1))
142 goto out;
143
144 dst = &(queue->array[queue->tail]);
145 *dst = *message;
146 dst->time = GetTickCount64();
147
148 queue->tail = (queue->tail + 1) % queue->capacity;
149 queue->size++;
150
151 if (queue->size > 0)
152 (void)SetEvent(queue->event);
153
154 if (message->id == WMQ_QUIT)
155 queue->closed = TRUE;
156
157 ret = TRUE;
158out:
159 LeaveCriticalSection(&queue->lock);
160 return ret;
161}
162
163BOOL MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam)
164{
165 wMessage message = { 0 };
166
167 message.context = context;
168 message.id = type;
169 message.wParam = wParam;
170 message.lParam = lParam;
171 message.Free = NULL;
172
173 return MessageQueue_Dispatch(queue, &message);
174}
175
176BOOL MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode)
177{
178 return MessageQueue_Post(queue, NULL, WMQ_QUIT, (void*)(size_t)nExitCode, NULL);
179}
180
181int MessageQueue_Get(wMessageQueue* queue, wMessage* message)
182{
183 int status = -1;
184
185 if (!MessageQueue_Wait(queue))
186 return status;
187
188 EnterCriticalSection(&queue->lock);
189
190 if (queue->size > 0)
191 {
192 CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
193 ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
194 queue->head = (queue->head + 1) % queue->capacity;
195 queue->size--;
196
197 if (queue->size < 1)
198 (void)ResetEvent(queue->event);
199
200 status = (message->id != WMQ_QUIT) ? 1 : 0;
201 }
202
203 LeaveCriticalSection(&queue->lock);
204
205 return status;
206}
207
208int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove)
209{
210 int status = 0;
211
212 WINPR_ASSERT(queue);
213 EnterCriticalSection(&queue->lock);
214
215 if (queue->size > 0)
216 {
217 CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
218 status = 1;
219
220 if (remove)
221 {
222 ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
223 queue->head = (queue->head + 1) % queue->capacity;
224 queue->size--;
225
226 if (queue->size < 1)
227 (void)ResetEvent(queue->event);
228 }
229 }
230
231 LeaveCriticalSection(&queue->lock);
232
233 return status;
234}
235
240wMessageQueue* MessageQueue_New(const wObject* callback)
241{
242 wMessageQueue* queue = NULL;
243
244 queue = (wMessageQueue*)calloc(1, sizeof(wMessageQueue));
245 if (!queue)
246 return NULL;
247
248 if (!InitializeCriticalSectionAndSpinCount(&queue->lock, 4000))
249 goto fail;
250
251 if (!MessageQueue_EnsureCapacity(queue, 32))
252 goto fail;
253
254 queue->event = CreateEvent(NULL, TRUE, FALSE, NULL);
255 if (!queue->event)
256 goto fail;
257
258 if (callback)
259 queue->object = *callback;
260
261 return queue;
262
263fail:
264 WINPR_PRAGMA_DIAG_PUSH
265 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
266 MessageQueue_Free(queue);
267 WINPR_PRAGMA_DIAG_POP
268 return NULL;
269}
270
271void MessageQueue_Free(wMessageQueue* queue)
272{
273 if (!queue)
274 return;
275
276 if (queue->event)
277 MessageQueue_Clear(queue);
278
279 (void)CloseHandle(queue->event);
280 DeleteCriticalSection(&queue->lock);
281
282 free(queue->array);
283 free(queue);
284}
285
286int MessageQueue_Clear(wMessageQueue* queue)
287{
288 int status = 0;
289
290 WINPR_ASSERT(queue);
291 WINPR_ASSERT(queue->event);
292
293 EnterCriticalSection(&queue->lock);
294
295 while (queue->size > 0)
296 {
297 wMessage* msg = &(queue->array[queue->head]);
298
299 /* Free resources of message. */
300 if (queue->object.fnObjectUninit)
301 queue->object.fnObjectUninit(msg);
302 if (queue->object.fnObjectFree)
303 queue->object.fnObjectFree(msg);
304
305 ZeroMemory(msg, sizeof(wMessage));
306
307 queue->head = (queue->head + 1) % queue->capacity;
308 queue->size--;
309 }
310 (void)ResetEvent(queue->event);
311 queue->closed = FALSE;
312
313 LeaveCriticalSection(&queue->lock);
314
315 return status;
316}
This struct contains function pointers to initialize/free objects.
Definition collections.h:57