20#include <winpr/config.h>
23#include <winpr/wlog.h>
25#include <winpr/collections.h>
29#define TAG WINPR_TAG("utils.streampool")
31struct s_StreamPoolEntry
33#if defined(WITH_STREAMPOOL_DEBUG)
44 struct s_StreamPoolEntry* aArray;
48 struct s_StreamPoolEntry* uArray;
55static void discard_entry(
struct s_StreamPoolEntry* entry, BOOL discardStream)
60#if defined(WITH_STREAMPOOL_DEBUG)
61 free((
void*)entry->msg);
64 if (discardStream && entry->s)
65 Stream_Free(entry->s, entry->s->isAllocatedStream);
67 const struct s_StreamPoolEntry empty = WINPR_C_ARRAY_INIT;
71static struct s_StreamPoolEntry add_entry(
wStream* s)
73 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
75#if defined(WITH_STREAMPOOL_DEBUG)
76 void* stack = winpr_backtrace(20);
78 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
79 winpr_backtrace_free(stack);
90static inline void StreamPool_Lock(wStreamPool* pool)
93 if (pool->synchronized)
94 EnterCriticalSection(&pool->lock);
101static inline void StreamPool_Unlock(wStreamPool* pool)
104 if (pool->synchronized)
105 LeaveCriticalSection(&pool->lock);
108static BOOL StreamPool_EnsureCapacity(wStreamPool* pool,
size_t count, BOOL usedOrAvailable)
112 size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
113 size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
114 struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
118 new_cap = *size + count;
119 else if (*size + count > *cap)
120 new_cap = (*size + count + 2) / 2 * 3;
121 else if ((*size + count) < *cap / 3)
126 struct s_StreamPoolEntry* new_arr =
nullptr;
128 if (*cap < *size + count)
132 (
struct s_StreamPoolEntry*)realloc(*array,
sizeof(
struct s_StreamPoolEntry) * new_cap);
145static void StreamPool_ShiftUsed(wStreamPool* pool,
size_t index)
149 const size_t pcount = 1;
150 const size_t off = index + pcount;
151 if (pool->uSize >= off)
153 for (
size_t x = 0; x < pcount; x++)
155 struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
156 discard_entry(cur, FALSE);
158 MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
159 (pool->uSize - index - pcount) *
sizeof(
struct s_StreamPoolEntry));
160 pool->uSize -= pcount;
168static void StreamPool_AddUsed(wStreamPool* pool,
wStream* s)
170 StreamPool_EnsureCapacity(pool, 1, TRUE);
171 pool->uArray[pool->uSize] = add_entry(s);
179static void StreamPool_RemoveUsed(wStreamPool* pool,
wStream* s)
182 for (
size_t index = 0; index < pool->uSize; index++)
184 struct s_StreamPoolEntry* cur = &pool->uArray[index];
187 StreamPool_ShiftUsed(pool, index);
193static void StreamPool_ShiftAvailable(wStreamPool* pool,
size_t index)
197 const size_t pcount = 1;
198 const size_t off = index + pcount;
199 if (pool->aSize >= off)
201 for (
size_t x = 0; x < pcount; x++)
203 struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
204 discard_entry(cur, FALSE);
207 MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
208 (pool->aSize - index - pcount) *
sizeof(
struct s_StreamPoolEntry));
209 pool->aSize -= pcount;
217wStream* StreamPool_Take(wStreamPool* pool,
size_t size)
220 size_t foundIndex = 0;
223 StreamPool_Lock(pool);
226 size = pool->defaultSize;
228 for (
size_t index = 0; index < pool->aSize; index++)
230 struct s_StreamPoolEntry* cur = &pool->aArray[index];
233 if (Stream_Capacity(s) >= size)
243 s = Stream_New(
nullptr, size);
249 Stream_SetPosition(s, 0);
250 Stream_SetLength(s, Stream_Capacity(s));
251 StreamPool_ShiftAvailable(pool, foundIndex);
258 StreamPool_AddUsed(pool, s);
262 StreamPool_Unlock(pool);
271static void StreamPool_Remove(wStreamPool* pool,
wStream* s)
273 StreamPool_EnsureCapacity(pool, 1, FALSE);
274 Stream_EnsureValidity(s);
275 for (
size_t x = 0; x < pool->aSize; x++)
277 wStream* cs = pool->aArray[x].s;
281 pool->aArray[(pool->aSize)++] = add_entry(s);
282 StreamPool_RemoveUsed(pool, s);
285static void StreamPool_ReleaseOrReturn(wStreamPool* pool,
wStream* s)
287 StreamPool_Lock(pool);
288 StreamPool_Remove(pool, s);
289 StreamPool_Unlock(pool);
292void StreamPool_Return(wStreamPool* pool,
wStream* s)
298 StreamPool_Lock(pool);
299 StreamPool_Remove(pool, s);
300 StreamPool_Unlock(pool);
326 StreamPool_ReleaseOrReturn(s->pool, s);
328 Stream_Free(s, TRUE);
336wStream* StreamPool_Find(wStreamPool* pool,
const BYTE* ptr)
340 StreamPool_Lock(pool);
342 for (
size_t index = 0; index < pool->uSize; index++)
344 struct s_StreamPoolEntry* cur = &pool->uArray[index];
346 if ((ptr >= Stream_Buffer(cur->s)) &&
347 (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
354 StreamPool_Unlock(pool);
363void StreamPool_Clear(wStreamPool* pool)
365 StreamPool_Lock(pool);
367 for (
size_t x = 0; x < pool->aSize; x++)
369 struct s_StreamPoolEntry* cur = &pool->aArray[x];
370 discard_entry(cur, TRUE);
376 WLog_WARN(TAG,
"Clearing StreamPool, but there are %" PRIuz
" streams currently in use",
378 for (
size_t x = 0; x < pool->uSize; x++)
380 struct s_StreamPoolEntry* cur = &pool->uArray[x];
381 discard_entry(cur, TRUE);
386 StreamPool_Unlock(pool);
389size_t StreamPool_UsedCount(wStreamPool* pool)
391 StreamPool_Lock(pool);
392 size_t usize = pool->uSize;
393 StreamPool_Unlock(pool);
401wStreamPool* StreamPool_New(BOOL
synchronized,
size_t defaultSize)
403 wStreamPool* pool =
nullptr;
405 pool = (wStreamPool*)calloc(1,
sizeof(wStreamPool));
409 pool->synchronized =
synchronized;
410 pool->defaultSize = defaultSize;
412 if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
414 if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
417 if (!InitializeCriticalSectionAndSpinCount(&pool->lock, 4000))
423 WINPR_PRAGMA_DIAG_PUSH
424 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
425 StreamPool_Free(pool);
426 WINPR_PRAGMA_DIAG_POP
430void StreamPool_Free(wStreamPool* pool)
434 StreamPool_Clear(pool);
436 DeleteCriticalSection(&pool->lock);
445char* StreamPool_GetStatistics(wStreamPool* pool,
char* buffer,
size_t size)
449 if (!buffer || (size < 1))
453 int offset = _snprintf(buffer, size - 1,
454 "aSize =%" PRIuz
", uSize =%" PRIuz
", aCapacity=%" PRIuz
455 ", uCapacity=%" PRIuz,
456 pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
457 if ((offset > 0) && ((
size_t)offset < size))
458 used += (size_t)offset;
460#if defined(WITH_STREAMPOOL_DEBUG)
461 StreamPool_Lock(pool);
463 offset = _snprintf(&buffer[used], size - 1 - used,
"\n-- dump used array take locations --\n");
464 if ((offset > 0) && ((
size_t)offset < size - used))
465 used += (size_t)offset;
466 for (
size_t x = 0; x < pool->uSize; x++)
468 const struct s_StreamPoolEntry* cur = &pool->uArray[x];
469 WINPR_ASSERT(cur->msg || (cur->lines == 0));
471 for (
size_t y = 0; y < cur->lines; y++)
473 offset = _snprintf(&buffer[used], size - 1 - used,
"[%" PRIuz
" | %" PRIuz
"]: %s\n", x,
475 if ((offset > 0) && ((
size_t)offset < size - used))
476 used += (size_t)offset;
480 offset = _snprintf(&buffer[used], size - 1 - used,
"\n-- statistics called from --\n");
481 if ((offset > 0) && ((
size_t)offset < size - used))
482 used += (size_t)offset;
484 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
485 void* stack = winpr_backtrace(20);
487 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
488 winpr_backtrace_free(stack);
490 for (
size_t x = 0; x < entry.lines; x++)
492 const char* msg = entry.msg[x];
493 offset = _snprintf(&buffer[used], size - 1 - used,
"[%" PRIuz
"]: %s\n", x, msg);
494 if ((offset > 0) && ((
size_t)offset < size - used))
495 used += (size_t)offset;
497 free((
void*)entry.msg);
498 StreamPool_Unlock(pool);
504BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
506 wLog* log = WLog_Get(TAG);
510 while (timeoutMS > 0)
512 const size_t used = StreamPool_UsedCount(pool);
515 WLog_Print(log, WLOG_DEBUG,
"%" PRIuz
" streams still in use, sleeping...", used);
517 char buffer[4096] = WINPR_C_ARRAY_INIT;
518 StreamPool_GetStatistics(pool, buffer,
sizeof(buffer));
519 WLog_Print(log, WLOG_TRACE,
"Pool statistics: %s", buffer);
522 if (timeoutMS != INFINITE)
524 diff = timeoutMS > 10 ? 10 : timeoutMS;