FreeRDP
Loading...
Searching...
No Matches
apc.c
1
19#ifndef _WIN32
20
21#include "apc.h"
22#include "thread.h"
23#include "../log.h"
24#include "../synch/pollset.h"
25#include <winpr/assert.h>
26
27#define TAG WINPR_TAG("apc")
28
29BOOL apc_init(APC_QUEUE* apc)
30{
31 pthread_mutexattr_t attr;
32 BOOL ret = FALSE;
33
34 WINPR_ASSERT(apc);
35
36 pthread_mutexattr_init(&attr);
37 if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) != 0)
38 {
39 WLog_ERR(TAG, "failed to initialize mutex attributes to recursive");
40 return FALSE;
41 }
42
43 memset(apc, 0, sizeof(*apc));
44
45 if (pthread_mutex_init(&apc->mutex, &attr) != 0)
46 {
47 WLog_ERR(TAG, "failed to initialize main thread APC mutex");
48 goto out;
49 }
50
51 ret = TRUE;
52out:
53 pthread_mutexattr_destroy(&attr);
54 return ret;
55}
56
57BOOL apc_uninit(APC_QUEUE* apc)
58{
59 WINPR_ASSERT(apc);
60 return pthread_mutex_destroy(&apc->mutex) == 0;
61}
62
63void apc_register(WINPR_THREAD* thread, WINPR_APC_ITEM* addItem)
64{
65 WINPR_APC_ITEM** nextp = NULL;
66 APC_QUEUE* apc = NULL;
67
68 WINPR_ASSERT(thread);
69 WINPR_ASSERT(addItem);
70
71 apc = &thread->apc;
72 WINPR_ASSERT(apc);
73
74 pthread_mutex_lock(&apc->mutex);
75 if (apc->tail)
76 {
77 nextp = &apc->tail->next;
78 addItem->last = apc->tail;
79 }
80 else
81 {
82 nextp = &apc->head;
83 }
84
85 *nextp = addItem;
86 apc->tail = addItem;
87 apc->length++;
88
89 addItem->markedForRemove = FALSE;
90 addItem->boundThread = GetCurrentThreadId();
91 addItem->linked = TRUE;
92 pthread_mutex_unlock(&apc->mutex);
93}
94
95static INLINE void apc_item_remove(APC_QUEUE* apc, WINPR_APC_ITEM* item)
96{
97 WINPR_ASSERT(apc);
98 WINPR_ASSERT(item);
99
100 if (!item->last)
101 apc->head = item->next;
102 else
103 item->last->next = item->next;
104
105 if (!item->next)
106 apc->tail = item->last;
107 else
108 item->next->last = item->last;
109
110 apc->length--;
111}
112
113APC_REMOVE_RESULT apc_remove(WINPR_APC_ITEM* item)
114{
115 WINPR_THREAD* thread = winpr_GetCurrentThread();
116 APC_QUEUE* apc = NULL;
117 APC_REMOVE_RESULT ret = APC_REMOVE_OK;
118
119 WINPR_ASSERT(item);
120
121 if (!item->linked)
122 return APC_REMOVE_OK;
123
124 if (item->boundThread != GetCurrentThreadId())
125 {
126 WLog_ERR(TAG, "removing an APC entry should be done in the creating thread");
127 return APC_REMOVE_ERROR;
128 }
129
130 if (!thread)
131 {
132 WLog_ERR(TAG, "unable to retrieve current thread");
133 return APC_REMOVE_ERROR;
134 }
135
136 apc = &thread->apc;
137 WINPR_ASSERT(apc);
138
139 pthread_mutex_lock(&apc->mutex);
140 if (apc->treatingCompletions)
141 {
142 item->markedForRemove = TRUE;
143 ret = APC_REMOVE_DELAY_FREE;
144 goto out;
145 }
146
147 apc_item_remove(apc, item);
148
149out:
150 pthread_mutex_unlock(&apc->mutex);
151 item->boundThread = 0xFFFFFFFF;
152 item->linked = FALSE;
153 return ret;
154}
155
156BOOL apc_collectFds(WINPR_THREAD* thread, WINPR_POLL_SET* set, BOOL* haveAutoSignaled)
157{
158 WINPR_APC_ITEM* item = NULL;
159 BOOL ret = FALSE;
160 APC_QUEUE* apc = NULL;
161
162 WINPR_ASSERT(thread);
163 WINPR_ASSERT(haveAutoSignaled);
164
165 apc = &thread->apc;
166 WINPR_ASSERT(apc);
167
168 *haveAutoSignaled = FALSE;
169 pthread_mutex_lock(&apc->mutex);
170 item = apc->head;
171 for (; item; item = item->next)
172 {
173 if (item->alwaysSignaled)
174 {
175 *haveAutoSignaled = TRUE;
176 }
177 else if (!pollset_add(set, item->pollFd, item->pollMode))
178 goto out;
179 }
180
181 ret = TRUE;
182out:
183 pthread_mutex_unlock(&apc->mutex);
184 return ret;
185}
186
187int apc_executeCompletions(WINPR_THREAD* thread, WINPR_POLL_SET* set, size_t startIndex)
188{
189 APC_QUEUE* apc = NULL;
190 WINPR_APC_ITEM* nextItem = NULL;
191 size_t idx = startIndex;
192 int ret = 0;
193
194 WINPR_ASSERT(thread);
195
196 apc = &thread->apc;
197 WINPR_ASSERT(apc);
198
199 pthread_mutex_lock(&apc->mutex);
200 apc->treatingCompletions = TRUE;
201
202 /* first pass to compute signaled items */
203 for (WINPR_APC_ITEM* item = apc->head; item; item = item->next)
204 {
205 item->isSignaled = item->alwaysSignaled || pollset_isSignaled(set, idx);
206 if (!item->alwaysSignaled)
207 idx++;
208 }
209
210 /* second pass: run completions */
211 for (WINPR_APC_ITEM* item = apc->head; item; item = nextItem)
212 {
213 if (item->isSignaled)
214 {
215 if (item->completion && !item->markedForRemove)
216 item->completion(item->completionArgs);
217 ret++;
218 }
219
220 nextItem = item->next;
221 }
222
223 /* third pass: to do final cleanup */
224 for (WINPR_APC_ITEM* item = apc->head; item; item = nextItem)
225 {
226 nextItem = item->next;
227
228 if (item->markedForRemove)
229 {
230 apc_item_remove(apc, item);
231 if (item->markedForFree)
232 free(item);
233 }
234 }
235
236 apc->treatingCompletions = FALSE;
237 pthread_mutex_unlock(&apc->mutex);
238
239 return ret;
240}
241
242void apc_cleanupThread(WINPR_THREAD* thread)
243{
244 WINPR_APC_ITEM* item = NULL;
245 WINPR_APC_ITEM* nextItem = NULL;
246 APC_QUEUE* apc = NULL;
247
248 WINPR_ASSERT(thread);
249
250 apc = &thread->apc;
251 WINPR_ASSERT(apc);
252
253 pthread_mutex_lock(&apc->mutex);
254 item = apc->head;
255 for (; item; item = nextItem)
256 {
257 nextItem = item->next;
258
259 if (item->type == APC_TYPE_HANDLE_FREE)
260 item->completion(item->completionArgs);
261
262 item->last = item->next = NULL;
263 item->linked = FALSE;
264 if (item->markedForFree)
265 free(item);
266 }
267
268 apc->head = apc->tail = NULL;
269 pthread_mutex_unlock(&apc->mutex);
270}
271
272#endif