Signed-off-by: Torge Matthies <tmatthies@codeweavers.com> --- v4 -> v5: Merged in the implementation of StructuredTaskCollection_ctor and StructuredTaskCollection_dtor. Added the removal of test todos.
dlls/msvcr120/tests/msvcr120.c | 4 +- dlls/msvcrt/concurrency.c | 161 +++++++++++++++++++++++++++++++-- 2 files changed, 154 insertions(+), 11 deletions(-)
diff --git a/dlls/msvcr120/tests/msvcr120.c b/dlls/msvcr120/tests/msvcr120.c index d1367e274f3..7ea87b0123f 100644 --- a/dlls/msvcr120/tests/msvcr120.c +++ b/dlls/msvcr120/tests/msvcr120.c @@ -851,8 +851,8 @@ static void test_StructuredTaskCollection(void) call_func2(p__StructuredTaskCollection__Schedule, &task_coll, &chore2); status = call_func2(p__StructuredTaskCollection__RunAndWait, &task_coll, NULL); ok(status == 1, "_StructuredTaskCollection::_RunAndWait failed: %d\n", status); - todo_wine ok(chore1_executed, "_StructuredTaskCollection::_RunAndWait did not execute chore1\n"); - todo_wine ok(chore2_executed, "_StructuredTaskCollection::_RunAndWait did not execute chore2\n"); + ok(chore1_executed, "_StructuredTaskCollection::_RunAndWait did not execute chore1\n"); + ok(chore2_executed, "_StructuredTaskCollection::_RunAndWait did not execute chore2\n");
call_func1(p__StructuredTaskCollection_dtor, &task_coll);
diff --git a/dlls/msvcrt/concurrency.c b/dlls/msvcrt/concurrency.c index 7712b93f1a3..7ede7d8b6a7 100644 --- a/dlls/msvcrt/concurrency.c +++ b/dlls/msvcrt/concurrency.c @@ -22,8 +22,10 @@ #include <stdbool.h>
#include "windef.h" +#include "winbase.h" #include "winternl.h" #include "wine/debug.h" +#include "wine/exception.h" #include "msvcrt.h" #include "cxx.h"
@@ -176,14 +178,39 @@ typedef struct cs_queue
#if _MSVCR_VER >= 100
-typedef struct -{ - char dummy; +/* This class seems to be 80 bytes big on x86-64, judging by the addresses passed in to + StructuredTaskCollection__RunAndWait */ +typedef struct UnrealizedChore +{ + void *unk1; + void (__cdecl *callback)(struct UnrealizedChore *this, void *unk); + void *unk2; + void *unk3; + void *unk4; + void *unk5; + void *unk6; + void *unk7; + void *unk8; + void *unk9; } UnrealizedChore;
+typedef struct StructuredTaskCollectionChoresEntry +{ + struct StructuredTaskCollectionChoresEntry *next; + UnrealizedChore *chore; + TP_WORK *work; + volatile LONG *waiting_on_ptr; +} StructuredTaskCollectionChoresEntry; + typedef struct { - char dummy; + void *unk1; + unsigned int unk2; + void *unk3; + StructuredTaskCollectionChoresEntry *volatile unk_chores; + unsigned int count; + unsigned int unk5; + void *unk6; } StructuredTaskCollection;
#endif /* _MSVCR_VER >= 100 */ @@ -1763,7 +1790,9 @@ bool __thiscall SpinWait__SpinOnce(SpinWait *this) DEFINE_THISCALL_WRAPPER(StructuredTaskCollection_ctor, 8) void __thiscall StructuredTaskCollection_ctor(StructuredTaskCollection *this, void *unk) { - FIXME("(%p): stub!\n", this); + FIXME("(%p): semi-stub!\n", this); + + memset(this, 0, sizeof(*this)); }
#endif /* _MSVCR_VER >= 100 */ @@ -1776,21 +1805,123 @@ void __thiscall StructuredTaskCollection_ctor(StructuredTaskCollection *this, vo DEFINE_THISCALL_WRAPPER(StructuredTaskCollection_dtor, 4) void __thiscall StructuredTaskCollection_dtor(StructuredTaskCollection *this) { - FIXME("(%p): stub!\n", this); + StructuredTaskCollectionChoresEntry *entry, *next; + + TRACE("(%p)\n", this); + + for (entry = this->unk_chores; entry; entry = next) + { + next = entry->next; + operator_delete(entry); + } }
#endif /* _MSVCR_VER >= 120 */
#if _MSVCR_VER >= 100
+static void CALLBACK StructuredTaskCollection_threadpool_cb_finally(BOOL normal, void *data) +{ + StructuredTaskCollectionChoresEntry *entry = data; + + TRACE("(%u %p)\n", normal, data); + + if (entry->waiting_on_ptr && InterlockedDecrement(entry->waiting_on_ptr) == 0) + RtlWakeAddressSingle((void*)entry->waiting_on_ptr); +} + +static inline void StructuredTaskCollection_run_chore(UnrealizedChore *chore, void* unk) +{ + if (chore->callback) + chore->callback(chore, unk); +} + +static void WINAPI StructuredTaskCollection_threadpool_cb(TP_CALLBACK_INSTANCE *inst, void *data, TP_WORK *work) +{ + StructuredTaskCollectionChoresEntry *entry = data; + + TRACE("(%p %p)\n", inst, data); + + __TRY + { + StructuredTaskCollection_run_chore(entry->chore, NULL); + } + __FINALLY_CTX(StructuredTaskCollection_threadpool_cb_finally, data) +} + /* ?_RunAndWait@_StructuredTaskCollection@details@Concurrency@@QAA?AW4_TaskCollectionStatus@23@PAV_UnrealizedChore@23@@Z */ /* ?_RunAndWait@_StructuredTaskCollection@details@Concurrency@@QAG?AW4_TaskCollectionStatus@23@PAV_UnrealizedChore@23@@Z */ /* ?_RunAndWait@_StructuredTaskCollection@details@Concurrency@@QEAA?AW4_TaskCollectionStatus@23@PEAV_UnrealizedChore@23@@Z */ DEFINE_THISCALL_WRAPPER(StructuredTaskCollection__RunAndWait, 8) /*enum Concurrency::details::_TaskCollectionStatus*/int __thiscall StructuredTaskCollection__RunAndWait(StructuredTaskCollection *this, UnrealizedChore *chore) { - FIXME("(%p %p): stub!\n", this, chore); - return 1; + StructuredTaskCollectionChoresEntry *chores, *entry, *next; + LONG total_count = 0, created_count = 0; + TP_POOL *tpool = NULL; + TP_CALLBACK_ENVIRON cbenv; + volatile LONG waiting_on = 0; + int ret = 1; + LONG val; + + TRACE("(%p %p)\n", this, chore); + + chores = InterlockedExchangePointer((void *volatile *)&this->unk_chores, NULL); + this->count = 0; + + if (!chores) + return ret; + + for (entry = chores; entry; entry = entry->next) + total_count++; + + tpool = CreateThreadpool(NULL); + if 
(!tpool || !SetThreadpoolThreadMinimum(tpool, total_count)) + goto done; + SetThreadpoolThreadMaximum(tpool, total_count); + + memset(&cbenv, 0, sizeof(cbenv)); + cbenv.Version = 1; + cbenv.Pool = tpool; + + for (entry = chores; entry; entry = entry->next) + { + entry->waiting_on_ptr = &waiting_on; + entry->work = CreateThreadpoolWork(StructuredTaskCollection_threadpool_cb, entry, &cbenv); + if (!entry->work) + { + ret = 0; + goto done; + } + created_count++; + } + + InterlockedExchange(&waiting_on, created_count); + for (entry = chores; entry; entry = entry->next) + SubmitThreadpoolWork(entry->work); + + if (chore) + StructuredTaskCollection_run_chore(chore, NULL); + + TRACE("waiting on %lu workers\n", created_count); + + while ((val = InterlockedCompareExchange(&waiting_on, 0, 0))) + RtlWaitOnAddress((void*)&waiting_on, (void*)&val, sizeof(waiting_on), NULL); + +done: + for (entry = chores; entry; entry = next) + { + if (created_count > 0) + { + CloseThreadpoolWork(entry->work); + created_count--; + } + next = entry->next; + operator_delete(entry); + } + + if (tpool) CloseThreadpool(tpool); + + return ret; }
/* ?_Schedule@_StructuredTaskCollection@details@Concurrency@@QAAXPAV_UnrealizedChore@23@@Z */ @@ -1799,7 +1930,19 @@ DEFINE_THISCALL_WRAPPER(StructuredTaskCollection__RunAndWait, 8) DEFINE_THISCALL_WRAPPER(StructuredTaskCollection__Schedule, 8) void __thiscall StructuredTaskCollection__Schedule(StructuredTaskCollection *this, UnrealizedChore *chore) { - FIXME("(%p %p): stub!\n", this, chore); + StructuredTaskCollectionChoresEntry *entry, *next; + + TRACE("(%p %p)\n", this, chore); + + entry = operator_new(sizeof(StructuredTaskCollectionChoresEntry)); + entry->chore = chore; + do + { + next = this->unk_chores; + entry->next = next; + } + while (InterlockedCompareExchangePointer((void *volatile *)&this->unk_chores, entry, next) != next); + this->count++; }
#endif /* _MSVCR_VER >= 100 */