FreeRDP
Loading...
Searching...
No Matches
StreamPool.c
1
20#include <winpr/config.h>
21
22#include <winpr/crt.h>
23#include <winpr/wlog.h>
24
25#include <winpr/collections.h>
26
27#include "../stream.h"
28#include "../log.h"
29#define TAG WINPR_TAG("utils.streampool")
30
31struct s_StreamPoolEntry
32{
33#if defined(WITH_STREAMPOOL_DEBUG)
34 char** msg;
35 size_t lines;
36#endif
37 wStream* s;
38};
39
40struct s_wStreamPool
41{
42 size_t aSize;
43 size_t aCapacity;
44 struct s_StreamPoolEntry* aArray;
45
46 size_t uSize;
47 size_t uCapacity;
48 struct s_StreamPoolEntry* uArray;
49
51 BOOL synchronized;
52 size_t defaultSize;
53};
54
55static void discard_entry(struct s_StreamPoolEntry* entry, BOOL discardStream)
56{
57 if (!entry)
58 return;
59
60#if defined(WITH_STREAMPOOL_DEBUG)
61 free((void*)entry->msg);
62#endif
63
64 if (discardStream && entry->s)
65 Stream_Free(entry->s, entry->s->isAllocatedStream);
66
67 const struct s_StreamPoolEntry empty = { 0 };
68 *entry = empty;
69}
70
71static struct s_StreamPoolEntry add_entry(wStream* s)
72{
73 struct s_StreamPoolEntry entry = { 0 };
74
75#if defined(WITH_STREAMPOOL_DEBUG)
76 void* stack = winpr_backtrace(20);
77 if (stack)
78 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
79 winpr_backtrace_free(stack);
80#endif
81
82 entry.s = s;
83 return entry;
84}
85
90static INLINE void StreamPool_Lock(wStreamPool* pool)
91{
92 WINPR_ASSERT(pool);
93 if (pool->synchronized)
94 EnterCriticalSection(&pool->lock);
95}
96
101static INLINE void StreamPool_Unlock(wStreamPool* pool)
102{
103 WINPR_ASSERT(pool);
104 if (pool->synchronized)
105 LeaveCriticalSection(&pool->lock);
106}
107
108static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL usedOrAvailable)
109{
110 WINPR_ASSERT(pool);
111
112 size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
113 size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
114 struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
115
116 size_t new_cap = 0;
117 if (*cap == 0)
118 new_cap = *size + count;
119 else if (*size + count > *cap)
120 new_cap = (*size + count + 2) / 2 * 3;
121 else if ((*size + count) < *cap / 3)
122 new_cap = *cap / 2;
123
124 if (new_cap > 0)
125 {
126 struct s_StreamPoolEntry* new_arr = NULL;
127
128 if (*cap < *size + count)
129 *cap += count;
130
131 new_arr =
132 (struct s_StreamPoolEntry*)realloc(*array, sizeof(struct s_StreamPoolEntry) * new_cap);
133 if (!new_arr)
134 return FALSE;
135 *cap = new_cap;
136 *array = new_arr;
137 }
138 return TRUE;
139}
140
145static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index)
146{
147 WINPR_ASSERT(pool);
148
149 const size_t pcount = 1;
150 const size_t off = index + pcount;
151 if (pool->uSize >= off)
152 {
153 for (size_t x = 0; x < pcount; x++)
154 {
155 struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
156 discard_entry(cur, FALSE);
157 }
158 MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
159 (pool->uSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
160 pool->uSize -= pcount;
161 }
162}
163
168static void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
169{
170 StreamPool_EnsureCapacity(pool, 1, TRUE);
171 pool->uArray[pool->uSize] = add_entry(s);
172 pool->uSize++;
173}
174
179static void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
180{
181 WINPR_ASSERT(pool);
182 for (size_t index = 0; index < pool->uSize; index++)
183 {
184 struct s_StreamPoolEntry* cur = &pool->uArray[index];
185 if (cur->s == s)
186 {
187 StreamPool_ShiftUsed(pool, index);
188 break;
189 }
190 }
191}
192
193static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index)
194{
195 WINPR_ASSERT(pool);
196
197 const size_t pcount = 1;
198 const size_t off = index + pcount;
199 if (pool->aSize >= off)
200 {
201 for (size_t x = 0; x < pcount; x++)
202 {
203 struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
204 discard_entry(cur, FALSE);
205 }
206
207 MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
208 (pool->aSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
209 pool->aSize -= pcount;
210 }
211}
212
217wStream* StreamPool_Take(wStreamPool* pool, size_t size)
218{
219 BOOL found = FALSE;
220 size_t foundIndex = 0;
221 wStream* s = NULL;
222
223 StreamPool_Lock(pool);
224
225 if (size == 0)
226 size = pool->defaultSize;
227
228 for (size_t index = 0; index < pool->aSize; index++)
229 {
230 struct s_StreamPoolEntry* cur = &pool->aArray[index];
231 s = cur->s;
232
233 if (Stream_Capacity(s) >= size)
234 {
235 found = TRUE;
236 foundIndex = index;
237 break;
238 }
239 }
240
241 if (!found)
242 {
243 s = Stream_New(NULL, size);
244 if (!s)
245 goto out_fail;
246 }
247 else if (s)
248 {
249 Stream_SetPosition(s, 0);
250 Stream_SetLength(s, Stream_Capacity(s));
251 StreamPool_ShiftAvailable(pool, foundIndex);
252 }
253
254 if (s)
255 {
256 s->pool = pool;
257 s->count = 1;
258 StreamPool_AddUsed(pool, s);
259 }
260
261out_fail:
262 StreamPool_Unlock(pool);
263
264 return s;
265}
266
271static void StreamPool_Remove(wStreamPool* pool, wStream* s)
272{
273 StreamPool_EnsureCapacity(pool, 1, FALSE);
274 Stream_EnsureValidity(s);
275 for (size_t x = 0; x < pool->aSize; x++)
276 {
277 wStream* cs = pool->aArray[x].s;
278 if (cs == s)
279 return;
280 }
281 pool->aArray[(pool->aSize)++] = add_entry(s);
282 StreamPool_RemoveUsed(pool, s);
283}
284
285static void StreamPool_ReleaseOrReturn(wStreamPool* pool, wStream* s)
286{
287 StreamPool_Lock(pool);
288 StreamPool_Remove(pool, s);
289 StreamPool_Unlock(pool);
290}
291
292void StreamPool_Return(wStreamPool* pool, wStream* s)
293{
294 WINPR_ASSERT(pool);
295 if (!s)
296 return;
297
298 StreamPool_Lock(pool);
299 StreamPool_Remove(pool, s);
300 StreamPool_Unlock(pool);
301}
302
307void Stream_AddRef(wStream* s)
308{
309 WINPR_ASSERT(s);
310 s->count++;
311}
312
317void Stream_Release(wStream* s)
318{
319 WINPR_ASSERT(s);
320
321 if (s->count > 0)
322 s->count--;
323 if (s->count == 0)
324 {
325 if (s->pool)
326 StreamPool_ReleaseOrReturn(s->pool, s);
327 else
328 Stream_Free(s, TRUE);
329 }
330}
331
336wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr)
337{
338 wStream* s = NULL;
339
340 StreamPool_Lock(pool);
341
342 for (size_t index = 0; index < pool->uSize; index++)
343 {
344 struct s_StreamPoolEntry* cur = &pool->uArray[index];
345
346 if ((ptr >= Stream_Buffer(cur->s)) &&
347 (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
348 {
349 s = cur->s;
350 break;
351 }
352 }
353
354 StreamPool_Unlock(pool);
355
356 return s;
357}
358
363void StreamPool_Clear(wStreamPool* pool)
364{
365 StreamPool_Lock(pool);
366
367 for (size_t x = 0; x < pool->aSize; x++)
368 {
369 struct s_StreamPoolEntry* cur = &pool->aArray[x];
370 discard_entry(cur, TRUE);
371 }
372
373 if (pool->uSize > 0)
374 {
375 WLog_WARN(TAG, "Clearing StreamPool, but there are %" PRIuz " streams currently in use",
376 pool->uSize);
377 for (size_t x = 0; x < pool->uSize; x++)
378 {
379 struct s_StreamPoolEntry* cur = &pool->uArray[x];
380 discard_entry(cur, TRUE);
381 }
382 }
383
384 StreamPool_Unlock(pool);
385}
386
387size_t StreamPool_UsedCount(wStreamPool* pool)
388{
389 StreamPool_Lock(pool);
390 size_t usize = pool->uSize;
391 StreamPool_Unlock(pool);
392 return usize;
393}
394
399wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize)
400{
401 wStreamPool* pool = NULL;
402
403 pool = (wStreamPool*)calloc(1, sizeof(wStreamPool));
404
405 if (pool)
406 {
407 pool->synchronized = synchronized;
408 pool->defaultSize = defaultSize;
409
410 if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
411 goto fail;
412 if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
413 goto fail;
414
415 InitializeCriticalSectionAndSpinCount(&pool->lock, 4000);
416 }
417
418 return pool;
419fail:
420 WINPR_PRAGMA_DIAG_PUSH
421 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
422 StreamPool_Free(pool);
423 WINPR_PRAGMA_DIAG_POP
424 return NULL;
425}
426
427void StreamPool_Free(wStreamPool* pool)
428{
429 if (pool)
430 {
431 StreamPool_Clear(pool);
432
433 DeleteCriticalSection(&pool->lock);
434
435 free(pool->aArray);
436 free(pool->uArray);
437
438 free(pool);
439 }
440}
441
442char* StreamPool_GetStatistics(wStreamPool* pool, char* buffer, size_t size)
443{
444 WINPR_ASSERT(pool);
445
446 if (!buffer || (size < 1))
447 return NULL;
448
449 size_t used = 0;
450 int offset = _snprintf(buffer, size - 1,
451 "aSize =%" PRIuz ", uSize =%" PRIuz ", aCapacity=%" PRIuz
452 ", uCapacity=%" PRIuz,
453 pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
454 if ((offset > 0) && ((size_t)offset < size))
455 used += (size_t)offset;
456
457#if defined(WITH_STREAMPOOL_DEBUG)
458 StreamPool_Lock(pool);
459
460 offset = _snprintf(&buffer[used], size - 1 - used, "\n-- dump used array take locations --\n");
461 if ((offset > 0) && ((size_t)offset < size - used))
462 used += (size_t)offset;
463 for (size_t x = 0; x < pool->uSize; x++)
464 {
465 const struct s_StreamPoolEntry* cur = &pool->uArray[x];
466 WINPR_ASSERT(cur->msg || (cur->lines == 0));
467
468 for (size_t y = 0; y < cur->lines; y++)
469 {
470 offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz " | %" PRIuz "]: %s\n", x,
471 y, cur->msg[y]);
472 if ((offset > 0) && ((size_t)offset < size - used))
473 used += (size_t)offset;
474 }
475 }
476
477 offset = _snprintf(&buffer[used], size - 1 - used, "\n-- statistics called from --\n");
478 if ((offset > 0) && ((size_t)offset < size - used))
479 used += (size_t)offset;
480
481 struct s_StreamPoolEntry entry = { 0 };
482 void* stack = winpr_backtrace(20);
483 if (stack)
484 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
485 winpr_backtrace_free(stack);
486
487 for (size_t x = 0; x < entry.lines; x++)
488 {
489 const char* msg = entry.msg[x];
490 offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz "]: %s\n", x, msg);
491 if ((offset > 0) && ((size_t)offset < size - used))
492 used += (size_t)offset;
493 }
494 free((void*)entry.msg);
495 StreamPool_Unlock(pool);
496#endif
497 buffer[used] = '\0';
498 return buffer;
499}
500
501BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
502{
503 wLog* log = WLog_Get(TAG);
504
505 /* HACK: We disconnected the transport above, now wait without a read or write lock until all
506 * streams in use have been returned to the pool. */
507 while (timeoutMS > 0)
508 {
509 const size_t used = StreamPool_UsedCount(pool);
510 if (used == 0)
511 return TRUE;
512 WLog_Print(log, WLOG_DEBUG, "%" PRIuz " streams still in use, sleeping...", used);
513
514 char buffer[4096] = { 0 };
515 StreamPool_GetStatistics(pool, buffer, sizeof(buffer));
516 WLog_Print(log, WLOG_TRACE, "Pool statistics: %s", buffer);
517
518 UINT32 diff = 10;
519 if (timeoutMS != INFINITE)
520 {
521 diff = timeoutMS > 10 ? 10 : timeoutMS;
522 timeoutMS -= diff;
523 }
524 Sleep(diff);
525 }
526
527 return FALSE;
528}