summaryrefslogtreecommitdiffstats
path: root/src/VBox/Runtime/common/ioqueue
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 14:19:18 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 14:19:18 +0000
commit4035b1bfb1e5843a539a8b624d21952b756974d1 (patch)
treef1e9cd5bf548cbc57ff2fddfb2b4aa9ae95587e2 /src/VBox/Runtime/common/ioqueue
parentInitial commit. (diff)
downloadvirtualbox-upstream.tar.xz
virtualbox-upstream.zip
Adding upstream version 6.1.22-dfsg.upstream/6.1.22-dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/VBox/Runtime/common/ioqueue')
-rw-r--r--src/VBox/Runtime/common/ioqueue/ioqueue-aiofile-provider.cpp328
-rw-r--r--src/VBox/Runtime/common/ioqueue/ioqueue-stdfile-provider.cpp539
-rw-r--r--src/VBox/Runtime/common/ioqueue/ioqueuebase.cpp287
3 files changed, 1154 insertions, 0 deletions
diff --git a/src/VBox/Runtime/common/ioqueue/ioqueue-aiofile-provider.cpp b/src/VBox/Runtime/common/ioqueue/ioqueue-aiofile-provider.cpp
new file mode 100644
index 00000000..c4efa38d
--- /dev/null
+++ b/src/VBox/Runtime/common/ioqueue/ioqueue-aiofile-provider.cpp
@@ -0,0 +1,328 @@
+/* $Id: ioqueue-aiofile-provider.cpp $ */
+/** @file
+ * IPRT - I/O queue, Async I/O file provider.
+ */
+
+/*
+ * Copyright (C) 2019-2020 Oracle Corporation
+ *
+ * This file is part of VirtualBox Open Source Edition (OSE), as
+ * available from http://www.virtualbox.org. This file is free software;
+ * you can redistribute it and/or modify it under the terms of the GNU
+ * General Public License (GPL) as published by the Free Software
+ * Foundation, in version 2 as it comes in the "COPYING" file of the
+ * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
+ * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
+ *
+ * The contents of this file may alternatively be used under the terms
+ * of the Common Development and Distribution License Version 1.0
+ * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
+ * VirtualBox OSE distribution, in which case the provisions of the
+ * CDDL are applicable instead of those of the GPL.
+ *
+ * You may elect to license modified versions of this file under the
+ * terms and conditions of either the GPL or the CDDL or both.
+ */
+
+
+/*********************************************************************************************************************************
+* Header Files *
+*********************************************************************************************************************************/
+#define LOG_GROUP RTLOGGROUP_IOQUEUE
+#include <iprt/ioqueue.h>
+
+#include <iprt/asm.h>
+#include <iprt/errcore.h>
+#include <iprt/file.h>
+#include <iprt/log.h>
+#include <iprt/mem.h>
+#include <iprt/semaphore.h>
+#include <iprt/string.h>
+
+#include "internal/ioqueue.h"
+
+
+/*********************************************************************************************************************************
+* Structures and Typedefs *
+*********************************************************************************************************************************/
+
+
+/**
+ * Internal I/O queue provider instance data.
+ */
+typedef struct RTIOQUEUEPROVINT
+{
+ /** The async I/O context handle. */
+ RTFILEAIOCTX hAioCtx;
+ /** Pointer to the array of requests waiting for commit. */
+ PRTFILEAIOREQ pahReqsToCommit;
+ /** Maximum number of requests to wait for commit.. */
+ size_t cReqsToCommitMax;
+ /** Number of requests waiting for commit. */
+ uint32_t cReqsToCommit;
+ /** Array of free cached request handles. */
+ PRTFILEAIOREQ pahReqsFree;
+ /** Maximum number of cached requests. */
+ uint32_t cReqsFreeMax;
+ /** Number of free cached requests. */
+ volatile uint32_t cReqsFree;
+} RTIOQUEUEPROVINT;
+/** Pointer to the internal I/O queue provider instance data. */
+typedef RTIOQUEUEPROVINT *PRTIOQUEUEPROVINT;
+
+
+/*********************************************************************************************************************************
+* Internal Functions *
+*********************************************************************************************************************************/
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnIsSupported} */
+static DECLCALLBACK(bool) rtIoQueueAioFileProv_IsSupported(void)
+{
+ /* The common code/public API already checked for the proper handle type. */
+ /** @todo Check that the file was opened with async I/O enabled on some platforms? */
+ return true;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueInit} */
+static DECLCALLBACK(int) rtIoQueueAioFileProv_QueueInit(RTIOQUEUEPROV hIoQueueProv, uint32_t fFlags,
+ uint32_t cSqEntries, uint32_t cCqEntries)
+{
+ RT_NOREF(fFlags, cCqEntries);
+
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+ int rc = VINF_SUCCESS;
+
+ pThis->cReqsToCommitMax = cSqEntries;
+ pThis->cReqsFreeMax = cSqEntries;
+ pThis->cReqsFree = 0;
+
+ pThis->pahReqsToCommit = (PRTFILEAIOREQ)RTMemAllocZ(cSqEntries * sizeof(PRTFILEAIOREQ));
+ if (RT_LIKELY(pThis->pahReqsToCommit))
+ {
+ pThis->pahReqsFree = (PRTFILEAIOREQ)RTMemAllocZ(cSqEntries * sizeof(PRTFILEAIOREQ));
+ if (RT_LIKELY(pThis->pahReqsFree))
+ {
+ rc = RTFileAioCtxCreate(&pThis->hAioCtx, cSqEntries, RTFILEAIOCTX_FLAGS_WAIT_WITHOUT_PENDING_REQUESTS);
+ if (RT_SUCCESS(rc))
+ return VINF_SUCCESS;
+
+ RTMemFree(pThis->pahReqsFree);
+ }
+ else
+ rc = VERR_NO_MEMORY;
+
+ RTMemFree(pThis->pahReqsToCommit);
+ }
+ else
+ rc = VERR_NO_MEMORY;
+
+ return rc;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueDestroy} */
+static DECLCALLBACK(void) rtIoQueueAioFileProv_QueueDestroy(RTIOQUEUEPROV hIoQueueProv)
+{
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+
+ RTFileAioCtxDestroy(pThis->hAioCtx);
+ RTMemFree(pThis->pahReqsFree);
+ RTMemFree(pThis->pahReqsToCommit);
+ RT_BZERO(pThis, sizeof(*pThis));
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleRegister} */
+static DECLCALLBACK(int) rtIoQueueAioFileProv_HandleRegister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle)
+{
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+
+ return RTFileAioCtxAssociateWithFile(pThis->hAioCtx, pHandle->u.hFile);
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleDeregister} */
+static DECLCALLBACK(int) rtIoQueueAioFileProv_HandleDeregister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle)
+{
+ RT_NOREF(hIoQueueProv, pHandle);
+
+ /** @todo For Windows there doesn't seem to be a way to deregister the file handle without reopening the file,
+ *.for all other hosts this is a nop, just like the register method.
+ */
+ return VINF_SUCCESS;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnReqPrepare} */
+static DECLCALLBACK(int) rtIoQueueAioFileProv_ReqPrepare(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp,
+ uint64_t off, void *pvBuf, size_t cbBuf, uint32_t fReqFlags,
+ void *pvUser)
+{
+ RT_NOREF(fReqFlags);
+
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+
+ /* Try to grab a free request structure from the cache. */
+ RTFILEAIOREQ hReq = NIL_RTFILEAIOREQ;
+ int rc = VINF_SUCCESS;
+ uint32_t cReqsFree = ASMAtomicReadU32(&pThis->cReqsFree);
+ if (cReqsFree)
+ {
+ do
+ {
+ cReqsFree = ASMAtomicReadU32(&pThis->cReqsFree);
+ hReq = pThis->pahReqsFree[pThis->cReqsFree - 1];
+ } while (!ASMAtomicCmpXchgU32(&pThis->cReqsFree, cReqsFree - 1, cReqsFree));
+ }
+ else
+ rc = RTFileAioReqCreate(&hReq);
+
+ if (RT_SUCCESS(rc))
+ {
+ switch (enmOp)
+ {
+ case RTIOQUEUEOP_READ:
+ rc = RTFileAioReqPrepareRead(hReq, pHandle->u.hFile, (RTFOFF)off, pvBuf, cbBuf, pvUser);
+ break;
+ case RTIOQUEUEOP_WRITE:
+ rc = RTFileAioReqPrepareWrite(hReq, pHandle->u.hFile, (RTFOFF)off, pvBuf, cbBuf, pvUser);
+ break;
+ case RTIOQUEUEOP_SYNC:
+ rc = RTFileAioReqPrepareFlush(hReq, pHandle->u.hFile, pvUser);
+ break;
+ default:
+ AssertMsgFailedReturn(("Invalid I/O queue operation: %d\n", enmOp), VERR_INTERNAL_ERROR);
+ }
+
+ if (RT_SUCCESS(rc))
+ pThis->pahReqsToCommit[pThis->cReqsToCommit++] = hReq;
+ else
+ {
+ int rc2 = RTFileAioReqDestroy(hReq);
+ Assert(rc2); RT_NOREF(rc2);
+ }
+ }
+
+ return rc;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnCommit} */
+static DECLCALLBACK(int) rtIoQueueAioFileProv_Commit(RTIOQUEUEPROV hIoQueueProv, uint32_t *pcReqsCommitted)
+{
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+
+ int rc = RTFileAioCtxSubmit(pThis->hAioCtx, pThis->pahReqsToCommit, pThis->cReqsToCommit);
+ if (RT_SUCCESS(rc))
+ {
+ *pcReqsCommitted = pThis->cReqsToCommit;
+ pThis->cReqsToCommit = 0;
+ }
+
+ return rc;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWait} */
+static DECLCALLBACK(int) rtIoQueueAioFileProv_EvtWait(RTIOQUEUEPROV hIoQueueProv, PRTIOQUEUECEVT paCEvt, uint32_t cCEvt,
+ uint32_t cMinWait, uint32_t *pcCEvt, uint32_t fFlags)
+{
+ RT_NOREF(fFlags);
+
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+ int rc = VINF_SUCCESS;
+ uint32_t idxCEvt = 0;
+
+ while ( RT_SUCCESS(rc)
+ && cMinWait
+ && cCEvt)
+ {
+ RTFILEAIOREQ ahReqs[64];
+ uint32_t cReqsCompleted = 0;
+
+ rc = RTFileAioCtxWait(pThis->hAioCtx, cMinWait, RT_INDEFINITE_WAIT,
+ &ahReqs[0], RT_MIN(RT_ELEMENTS(ahReqs), cCEvt), &cReqsCompleted);
+ if (RT_SUCCESS(rc))
+ {
+ for (unsigned i = 0; i < cReqsCompleted; i++)
+ {
+ RTFILEAIOREQ hReq = ahReqs[i];
+
+ paCEvt[idxCEvt].rcReq = RTFileAioReqGetRC(hReq, &paCEvt[idxCEvt].cbXfered);
+ paCEvt[idxCEvt].pvUser = RTFileAioReqGetUser(hReq);
+ idxCEvt++;
+
+ /* Try to insert the free request into the cache. */
+ uint32_t cReqsFree = ASMAtomicReadU32(&pThis->cReqsFree);
+ if (cReqsFree < pThis->cReqsFreeMax)
+ {
+ do
+ {
+ cReqsFree = ASMAtomicReadU32(&pThis->cReqsFree);
+ pThis->pahReqsFree[pThis->cReqsFree] = hReq;
+ } while (!ASMAtomicCmpXchgU32(&pThis->cReqsFree, cReqsFree + 1, cReqsFree));
+ }
+ else
+ rc = RTFileAioReqDestroy(hReq);
+ }
+
+ cCEvt -= cReqsCompleted;
+ cMinWait -= RT_MIN(cMinWait, cReqsCompleted);
+ }
+ }
+
+ *pcCEvt = idxCEvt;
+ return rc;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWaitWakeup} */
+static DECLCALLBACK(int) rtIoQueueAioFileProv_EvtWaitWakeup(RTIOQUEUEPROV hIoQueueProv)
+{
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+
+ return RTFileAioCtxWakeup(pThis->hAioCtx);
+}
+
+
+/**
+ * Async file I/O queue provider virtual method table.
+ */
+RT_DECL_DATA_CONST(RTIOQUEUEPROVVTABLE const) g_RTIoQueueAioFileProv =
+{
+ /** uVersion */
+ RTIOQUEUEPROVVTABLE_VERSION,
+ /** pszId */
+ "AioFile",
+ /** cbIoQueueProv */
+ sizeof(RTIOQUEUEPROVINT),
+ /** enmHnd */
+ RTHANDLETYPE_FILE,
+ /** fFlags */
+ 0,
+ /** pfnIsSupported */
+ rtIoQueueAioFileProv_IsSupported,
+ /** pfnQueueInit */
+ rtIoQueueAioFileProv_QueueInit,
+ /** pfnQueueDestroy */
+ rtIoQueueAioFileProv_QueueDestroy,
+ /** pfnHandleRegister */
+ rtIoQueueAioFileProv_HandleRegister,
+ /** pfnHandleDeregister */
+ rtIoQueueAioFileProv_HandleDeregister,
+ /** pfnReqPrepare */
+ rtIoQueueAioFileProv_ReqPrepare,
+ /** pfnReqPrepareSg */
+ NULL,
+ /** pfnCommit */
+ rtIoQueueAioFileProv_Commit,
+ /** pfnEvtWait */
+ rtIoQueueAioFileProv_EvtWait,
+ /** pfnEvtWaitWakeup */
+ rtIoQueueAioFileProv_EvtWaitWakeup,
+ /** uEndMarker */
+ RTIOQUEUEPROVVTABLE_VERSION
+};
+
diff --git a/src/VBox/Runtime/common/ioqueue/ioqueue-stdfile-provider.cpp b/src/VBox/Runtime/common/ioqueue/ioqueue-stdfile-provider.cpp
new file mode 100644
index 00000000..826fdabc
--- /dev/null
+++ b/src/VBox/Runtime/common/ioqueue/ioqueue-stdfile-provider.cpp
@@ -0,0 +1,539 @@
+/* $Id: ioqueue-stdfile-provider.cpp $ */
+/** @file
+ * IPRT - I/O queue, Standard file provider.
+ */
+
+/*
+ * Copyright (C) 2019-2020 Oracle Corporation
+ *
+ * This file is part of VirtualBox Open Source Edition (OSE), as
+ * available from http://www.virtualbox.org. This file is free software;
+ * you can redistribute it and/or modify it under the terms of the GNU
+ * General Public License (GPL) as published by the Free Software
+ * Foundation, in version 2 as it comes in the "COPYING" file of the
+ * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
+ * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
+ *
+ * The contents of this file may alternatively be used under the terms
+ * of the Common Development and Distribution License Version 1.0
+ * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
+ * VirtualBox OSE distribution, in which case the provisions of the
+ * CDDL are applicable instead of those of the GPL.
+ *
+ * You may elect to license modified versions of this file under the
+ * terms and conditions of either the GPL or the CDDL or both.
+ */
+
+
+/*********************************************************************************************************************************
+* Header Files *
+*********************************************************************************************************************************/
+#define LOG_GROUP RTLOGGROUP_IOQUEUE
+#include <iprt/ioqueue.h>
+
+#include <iprt/asm.h>
+#include <iprt/errcore.h>
+#include <iprt/file.h>
+#include <iprt/log.h>
+#include <iprt/mem.h>
+#include <iprt/semaphore.h>
+#include <iprt/string.h>
+#include <iprt/thread.h>
+
+#include "internal/ioqueue.h"
+
+
+/*********************************************************************************************************************************
+* Defined Constants And Macros *
+*********************************************************************************************************************************/
+
+/** The I/O queue worker thread needs to wake up the waiting thread when requests completed. */
+#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP RT_BIT(0)
+/** The waiting thread was interrupted by the external wakeup call. */
+#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR RT_BIT(1)
+#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR_BIT 1
+/** The I/O queue worker thread needs to be woken up to process new requests. */
+#define RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP RT_BIT(2)
+#define RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT 2
+
+
+/*********************************************************************************************************************************
+* Structures and Typedefs *
+*********************************************************************************************************************************/
+
+
+/**
+ * Submission queue entry.
+ */
+typedef struct RTIOQUEUESSQENTRY
+{
+ /** The file to work on. */
+ RTFILE hFile;
+ /** I/O operation. */
+ RTIOQUEUEOP enmOp;
+ /** Start offset. */
+ uint64_t off;
+ /** Additional request flags. */
+ uint32_t fReqFlags;
+ /** Size of the request. */
+ size_t cbReq;
+ /** Opaque user data passed on completion. */
+ void *pvUser;
+ /** Flag whether this is a S/G or standard request. */
+ bool fSg;
+ /** Type dependent data. */
+ union
+ {
+ /** Pointer to buffer for non S/G requests. */
+ void *pvBuf;
+ /** Pointer to S/G buffer. */
+ PCRTSGBUF pSgBuf;
+ } u;
+} RTIOQUEUESSQENTRY;
+/** Pointer to a submission queue entry. */
+typedef RTIOQUEUESSQENTRY *PRTIOQUEUESSQENTRY;
+/** Pointer to a constant submission queue entry. */
+typedef const RTIOQUEUESSQENTRY *PCRTIOQUEUESSQENTRY;
+
+
+/**
+ * Internal I/O queue provider instance data.
+ */
+typedef struct RTIOQUEUEPROVINT
+{
+ /** Size of the submission queue in entries. */
+ uint32_t cSqEntries;
+ /** Size of the completion queue in entries. */
+ uint32_t cCqEntries;
+ /** Pointer to the submission queue base. */
+ PRTIOQUEUESSQENTRY paSqEntryBase;
+ /** Submission queue producer index. */
+ volatile uint32_t idxSqProd;
+ /** Submission queue producer value for any uncommitted requests. */
+ uint32_t idxSqProdUncommit;
+ /** Submission queue consumer index. */
+ volatile uint32_t idxSqCons;
+ /** Pointer to the completion queue base. */
+ PRTIOQUEUECEVT paCqEntryBase;
+ /** Completion queue producer index. */
+ volatile uint32_t idxCqProd;
+ /** Completion queue consumer index. */
+ volatile uint32_t idxCqCons;
+ /** Various state flags for synchronizing the worker thread with other participants. */
+ volatile uint32_t fState;
+ /** The worker thread handle. */
+ RTTHREAD hThrdWork;
+ /** Event semaphore the worker thread waits on for work. */
+ RTSEMEVENT hSemEvtWorker;
+ /** Event semaphore the caller waits for completion events. */
+ RTSEMEVENT hSemEvtWaitEvts;
+ /** Flag whether to shutdown the worker thread. */
+ volatile bool fShutdown;
+} RTIOQUEUEPROVINT;
+/** Pointer to the internal I/O queue provider instance data. */
+typedef RTIOQUEUEPROVINT *PRTIOQUEUEPROVINT;
+
+
+/*********************************************************************************************************************************
+* Internal Functions *
+*********************************************************************************************************************************/
+
+
+/**
+ * Processes the given submission queue entry and reports back the result in the completion queue.
+ *
+ * @returns nothing.
+ * @param pSqEntry The submission queue entry to process.
+ * @param pCqEntry The comppletion queue entry to store the result in.
+ */
+static void rtIoQueueStdFileProv_SqEntryProcess(PCRTIOQUEUESSQENTRY pSqEntry, PRTIOQUEUECEVT pCqEntry)
+{
+ int rcReq = VINF_SUCCESS;
+
+ switch (pSqEntry->enmOp)
+ {
+ case RTIOQUEUEOP_READ:
+ if (!pSqEntry->fSg)
+ rcReq = RTFileReadAt(pSqEntry->hFile, pSqEntry->off, pSqEntry->u.pvBuf, pSqEntry->cbReq, NULL);
+ else
+ {
+ RTSGBUF SgBuf;
+ RTSgBufClone(&SgBuf, pSqEntry->u.pSgBuf);
+ rcReq = RTFileSgReadAt(pSqEntry->hFile, pSqEntry->off, &SgBuf, pSqEntry->cbReq, NULL);
+ }
+ break;
+ case RTIOQUEUEOP_WRITE:
+ if (!pSqEntry->fSg)
+ rcReq = RTFileWriteAt(pSqEntry->hFile, pSqEntry->off, pSqEntry->u.pvBuf, pSqEntry->cbReq, NULL);
+ else
+ {
+ RTSGBUF SgBuf;
+ RTSgBufClone(&SgBuf, pSqEntry->u.pSgBuf);
+ rcReq = RTFileSgWriteAt(pSqEntry->hFile, pSqEntry->off, &SgBuf, pSqEntry->cbReq, NULL);
+ }
+ break;
+ case RTIOQUEUEOP_SYNC:
+ rcReq = RTFileFlush(pSqEntry->hFile);
+ break;
+ default:
+ AssertMsgFailedReturnVoid(("Invalid I/O queue operation: %d\n", pSqEntry->enmOp));
+ }
+
+ /* Write the result back into the completion queue. */
+ pCqEntry->rcReq = rcReq;
+ pCqEntry->pvUser = pSqEntry->pvUser;
+ pCqEntry->cbXfered = RT_SUCCESS(rcReq) ? pSqEntry->cbReq : 0;
+}
+
+
+/**
+ * The main I/O queue worker loop which processes the incoming I/O requests.
+ */
+static DECLCALLBACK(int) rtIoQueueStdFileProv_WorkerLoop(RTTHREAD hThrdSelf, void *pvUser)
+{
+ PRTIOQUEUEPROVINT pThis = (PRTIOQUEUEPROVINT)pvUser;
+
+ /* Signal that we started up. */
+ int rc = RTThreadUserSignal(hThrdSelf);
+ AssertRC(rc);
+
+ while (!ASMAtomicReadBool(&pThis->fShutdown))
+ {
+ /* Wait for some work. */
+ ASMAtomicOrU32(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP);
+ uint32_t idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
+ uint32_t idxSqCons = ASMAtomicReadU32(&pThis->idxSqCons);
+ uint32_t idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons);
+
+ if (idxSqCons == idxSqProd)
+ {
+ rc = RTSemEventWait(pThis->hSemEvtWorker, RT_INDEFINITE_WAIT);
+ AssertRC(rc);
+
+ idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
+ idxSqCons = ASMAtomicReadU32(&pThis->idxSqCons);
+ idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons);
+ }
+
+ ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT);
+
+ /* Process all requests. */
+ uint32_t cCqFree = 0;
+ if (idxCqCons > pThis->idxCqProd)
+ cCqFree = pThis->cCqEntries - (pThis->cCqEntries - idxCqCons) - pThis->idxCqProd;
+ else
+ cCqFree = pThis->cCqEntries - pThis->idxCqProd - idxCqCons;
+ do
+ {
+ while ( idxSqCons != idxSqProd
+ && cCqFree)
+ {
+ PCRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[idxSqCons];
+ PRTIOQUEUECEVT pCqEntry = &pThis->paCqEntryBase[pThis->idxCqProd];
+
+ rtIoQueueStdFileProv_SqEntryProcess(pSqEntry, pCqEntry);
+ ASMWriteFence();
+
+ idxSqCons = (idxSqCons + 1) % pThis->cSqEntries;
+ cCqFree--;
+ pThis->idxCqProd = (pThis->idxCqProd + 1) % pThis->cCqEntries;
+ ASMAtomicWriteU32(&pThis->idxSqCons, idxSqCons);
+ ASMWriteFence();
+ if (ASMAtomicReadU32(&pThis->fState) & RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP)
+ {
+ rc = RTSemEventSignal(pThis->hSemEvtWaitEvts);
+ AssertRC(rc);
+ }
+ }
+
+ idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd);
+ } while ( idxSqCons != idxSqProd
+ && cCqFree);
+ }
+
+ return VINF_SUCCESS;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnIsSupported} */
+static DECLCALLBACK(bool) rtIoQueueStdFileProv_IsSupported(void)
+{
+ /* The common code/public API already checked for the proper handle type. */
+ return true;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueInit} */
+static DECLCALLBACK(int) rtIoQueueStdFileProv_QueueInit(RTIOQUEUEPROV hIoQueueProv, uint32_t fFlags,
+ uint32_t cSqEntries, uint32_t cCqEntries)
+{
+ RT_NOREF(fFlags);
+
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+ int rc = VINF_SUCCESS;
+
+ cSqEntries++;
+ cCqEntries++;
+
+ pThis->cSqEntries = cSqEntries;
+ pThis->cCqEntries = cCqEntries;
+ pThis->idxSqProd = 0;
+ pThis->idxSqProdUncommit = 0;
+ pThis->idxSqCons = 0;
+ pThis->idxCqProd = 0;
+ pThis->idxCqCons = 0;
+ pThis->fShutdown = false;
+ pThis->fState = 0;
+
+ pThis->paSqEntryBase = (PRTIOQUEUESSQENTRY)RTMemAllocZ(cSqEntries * sizeof(RTIOQUEUESSQENTRY));
+ if (RT_LIKELY(pThis->paSqEntryBase))
+ {
+ pThis->paCqEntryBase = (PRTIOQUEUECEVT)RTMemAllocZ(cCqEntries * sizeof(RTIOQUEUECEVT));
+ if (RT_LIKELY(pThis->paSqEntryBase))
+ {
+ rc = RTSemEventCreate(&pThis->hSemEvtWorker);
+ if (RT_SUCCESS(rc))
+ {
+ rc = RTSemEventCreate(&pThis->hSemEvtWaitEvts);
+ if (RT_SUCCESS(rc))
+ {
+ /* Spin up the worker thread. */
+ rc = RTThreadCreate(&pThis->hThrdWork, rtIoQueueStdFileProv_WorkerLoop, pThis, 0, RTTHREADTYPE_IO, RTTHREADFLAGS_WAITABLE,
+ "IoQ-StdFile");
+ if (RT_SUCCESS(rc))
+ {
+ rc = RTThreadUserWait(pThis->hThrdWork, 10 * RT_MS_1SEC);
+ AssertRC(rc);
+
+ return VINF_SUCCESS;
+ }
+
+ RTSemEventDestroy(pThis->hSemEvtWaitEvts);
+ }
+
+ RTSemEventDestroy(pThis->hSemEvtWorker);
+ }
+
+ RTMemFree(pThis->paCqEntryBase);
+ }
+ else
+ rc = VERR_NO_MEMORY;
+
+ RTMemFree(pThis->paSqEntryBase);
+ }
+ else
+ rc = VERR_NO_MEMORY;
+
+ return rc;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueDestroy} */
+static DECLCALLBACK(void) rtIoQueueStdFileProv_QueueDestroy(RTIOQUEUEPROV hIoQueueProv)
+{
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+
+ ASMAtomicXchgBool(&pThis->fShutdown, true);
+ RTSemEventSignal(pThis->hSemEvtWorker);
+
+ int rc = RTThreadWait(pThis->hThrdWork, 60 * RT_MS_1SEC, NULL);
+ AssertRC(rc);
+
+ RTSemEventDestroy(pThis->hSemEvtWaitEvts);
+ RTSemEventDestroy(pThis->hSemEvtWorker);
+ RTMemFree(pThis->paCqEntryBase);
+ RTMemFree(pThis->paSqEntryBase);
+ RT_BZERO(pThis, sizeof(*pThis));
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleRegister} */
+static DECLCALLBACK(int) rtIoQueueStdFileProv_HandleRegister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle)
+{
+ RT_NOREF(hIoQueueProv, pHandle);
+
+ /* Nothing to do here. */
+ return VINF_SUCCESS;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleDeregister} */
+static DECLCALLBACK(int) rtIoQueueStdFileProv_HandleDeregister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle)
+{
+ RT_NOREF(hIoQueueProv, pHandle);
+
+ /* Nothing to do here. */
+ return VINF_SUCCESS;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnReqPrepare} */
+static DECLCALLBACK(int) rtIoQueueStdFileProv_ReqPrepare(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp,
+ uint64_t off, void *pvBuf, size_t cbBuf, uint32_t fReqFlags,
+ void *pvUser)
+{
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+ PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[pThis->idxSqProdUncommit];
+
+ pSqEntry->hFile = pHandle->u.hFile;
+ pSqEntry->enmOp = enmOp;
+ pSqEntry->off = off;
+ pSqEntry->fReqFlags = fReqFlags;
+ pSqEntry->cbReq = cbBuf;
+ pSqEntry->pvUser = pvUser;
+ pSqEntry->fSg = false;
+ pSqEntry->u.pvBuf = pvBuf;
+
+ pThis->idxSqProdUncommit = (pThis->idxSqProdUncommit + 1) % pThis->cSqEntries;
+ return VINF_SUCCESS;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnReqPrepareSg} */
+static DECLCALLBACK(int) rtIoQueueStdFileProv_ReqPrepareSg(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp,
+ uint64_t off, PCRTSGBUF pSgBuf, size_t cbSg, uint32_t fReqFlags,
+ void *pvUser)
+{
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+ PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[pThis->idxSqProdUncommit];
+
+ pSqEntry->hFile = pHandle->u.hFile;
+ pSqEntry->enmOp = enmOp;
+ pSqEntry->off = off;
+ pSqEntry->fReqFlags = fReqFlags;
+ pSqEntry->cbReq = cbSg;
+ pSqEntry->pvUser = pvUser;
+ pSqEntry->fSg = true;
+ pSqEntry->u.pSgBuf = pSgBuf;
+
+ pThis->idxSqProdUncommit = (pThis->idxSqProdUncommit + 1) % pThis->cSqEntries;
+ return VINF_SUCCESS;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnCommit} */
+static DECLCALLBACK(int) rtIoQueueStdFileProv_Commit(RTIOQUEUEPROV hIoQueueProv, uint32_t *pcReqsCommitted)
+{
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+
+ if (pThis->idxSqProd > pThis->idxSqProdUncommit)
+ *pcReqsCommitted = pThis->cSqEntries - pThis->idxSqProd + pThis->idxSqProdUncommit;
+ else
+ *pcReqsCommitted = pThis->idxSqProdUncommit - pThis->idxSqProd;
+
+ ASMWriteFence();
+ ASMAtomicWriteU32(&pThis->idxSqProd, pThis->idxSqProdUncommit);
+ return RTSemEventSignal(pThis->hSemEvtWorker);
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWait} */
+static DECLCALLBACK(int) rtIoQueueStdFileProv_EvtWait(RTIOQUEUEPROV hIoQueueProv, PRTIOQUEUECEVT paCEvt, uint32_t cCEvt,
+ uint32_t cMinWait, uint32_t *pcCEvt, uint32_t fFlags)
+{
+ RT_NOREF(fFlags);
+
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+ int rc = VINF_SUCCESS;
+ uint32_t idxCEvt = 0;
+
+ while ( RT_SUCCESS(rc)
+ && cMinWait
+ && cCEvt)
+ {
+ ASMAtomicOrU32(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP);
+ uint32_t idxCqProd = ASMAtomicReadU32(&pThis->idxCqProd);
+ uint32_t idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons);
+
+ if (idxCqCons == idxCqProd)
+ {
+ rc = RTSemEventWait(pThis->hSemEvtWaitEvts, RT_INDEFINITE_WAIT);
+ AssertRC(rc);
+ if (ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR_BIT))
+ {
+ rc = VERR_INTERRUPTED;
+ ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT);
+ break;
+ }
+
+ idxCqProd = ASMAtomicReadU32(&pThis->idxCqProd);
+ idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons);
+ }
+
+ ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT);
+
+ /* Process all requests. */
+ while ( idxCqCons != idxCqProd
+ && cCEvt)
+ {
+ PRTIOQUEUECEVT pCqEntry = &pThis->paCqEntryBase[idxCqCons];
+
+ paCEvt[idxCEvt].rcReq = pCqEntry->rcReq;
+ paCEvt[idxCEvt].pvUser = pCqEntry->pvUser;
+ paCEvt[idxCEvt].cbXfered = pCqEntry->cbXfered;
+ ASMReadFence();
+
+ idxCEvt++;
+ cCEvt--;
+ cMinWait--;
+
+ idxCqCons = (idxCqCons + 1) % pThis->cCqEntries;
+ pThis->idxCqCons = (pThis->idxCqCons + 1) % pThis->cCqEntries;
+ ASMWriteFence();
+ }
+ }
+
+ *pcCEvt = idxCEvt;
+ return rc;
+}
+
+
+/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWaitWakeup} */
+static DECLCALLBACK(int) rtIoQueueStdFileProv_EvtWaitWakeup(RTIOQUEUEPROV hIoQueueProv)
+{
+ PRTIOQUEUEPROVINT pThis = hIoQueueProv;
+
+ ASMAtomicOrU32(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR);
+ return RTSemEventSignal(pThis->hSemEvtWaitEvts);
+}
+
+
+/**
+ * Standard file I/O queue provider virtual method table.
+ */
+RT_DECL_DATA_CONST(RTIOQUEUEPROVVTABLE const) g_RTIoQueueStdFileProv =
+{
+ /** uVersion */
+ RTIOQUEUEPROVVTABLE_VERSION,
+ /** pszId */
+ "StdFile",
+ /** cbIoQueueProv */
+ sizeof(RTIOQUEUEPROVINT),
+ /** enmHnd */
+ RTHANDLETYPE_FILE,
+ /** fFlags */
+ 0,
+ /** pfnIsSupported */
+ rtIoQueueStdFileProv_IsSupported,
+ /** pfnQueueInit */
+ rtIoQueueStdFileProv_QueueInit,
+ /** pfnQueueDestroy */
+ rtIoQueueStdFileProv_QueueDestroy,
+ /** pfnHandleRegister */
+ rtIoQueueStdFileProv_HandleRegister,
+ /** pfnHandleDeregister */
+ rtIoQueueStdFileProv_HandleDeregister,
+ /** pfnReqPrepare */
+ rtIoQueueStdFileProv_ReqPrepare,
+ /** pfnReqPrepareSg */
+ rtIoQueueStdFileProv_ReqPrepareSg,
+ /** pfnCommit */
+ rtIoQueueStdFileProv_Commit,
+ /** pfnEvtWait */
+ rtIoQueueStdFileProv_EvtWait,
+ /** pfnEvtWaitWakeup */
+ rtIoQueueStdFileProv_EvtWaitWakeup,
+ /** uEndMarker */
+ RTIOQUEUEPROVVTABLE_VERSION
+};
+
diff --git a/src/VBox/Runtime/common/ioqueue/ioqueuebase.cpp b/src/VBox/Runtime/common/ioqueue/ioqueuebase.cpp
new file mode 100644
index 00000000..28ecf1af
--- /dev/null
+++ b/src/VBox/Runtime/common/ioqueue/ioqueuebase.cpp
@@ -0,0 +1,287 @@
+/* $Id: ioqueuebase.cpp $ */
+/** @file
+ * IPRT - I/O queue, Base/Public API.
+ */
+
+/*
+ * Copyright (C) 2019-2020 Oracle Corporation
+ *
+ * This file is part of VirtualBox Open Source Edition (OSE), as
+ * available from http://www.virtualbox.org. This file is free software;
+ * you can redistribute it and/or modify it under the terms of the GNU
+ * General Public License (GPL) as published by the Free Software
+ * Foundation, in version 2 as it comes in the "COPYING" file of the
+ * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
+ * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
+ *
+ * The contents of this file may alternatively be used under the terms
+ * of the Common Development and Distribution License Version 1.0
+ * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
+ * VirtualBox OSE distribution, in which case the provisions of the
+ * CDDL are applicable instead of those of the GPL.
+ *
+ * You may elect to license modified versions of this file under the
+ * terms and conditions of either the GPL or the CDDL or both.
+ */
+
+
+/*********************************************************************************************************************************
+* Header Files *
+*********************************************************************************************************************************/
+#define LOG_GROUP RTLOGGROUP_IOQUEUE
+#include <iprt/ioqueue.h>
+
+#include <iprt/asm.h>
+#include <iprt/err.h>
+#include <iprt/log.h>
+#include <iprt/mem.h>
+#include <iprt/semaphore.h>
+#include <iprt/string.h>
+
+#include "internal/ioqueue.h"
+
+
+/*********************************************************************************************************************************
+* Defined Constants And Macros *
+*********************************************************************************************************************************/
+
+
+
+/*********************************************************************************************************************************
+* Structures and Typedefs *
+*********************************************************************************************************************************/
+
+/**
+ * Internal I/O queue instance data.
+ */
+typedef struct RTIOQUEUEINT
+{
+ /** Magic identifying the I/O queue structure. */
+ uint32_t u32Magic;
+ /** Pointer to the provider vtable. */
+ PCRTIOQUEUEPROVVTABLE pVTbl;
+ /** I/O queue provider instance handle. */
+ RTIOQUEUEPROV hIoQueueProv;
+ /** Maximum number of submission queue entries - constant. */
+ uint32_t cSqEntries;
+ /** Maximum number of completion queue entries - constant. */
+ uint32_t cCqEntries;
+ /** Number of currently committed and not completed requests. */
+ volatile uint32_t cReqsCommitted;
+ /** Number of prepared requests. */
+ volatile uint32_t cReqsPrepared;
+ /** Start of the provider specific instance data - vvariable in size. */
+ uint8_t abInst[1];
+} RTIOQUEUEINT;
+/** Pointer to the internal I/O queue instance data. */
+typedef RTIOQUEUEINT *PRTIOQUEUEINT;
+
+
+/*********************************************************************************************************************************
+* Global Variables *
+*********************************************************************************************************************************/
+/** Array of I/O queue providers we know about, order is important for each type.
+ * The best suited ones for each platform should come first.
+ */
+static PCRTIOQUEUEPROVVTABLE g_apIoQueueProviders[] =
+{
+#if defined(RT_OS_LINUX)
+ &g_RTIoQueueLnxIoURingProv,
+#endif
+#ifndef RT_OS_OS2
+ &g_RTIoQueueAioFileProv,
+#endif
+ &g_RTIoQueueStdFileProv
+};
+
+
+/*********************************************************************************************************************************
+* Internal Functions *
+*********************************************************************************************************************************/
+
+
+RTDECL(PCRTIOQUEUEPROVVTABLE) RTIoQueueProviderGetBestForHndType(RTHANDLETYPE enmHnd)
+{
+ /* Go through the array and pick the first supported provider for the given handle type. */
+ for (unsigned i = 0; i < RT_ELEMENTS(g_apIoQueueProviders); i++)
+ {
+ PCRTIOQUEUEPROVVTABLE pIoQueueProv = g_apIoQueueProviders[i];
+ if ( pIoQueueProv->enmHnd == enmHnd
+ && pIoQueueProv->pfnIsSupported())
+ return pIoQueueProv;
+ }
+
+ return NULL;
+}
+
+
+RTDECL(PCRTIOQUEUEPROVVTABLE) RTIoQueueProviderGetById(const char *pszId)
+{
+ for (unsigned i = 0; i < RT_ELEMENTS(g_apIoQueueProviders); i++)
+ {
+ PCRTIOQUEUEPROVVTABLE pIoQueueProv = g_apIoQueueProviders[i];
+ if (!strcmp(pIoQueueProv->pszId, pszId))
+ return pIoQueueProv;
+ }
+
+ return NULL;
+}
+
+
+RTDECL(int) RTIoQueueCreate(PRTIOQUEUE phIoQueue, PCRTIOQUEUEPROVVTABLE pProvVTable,
+ uint32_t fFlags, uint32_t cSqEntries, uint32_t cCqEntries)
+{
+ AssertPtrReturn(phIoQueue, VERR_INVALID_POINTER);
+ AssertPtrReturn(pProvVTable, VERR_INVALID_POINTER);
+ AssertReturn(!fFlags, VERR_INVALID_PARAMETER);
+ AssertReturn(cSqEntries > 0, VERR_INVALID_PARAMETER);
+ AssertReturn(cCqEntries > 0, VERR_INVALID_PARAMETER);
+
+ int rc = VINF_SUCCESS;
+ PRTIOQUEUEINT pThis = (PRTIOQUEUEINT)RTMemAllocZ(RT_UOFFSETOF_DYN(RTIOQUEUEINT, abInst[pProvVTable->cbIoQueueProv]));
+ if (RT_LIKELY(pThis))
+ {
+ pThis->pVTbl = pProvVTable;
+ pThis->hIoQueueProv = (RTIOQUEUEPROV)&pThis->abInst[0];
+ pThis->cSqEntries = cSqEntries;
+ pThis->cCqEntries = cCqEntries;
+ pThis->cReqsCommitted = 0;
+ pThis->cReqsPrepared = 0;
+
+ rc = pThis->pVTbl->pfnQueueInit(pThis->hIoQueueProv, fFlags, cSqEntries, cCqEntries);
+ if (RT_SUCCESS(rc))
+ {
+ *phIoQueue = pThis;
+ return VINF_SUCCESS;
+ }
+
+ RTMemFree(pThis);
+ }
+ else
+ rc = VERR_NO_MEMORY;
+
+ return rc;
+}
+
+
+RTDECL(int) RTIoQueueDestroy(RTIOQUEUE hIoQueue)
+{
+ PRTIOQUEUEINT pThis = hIoQueue;
+ AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
+ AssertReturn(ASMAtomicReadU32(&pThis->cReqsCommitted) == 0, VERR_IOQUEUE_BUSY);
+
+ pThis->pVTbl->pfnQueueDestroy(pThis->hIoQueueProv);
+ RTMemFree(pThis);
+ return VINF_SUCCESS;
+}
+
+
+RTDECL(int) RTIoQueueHandleRegister(RTIOQUEUE hIoQueue, PCRTHANDLE pHandle)
+{
+ PRTIOQUEUEINT pThis = hIoQueue;
+ AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
+
+ /** @todo Efficiently check that handle wasn't registered previously. */
+ return pThis->pVTbl->pfnHandleRegister(pThis->hIoQueueProv, pHandle);
+}
+
+
+RTDECL(int) RTIoQueueHandleDeregister(RTIOQUEUE hIoQueue, PCRTHANDLE pHandle)
+{
+ PRTIOQUEUEINT pThis = hIoQueue;
+ AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
+
+ /** @todo Efficiently check that handle was registered previously. */
+ return pThis->pVTbl->pfnHandleDeregister(pThis->hIoQueueProv, pHandle);
+}
+
+
+RTDECL(int) RTIoQueueRequestPrepare(RTIOQUEUE hIoQueue, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp,
+ uint64_t off, void *pvBuf, size_t cbBuf, uint32_t fReqFlags,
+ void *pvUser)
+{
+ PRTIOQUEUEINT pThis = hIoQueue;
+ AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
+ AssertReturn(pHandle->enmType == pThis->pVTbl->enmHnd, VERR_INVALID_HANDLE);
+
+ /** @todo Efficiently check that handle was registered previously. */
+ int rc = pThis->pVTbl->pfnReqPrepare(pThis->hIoQueueProv, pHandle, enmOp, off, pvBuf, cbBuf,
+ fReqFlags, pvUser);
+ if (RT_SUCCESS(rc))
+ ASMAtomicIncU32(&pThis->cReqsPrepared);
+
+ return rc;
+}
+
+
+RTDECL(int) RTIoQueueRequestPrepareSg(RTIOQUEUE hIoQueue, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp,
+ uint64_t off, PCRTSGBUF pSgBuf, size_t cbSg, uint32_t fReqFlags,
+ void *pvUser)
+{
+ PRTIOQUEUEINT pThis = hIoQueue;
+ AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
+ AssertReturn(pHandle->enmType == pThis->pVTbl->enmHnd, VERR_INVALID_HANDLE);
+
+ /** @todo Efficiently check that handle was registered previously. */
+ int rc = pThis->pVTbl->pfnReqPrepareSg(pThis->hIoQueueProv, pHandle, enmOp, off, pSgBuf, cbSg,
+ fReqFlags, pvUser);
+ if (RT_SUCCESS(rc))
+ ASMAtomicIncU32(&pThis->cReqsPrepared);
+
+ return rc;
+}
+
+
+RTDECL(int) RTIoQueueCommit(RTIOQUEUE hIoQueue)
+{
+ PRTIOQUEUEINT pThis = hIoQueue;
+ AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
+ AssertReturn(ASMAtomicReadU32(&pThis->cReqsPrepared) > 0, VERR_IOQUEUE_EMPTY);
+
+ uint32_t cReqsPreparedOld = 0;
+ uint32_t cReqsCommitted = 0;
+ int rc = VINF_SUCCESS;
+ do
+ {
+ rc = pThis->pVTbl->pfnCommit(pThis->hIoQueueProv, &cReqsCommitted);
+ if (RT_SUCCESS(rc))
+ {
+ ASMAtomicAddU32(&pThis->cReqsCommitted, cReqsCommitted);
+ cReqsPreparedOld = ASMAtomicSubU32(&pThis->cReqsPrepared, cReqsCommitted);
+ }
+ } while (RT_SUCCESS(rc) && cReqsPreparedOld - cReqsCommitted > 0);
+
+ return rc;
+}
+
+
+RTDECL(int) RTIoQueueEvtWait(RTIOQUEUE hIoQueue, PRTIOQUEUECEVT paCEvt, uint32_t cCEvt, uint32_t cMinWait,
+ uint32_t *pcCEvt, uint32_t fFlags)
+{
+ PRTIOQUEUEINT pThis = hIoQueue;
+ AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
+ AssertPtrReturn(paCEvt, VERR_INVALID_POINTER);
+ AssertReturn(cCEvt > 0, VERR_INVALID_PARAMETER);
+ AssertReturn(cMinWait > 0, VERR_INVALID_PARAMETER);
+ AssertPtrReturn(pcCEvt, VERR_INVALID_POINTER);
+ AssertReturn(!fFlags, VERR_INVALID_PARAMETER);
+ AssertReturn(ASMAtomicReadU32(&pThis->cReqsCommitted) > 0, VERR_IOQUEUE_EMPTY);
+
+ *pcCEvt = 0;
+ int rc = pThis->pVTbl->pfnEvtWait(pThis->hIoQueueProv, paCEvt, cCEvt, cMinWait, pcCEvt, fFlags);
+ if ( (RT_SUCCESS(rc) || rc == VERR_INTERRUPTED)
+ && *pcCEvt > 0)
+ ASMAtomicSubU32(&pThis->cReqsCommitted, *pcCEvt);
+
+ return rc;
+}
+
+
+RTDECL(int) RTIoQueueEvtWaitWakeup(RTIOQUEUE hIoQueue)
+{
+ PRTIOQUEUEINT pThis = hIoQueue;
+ AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
+
+ return pThis->pVTbl->pfnEvtWaitWakeup(pThis->hIoQueueProv);
+}
+