FreeRDP
Loading...
Searching...
No Matches
StreamPool.c
1
20#include <winpr/config.h>
21
22#include <winpr/crt.h>
23#include <winpr/wlog.h>
24
25#include <winpr/collections.h>
26
27#include "../stream.h"
28#include "../log.h"
29#define TAG WINPR_TAG("utils.streampool")
30
31struct s_StreamPoolEntry
32{
33#if defined(WITH_STREAMPOOL_DEBUG)
34 char** msg;
35 size_t lines;
36#endif
37 wStream* s;
38};
39
40struct s_wStreamPool
41{
42 size_t aSize;
43 size_t aCapacity;
44 struct s_StreamPoolEntry* aArray;
45
46 size_t uSize;
47 size_t uCapacity;
48 struct s_StreamPoolEntry* uArray;
49
51 BOOL synchronized;
52 size_t defaultSize;
53};
54
55static void discard_entry(struct s_StreamPoolEntry* entry, BOOL discardStream)
56{
57 if (!entry)
58 return;
59
60#if defined(WITH_STREAMPOOL_DEBUG)
61 free((void*)entry->msg);
62#endif
63
64 if (discardStream && entry->s)
65 Stream_Free(entry->s, entry->s->isAllocatedStream);
66
67 const struct s_StreamPoolEntry empty = WINPR_C_ARRAY_INIT;
68 *entry = empty;
69}
70
71static struct s_StreamPoolEntry add_entry(wStream* s)
72{
73 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
74
75#if defined(WITH_STREAMPOOL_DEBUG)
76 void* stack = winpr_backtrace(20);
77 if (stack)
78 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
79 winpr_backtrace_free(stack);
80#endif
81
82 entry.s = s;
83 return entry;
84}
85
90static inline void StreamPool_Lock(wStreamPool* pool)
91{
92 WINPR_ASSERT(pool);
93 if (pool->synchronized)
94 EnterCriticalSection(&pool->lock);
95}
96
101static inline void StreamPool_Unlock(wStreamPool* pool)
102{
103 WINPR_ASSERT(pool);
104 if (pool->synchronized)
105 LeaveCriticalSection(&pool->lock);
106}
107
108static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL usedOrAvailable)
109{
110 WINPR_ASSERT(pool);
111
112 size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
113 size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
114 struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
115
116 size_t new_cap = 0;
117 if (*cap == 0)
118 new_cap = *size + count;
119 else if (*size + count > *cap)
120 new_cap = (*size + count + 2) / 2 * 3;
121 else if ((*size + count) < *cap / 3)
122 new_cap = *cap / 2;
123
124 if (new_cap > 0)
125 {
126 struct s_StreamPoolEntry* new_arr = nullptr;
127
128 if (*cap < *size + count)
129 *cap += count;
130
131 new_arr =
132 (struct s_StreamPoolEntry*)realloc(*array, sizeof(struct s_StreamPoolEntry) * new_cap);
133 if (!new_arr)
134 return FALSE;
135 *cap = new_cap;
136 *array = new_arr;
137 }
138 return TRUE;
139}
140
145static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index)
146{
147 WINPR_ASSERT(pool);
148
149 const size_t pcount = 1;
150 const size_t off = index + pcount;
151 if (pool->uSize >= off)
152 {
153 for (size_t x = 0; x < pcount; x++)
154 {
155 struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
156 discard_entry(cur, FALSE);
157 }
158 MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
159 (pool->uSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
160 pool->uSize -= pcount;
161 }
162}
163
168static void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
169{
170 StreamPool_EnsureCapacity(pool, 1, TRUE);
171 pool->uArray[pool->uSize] = add_entry(s);
172 pool->uSize++;
173}
174
179static void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
180{
181 WINPR_ASSERT(pool);
182 for (size_t index = 0; index < pool->uSize; index++)
183 {
184 struct s_StreamPoolEntry* cur = &pool->uArray[index];
185 if (cur->s == s)
186 {
187 StreamPool_ShiftUsed(pool, index);
188 break;
189 }
190 }
191}
192
193static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index)
194{
195 WINPR_ASSERT(pool);
196
197 const size_t pcount = 1;
198 const size_t off = index + pcount;
199 if (pool->aSize >= off)
200 {
201 for (size_t x = 0; x < pcount; x++)
202 {
203 struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
204 discard_entry(cur, FALSE);
205 }
206
207 MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
208 (pool->aSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
209 pool->aSize -= pcount;
210 }
211}
212
217wStream* StreamPool_Take(wStreamPool* pool, size_t size)
218{
219 BOOL found = FALSE;
220 size_t foundIndex = 0;
221 wStream* s = nullptr;
222
223 StreamPool_Lock(pool);
224
225 if (size == 0)
226 size = pool->defaultSize;
227
228 for (size_t index = 0; index < pool->aSize; index++)
229 {
230 struct s_StreamPoolEntry* cur = &pool->aArray[index];
231 s = cur->s;
232
233 if (Stream_Capacity(s) >= size)
234 {
235 found = TRUE;
236 foundIndex = index;
237 break;
238 }
239 }
240
241 if (!found)
242 {
243 s = Stream_New(nullptr, size);
244 if (!s)
245 goto out_fail;
246 }
247 else if (s)
248 {
249 Stream_SetPosition(s, 0);
250 Stream_SetLength(s, Stream_Capacity(s));
251 StreamPool_ShiftAvailable(pool, foundIndex);
252 }
253
254 if (s)
255 {
256 s->pool = pool;
257 s->count = 1;
258 StreamPool_AddUsed(pool, s);
259 }
260
261out_fail:
262 StreamPool_Unlock(pool);
263
264 return s;
265}
266
271static void StreamPool_Remove(wStreamPool* pool, wStream* s)
272{
273 StreamPool_EnsureCapacity(pool, 1, FALSE);
274 Stream_EnsureValidity(s);
275 for (size_t x = 0; x < pool->aSize; x++)
276 {
277 wStream* cs = pool->aArray[x].s;
278 if (cs == s)
279 return;
280 }
281 pool->aArray[(pool->aSize)++] = add_entry(s);
282 StreamPool_RemoveUsed(pool, s);
283}
284
285static void StreamPool_ReleaseOrReturn(wStreamPool* pool, wStream* s)
286{
287 StreamPool_Lock(pool);
288 StreamPool_Remove(pool, s);
289 StreamPool_Unlock(pool);
290}
291
292void StreamPool_Return(wStreamPool* pool, wStream* s)
293{
294 WINPR_ASSERT(pool);
295 if (!s)
296 return;
297
298 StreamPool_Lock(pool);
299 StreamPool_Remove(pool, s);
300 StreamPool_Unlock(pool);
301}
302
307void Stream_AddRef(wStream* s)
308{
309 WINPR_ASSERT(s);
310 s->count++;
311}
312
317void Stream_Release(wStream* s)
318{
319 WINPR_ASSERT(s);
320
321 if (s->count > 0)
322 s->count--;
323 if (s->count == 0)
324 {
325 if (s->pool)
326 StreamPool_ReleaseOrReturn(s->pool, s);
327 else
328 Stream_Free(s, TRUE);
329 }
330}
331
336wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr)
337{
338 wStream* s = nullptr;
339
340 StreamPool_Lock(pool);
341
342 for (size_t index = 0; index < pool->uSize; index++)
343 {
344 struct s_StreamPoolEntry* cur = &pool->uArray[index];
345
346 if ((ptr >= Stream_Buffer(cur->s)) &&
347 (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
348 {
349 s = cur->s;
350 break;
351 }
352 }
353
354 StreamPool_Unlock(pool);
355
356 return s;
357}
358
363void StreamPool_Clear(wStreamPool* pool)
364{
365 StreamPool_Lock(pool);
366
367 for (size_t x = 0; x < pool->aSize; x++)
368 {
369 struct s_StreamPoolEntry* cur = &pool->aArray[x];
370 discard_entry(cur, TRUE);
371 }
372 pool->aSize = 0;
373
374 if (pool->uSize > 0)
375 {
376 WLog_WARN(TAG, "Clearing StreamPool, but there are %" PRIuz " streams currently in use",
377 pool->uSize);
378 for (size_t x = 0; x < pool->uSize; x++)
379 {
380 struct s_StreamPoolEntry* cur = &pool->uArray[x];
381 discard_entry(cur, TRUE);
382 }
383 pool->uSize = 0;
384 }
385
386 StreamPool_Unlock(pool);
387}
388
389size_t StreamPool_UsedCount(wStreamPool* pool)
390{
391 StreamPool_Lock(pool);
392 size_t usize = pool->uSize;
393 StreamPool_Unlock(pool);
394 return usize;
395}
396
401wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize)
402{
403 wStreamPool* pool = nullptr;
404
405 pool = (wStreamPool*)calloc(1, sizeof(wStreamPool));
406
407 if (pool)
408 {
409 pool->synchronized = synchronized;
410 pool->defaultSize = defaultSize;
411
412 if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
413 goto fail;
414 if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
415 goto fail;
416
417 if (!InitializeCriticalSectionAndSpinCount(&pool->lock, 4000))
418 goto fail;
419 }
420
421 return pool;
422fail:
423 WINPR_PRAGMA_DIAG_PUSH
424 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
425 StreamPool_Free(pool);
426 WINPR_PRAGMA_DIAG_POP
427 return nullptr;
428}
429
430void StreamPool_Free(wStreamPool* pool)
431{
432 if (pool)
433 {
434 StreamPool_Clear(pool);
435
436 DeleteCriticalSection(&pool->lock);
437
438 free(pool->aArray);
439 free(pool->uArray);
440
441 free(pool);
442 }
443}
444
445char* StreamPool_GetStatistics(wStreamPool* pool, char* buffer, size_t size)
446{
447 WINPR_ASSERT(pool);
448
449 if (!buffer || (size < 1))
450 return nullptr;
451
452 size_t used = 0;
453 int offset = _snprintf(buffer, size - 1,
454 "aSize =%" PRIuz ", uSize =%" PRIuz ", aCapacity=%" PRIuz
455 ", uCapacity=%" PRIuz,
456 pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
457 if ((offset > 0) && ((size_t)offset < size))
458 used += (size_t)offset;
459
460#if defined(WITH_STREAMPOOL_DEBUG)
461 StreamPool_Lock(pool);
462
463 offset = _snprintf(&buffer[used], size - 1 - used, "\n-- dump used array take locations --\n");
464 if ((offset > 0) && ((size_t)offset < size - used))
465 used += (size_t)offset;
466 for (size_t x = 0; x < pool->uSize; x++)
467 {
468 const struct s_StreamPoolEntry* cur = &pool->uArray[x];
469 WINPR_ASSERT(cur->msg || (cur->lines == 0));
470
471 for (size_t y = 0; y < cur->lines; y++)
472 {
473 offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz " | %" PRIuz "]: %s\n", x,
474 y, cur->msg[y]);
475 if ((offset > 0) && ((size_t)offset < size - used))
476 used += (size_t)offset;
477 }
478 }
479
480 offset = _snprintf(&buffer[used], size - 1 - used, "\n-- statistics called from --\n");
481 if ((offset > 0) && ((size_t)offset < size - used))
482 used += (size_t)offset;
483
484 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
485 void* stack = winpr_backtrace(20);
486 if (stack)
487 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
488 winpr_backtrace_free(stack);
489
490 for (size_t x = 0; x < entry.lines; x++)
491 {
492 const char* msg = entry.msg[x];
493 offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz "]: %s\n", x, msg);
494 if ((offset > 0) && ((size_t)offset < size - used))
495 used += (size_t)offset;
496 }
497 free((void*)entry.msg);
498 StreamPool_Unlock(pool);
499#endif
500 buffer[used] = '\0';
501 return buffer;
502}
503
504BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
505{
506 wLog* log = WLog_Get(TAG);
507
508 /* HACK: We disconnected the transport above, now wait without a read or write lock until all
509 * streams in use have been returned to the pool. */
510 while (timeoutMS > 0)
511 {
512 const size_t used = StreamPool_UsedCount(pool);
513 if (used == 0)
514 return TRUE;
515 WLog_Print(log, WLOG_DEBUG, "%" PRIuz " streams still in use, sleeping...", used);
516
517 char buffer[4096] = WINPR_C_ARRAY_INIT;
518 StreamPool_GetStatistics(pool, buffer, sizeof(buffer));
519 WLog_Print(log, WLOG_TRACE, "Pool statistics: %s", buffer);
520
521 UINT32 diff = 10;
522 if (timeoutMS != INFINITE)
523 {
524 diff = timeoutMS > 10 ? 10 : timeoutMS;
525 timeoutMS -= diff;
526 }
527 Sleep(diff);
528 }
529
530 return FALSE;
531}