diff options
Diffstat (limited to 'src/VBox/Runtime/common/ioqueue')
3 files changed, 1154 insertions, 0 deletions
diff --git a/src/VBox/Runtime/common/ioqueue/ioqueue-aiofile-provider.cpp b/src/VBox/Runtime/common/ioqueue/ioqueue-aiofile-provider.cpp new file mode 100644 index 00000000..c4efa38d --- /dev/null +++ b/src/VBox/Runtime/common/ioqueue/ioqueue-aiofile-provider.cpp @@ -0,0 +1,328 @@ +/* $Id: ioqueue-aiofile-provider.cpp $ */ +/** @file + * IPRT - I/O queue, Async I/O file provider. + */ + +/* + * Copyright (C) 2019-2020 Oracle Corporation + * + * This file is part of VirtualBox Open Source Edition (OSE), as + * available from http://www.virtualbox.org. This file is free software; + * you can redistribute it and/or modify it under the terms of the GNU + * General Public License (GPL) as published by the Free Software + * Foundation, in version 2 as it comes in the "COPYING" file of the + * VirtualBox OSE distribution. VirtualBox OSE is distributed in the + * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind. + * + * The contents of this file may alternatively be used under the terms + * of the Common Development and Distribution License Version 1.0 + * (CDDL) only, as it comes in the "COPYING.CDDL" file of the + * VirtualBox OSE distribution, in which case the provisions of the + * CDDL are applicable instead of those of the GPL. + * + * You may elect to license modified versions of this file under the + * terms and conditions of either the GPL or the CDDL or both. + */ + + +/********************************************************************************************************************************* +* Header Files * +*********************************************************************************************************************************/ +#define LOG_GROUP RTLOGGROUP_IOQUEUE +#include <iprt/ioqueue.h> + +#include <iprt/asm.h> +#include <iprt/errcore.h> +#include <iprt/file.h> +#include <iprt/log.h> +#include <iprt/mem.h> +#include <iprt/semaphore.h> +#include <iprt/string.h> + +#include "internal/ioqueue.h" + + +/********************************************************************************************************************************* +* Structures and Typedefs * +*********************************************************************************************************************************/ + + +/** + * Internal I/O queue provider instance data. + */ +typedef struct RTIOQUEUEPROVINT +{ + /** The async I/O context handle. */ + RTFILEAIOCTX hAioCtx; + /** Pointer to the array of requests waiting for commit. */ + PRTFILEAIOREQ pahReqsToCommit; + /** Maximum number of requests to wait for commit.. */ + size_t cReqsToCommitMax; + /** Number of requests waiting for commit. */ + uint32_t cReqsToCommit; + /** Array of free cached request handles. */ + PRTFILEAIOREQ pahReqsFree; + /** Maximum number of cached requests. */ + uint32_t cReqsFreeMax; + /** Number of free cached requests. */ + volatile uint32_t cReqsFree; +} RTIOQUEUEPROVINT; +/** Pointer to the internal I/O queue provider instance data. */ +typedef RTIOQUEUEPROVINT *PRTIOQUEUEPROVINT; + + +/********************************************************************************************************************************* +* Internal Functions * +*********************************************************************************************************************************/ + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnIsSupported} */ +static DECLCALLBACK(bool) rtIoQueueAioFileProv_IsSupported(void) +{ + /* The common code/public API already checked for the proper handle type. */ + /** @todo Check that the file was opened with async I/O enabled on some platforms? */ + return true; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueInit} */ +static DECLCALLBACK(int) rtIoQueueAioFileProv_QueueInit(RTIOQUEUEPROV hIoQueueProv, uint32_t fFlags, + uint32_t cSqEntries, uint32_t cCqEntries) +{ + RT_NOREF(fFlags, cCqEntries); + + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + int rc = VINF_SUCCESS; + + pThis->cReqsToCommitMax = cSqEntries; + pThis->cReqsFreeMax = cSqEntries; + pThis->cReqsFree = 0; + + pThis->pahReqsToCommit = (PRTFILEAIOREQ)RTMemAllocZ(cSqEntries * sizeof(PRTFILEAIOREQ)); + if (RT_LIKELY(pThis->pahReqsToCommit)) + { + pThis->pahReqsFree = (PRTFILEAIOREQ)RTMemAllocZ(cSqEntries * sizeof(PRTFILEAIOREQ)); + if (RT_LIKELY(pThis->pahReqsFree)) + { + rc = RTFileAioCtxCreate(&pThis->hAioCtx, cSqEntries, RTFILEAIOCTX_FLAGS_WAIT_WITHOUT_PENDING_REQUESTS); + if (RT_SUCCESS(rc)) + return VINF_SUCCESS; + + RTMemFree(pThis->pahReqsFree); + } + else + rc = VERR_NO_MEMORY; + + RTMemFree(pThis->pahReqsToCommit); + } + else + rc = VERR_NO_MEMORY; + + return rc; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueDestroy} */ +static DECLCALLBACK(void) rtIoQueueAioFileProv_QueueDestroy(RTIOQUEUEPROV hIoQueueProv) +{ + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + + RTFileAioCtxDestroy(pThis->hAioCtx); + RTMemFree(pThis->pahReqsFree); + RTMemFree(pThis->pahReqsToCommit); + RT_BZERO(pThis, sizeof(*pThis)); +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleRegister} */ +static DECLCALLBACK(int) rtIoQueueAioFileProv_HandleRegister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle) +{ + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + + return RTFileAioCtxAssociateWithFile(pThis->hAioCtx, pHandle->u.hFile); +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleDeregister} */ +static DECLCALLBACK(int) rtIoQueueAioFileProv_HandleDeregister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle) +{ + RT_NOREF(hIoQueueProv, pHandle); + + /** @todo For Windows there doesn't seem to be a way to deregister the file handle without reopening the file, + *.for all other hosts this is a nop, just like the register method. + */ + return VINF_SUCCESS; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnReqPrepare} */ +static DECLCALLBACK(int) rtIoQueueAioFileProv_ReqPrepare(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp, + uint64_t off, void *pvBuf, size_t cbBuf, uint32_t fReqFlags, + void *pvUser) +{ + RT_NOREF(fReqFlags); + + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + + /* Try to grab a free request structure from the cache. */ + RTFILEAIOREQ hReq = NIL_RTFILEAIOREQ; + int rc = VINF_SUCCESS; + uint32_t cReqsFree = ASMAtomicReadU32(&pThis->cReqsFree); + if (cReqsFree) + { + do + { + cReqsFree = ASMAtomicReadU32(&pThis->cReqsFree); + hReq = pThis->pahReqsFree[pThis->cReqsFree - 1]; + } while (!ASMAtomicCmpXchgU32(&pThis->cReqsFree, cReqsFree - 1, cReqsFree)); + } + else + rc = RTFileAioReqCreate(&hReq); + + if (RT_SUCCESS(rc)) + { + switch (enmOp) + { + case RTIOQUEUEOP_READ: + rc = RTFileAioReqPrepareRead(hReq, pHandle->u.hFile, (RTFOFF)off, pvBuf, cbBuf, pvUser); + break; + case RTIOQUEUEOP_WRITE: + rc = RTFileAioReqPrepareWrite(hReq, pHandle->u.hFile, (RTFOFF)off, pvBuf, cbBuf, pvUser); + break; + case RTIOQUEUEOP_SYNC: + rc = RTFileAioReqPrepareFlush(hReq, pHandle->u.hFile, pvUser); + break; + default: + AssertMsgFailedReturn(("Invalid I/O queue operation: %d\n", enmOp), VERR_INTERNAL_ERROR); + } + + if (RT_SUCCESS(rc)) + pThis->pahReqsToCommit[pThis->cReqsToCommit++] = hReq; + else + { + int rc2 = RTFileAioReqDestroy(hReq); + Assert(rc2); RT_NOREF(rc2); + } + } + + return rc; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnCommit} */ +static DECLCALLBACK(int) rtIoQueueAioFileProv_Commit(RTIOQUEUEPROV hIoQueueProv, uint32_t *pcReqsCommitted) +{ + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + + int rc = RTFileAioCtxSubmit(pThis->hAioCtx, pThis->pahReqsToCommit, pThis->cReqsToCommit); + if (RT_SUCCESS(rc)) + { + *pcReqsCommitted = pThis->cReqsToCommit; + pThis->cReqsToCommit = 0; + } + + return rc; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWait} */ +static DECLCALLBACK(int) rtIoQueueAioFileProv_EvtWait(RTIOQUEUEPROV hIoQueueProv, PRTIOQUEUECEVT paCEvt, uint32_t cCEvt, + uint32_t cMinWait, uint32_t *pcCEvt, uint32_t fFlags) +{ + RT_NOREF(fFlags); + + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + int rc = VINF_SUCCESS; + uint32_t idxCEvt = 0; + + while ( RT_SUCCESS(rc) + && cMinWait + && cCEvt) + { + RTFILEAIOREQ ahReqs[64]; + uint32_t cReqsCompleted = 0; + + rc = RTFileAioCtxWait(pThis->hAioCtx, cMinWait, RT_INDEFINITE_WAIT, + &ahReqs[0], RT_MIN(RT_ELEMENTS(ahReqs), cCEvt), &cReqsCompleted); + if (RT_SUCCESS(rc)) + { + for (unsigned i = 0; i < cReqsCompleted; i++) + { + RTFILEAIOREQ hReq = ahReqs[i]; + + paCEvt[idxCEvt].rcReq = RTFileAioReqGetRC(hReq, &paCEvt[idxCEvt].cbXfered); + paCEvt[idxCEvt].pvUser = RTFileAioReqGetUser(hReq); + idxCEvt++; + + /* Try to insert the free request into the cache. */ + uint32_t cReqsFree = ASMAtomicReadU32(&pThis->cReqsFree); + if (cReqsFree < pThis->cReqsFreeMax) + { + do + { + cReqsFree = ASMAtomicReadU32(&pThis->cReqsFree); + pThis->pahReqsFree[pThis->cReqsFree] = hReq; + } while (!ASMAtomicCmpXchgU32(&pThis->cReqsFree, cReqsFree + 1, cReqsFree)); + } + else + rc = RTFileAioReqDestroy(hReq); + } + + cCEvt -= cReqsCompleted; + cMinWait -= RT_MIN(cMinWait, cReqsCompleted); + } + } + + *pcCEvt = idxCEvt; + return rc; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWaitWakeup} */ +static DECLCALLBACK(int) rtIoQueueAioFileProv_EvtWaitWakeup(RTIOQUEUEPROV hIoQueueProv) +{ + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + + return RTFileAioCtxWakeup(pThis->hAioCtx); +} + + +/** + * Async file I/O queue provider virtual method table. + */ +RT_DECL_DATA_CONST(RTIOQUEUEPROVVTABLE const) g_RTIoQueueAioFileProv = +{ + /** uVersion */ + RTIOQUEUEPROVVTABLE_VERSION, + /** pszId */ + "AioFile", + /** cbIoQueueProv */ + sizeof(RTIOQUEUEPROVINT), + /** enmHnd */ + RTHANDLETYPE_FILE, + /** fFlags */ + 0, + /** pfnIsSupported */ + rtIoQueueAioFileProv_IsSupported, + /** pfnQueueInit */ + rtIoQueueAioFileProv_QueueInit, + /** pfnQueueDestroy */ + rtIoQueueAioFileProv_QueueDestroy, + /** pfnHandleRegister */ + rtIoQueueAioFileProv_HandleRegister, + /** pfnHandleDeregister */ + rtIoQueueAioFileProv_HandleDeregister, + /** pfnReqPrepare */ + rtIoQueueAioFileProv_ReqPrepare, + /** pfnReqPrepareSg */ + NULL, + /** pfnCommit */ + rtIoQueueAioFileProv_Commit, + /** pfnEvtWait */ + rtIoQueueAioFileProv_EvtWait, + /** pfnEvtWaitWakeup */ + rtIoQueueAioFileProv_EvtWaitWakeup, + /** uEndMarker */ + RTIOQUEUEPROVVTABLE_VERSION +}; + diff --git a/src/VBox/Runtime/common/ioqueue/ioqueue-stdfile-provider.cpp b/src/VBox/Runtime/common/ioqueue/ioqueue-stdfile-provider.cpp new file mode 100644 index 00000000..826fdabc --- /dev/null +++ b/src/VBox/Runtime/common/ioqueue/ioqueue-stdfile-provider.cpp @@ -0,0 +1,539 @@ +/* $Id: ioqueue-stdfile-provider.cpp $ */ +/** @file + * IPRT - I/O queue, Standard file provider. + */ + +/* + * Copyright (C) 2019-2020 Oracle Corporation + * + * This file is part of VirtualBox Open Source Edition (OSE), as + * available from http://www.virtualbox.org. This file is free software; + * you can redistribute it and/or modify it under the terms of the GNU + * General Public License (GPL) as published by the Free Software + * Foundation, in version 2 as it comes in the "COPYING" file of the + * VirtualBox OSE distribution. VirtualBox OSE is distributed in the + * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind. + * + * The contents of this file may alternatively be used under the terms + * of the Common Development and Distribution License Version 1.0 + * (CDDL) only, as it comes in the "COPYING.CDDL" file of the + * VirtualBox OSE distribution, in which case the provisions of the + * CDDL are applicable instead of those of the GPL. + * + * You may elect to license modified versions of this file under the + * terms and conditions of either the GPL or the CDDL or both. + */ + + +/********************************************************************************************************************************* +* Header Files * +*********************************************************************************************************************************/ +#define LOG_GROUP RTLOGGROUP_IOQUEUE +#include <iprt/ioqueue.h> + +#include <iprt/asm.h> +#include <iprt/errcore.h> +#include <iprt/file.h> +#include <iprt/log.h> +#include <iprt/mem.h> +#include <iprt/semaphore.h> +#include <iprt/string.h> +#include <iprt/thread.h> + +#include "internal/ioqueue.h" + + +/********************************************************************************************************************************* +* Defined Constants And Macros * +*********************************************************************************************************************************/ + +/** The I/O queue worker thread needs to wake up the waiting thread when requests completed. */ +#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP RT_BIT(0) +/** The waiting thread was interrupted by the external wakeup call. */ +#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR RT_BIT(1) +#define RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR_BIT 1 +/** The I/O queue worker thread needs to be woken up to process new requests. */ +#define RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP RT_BIT(2) +#define RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT 2 + + +/********************************************************************************************************************************* +* Structures and Typedefs * +*********************************************************************************************************************************/ + + +/** + * Submission queue entry. + */ +typedef struct RTIOQUEUESSQENTRY +{ + /** The file to work on. */ + RTFILE hFile; + /** I/O operation. */ + RTIOQUEUEOP enmOp; + /** Start offset. */ + uint64_t off; + /** Additional request flags. */ + uint32_t fReqFlags; + /** Size of the request. */ + size_t cbReq; + /** Opaque user data passed on completion. */ + void *pvUser; + /** Flag whether this is a S/G or standard request. */ + bool fSg; + /** Type dependent data. */ + union + { + /** Pointer to buffer for non S/G requests. */ + void *pvBuf; + /** Pointer to S/G buffer. */ + PCRTSGBUF pSgBuf; + } u; +} RTIOQUEUESSQENTRY; +/** Pointer to a submission queue entry. */ +typedef RTIOQUEUESSQENTRY *PRTIOQUEUESSQENTRY; +/** Pointer to a constant submission queue entry. */ +typedef const RTIOQUEUESSQENTRY *PCRTIOQUEUESSQENTRY; + + +/** + * Internal I/O queue provider instance data. + */ +typedef struct RTIOQUEUEPROVINT +{ + /** Size of the submission queue in entries. */ + uint32_t cSqEntries; + /** Size of the completion queue in entries. */ + uint32_t cCqEntries; + /** Pointer to the submission queue base. */ + PRTIOQUEUESSQENTRY paSqEntryBase; + /** Submission queue producer index. */ + volatile uint32_t idxSqProd; + /** Submission queue producer value for any uncommitted requests. */ + uint32_t idxSqProdUncommit; + /** Submission queue consumer index. */ + volatile uint32_t idxSqCons; + /** Pointer to the completion queue base. */ + PRTIOQUEUECEVT paCqEntryBase; + /** Completion queue producer index. */ + volatile uint32_t idxCqProd; + /** Completion queue consumer index. */ + volatile uint32_t idxCqCons; + /** Various state flags for synchronizing the worker thread with other participants. */ + volatile uint32_t fState; + /** The worker thread handle. */ + RTTHREAD hThrdWork; + /** Event semaphore the worker thread waits on for work. */ + RTSEMEVENT hSemEvtWorker; + /** Event semaphore the caller waits for completion events. */ + RTSEMEVENT hSemEvtWaitEvts; + /** Flag whether to shutdown the worker thread. */ + volatile bool fShutdown; +} RTIOQUEUEPROVINT; +/** Pointer to the internal I/O queue provider instance data. */ +typedef RTIOQUEUEPROVINT *PRTIOQUEUEPROVINT; + + +/********************************************************************************************************************************* +* Internal Functions * +*********************************************************************************************************************************/ + + +/** + * Processes the given submission queue entry and reports back the result in the completion queue. + * + * @returns nothing. + * @param pSqEntry The submission queue entry to process. + * @param pCqEntry The comppletion queue entry to store the result in. + */ +static void rtIoQueueStdFileProv_SqEntryProcess(PCRTIOQUEUESSQENTRY pSqEntry, PRTIOQUEUECEVT pCqEntry) +{ + int rcReq = VINF_SUCCESS; + + switch (pSqEntry->enmOp) + { + case RTIOQUEUEOP_READ: + if (!pSqEntry->fSg) + rcReq = RTFileReadAt(pSqEntry->hFile, pSqEntry->off, pSqEntry->u.pvBuf, pSqEntry->cbReq, NULL); + else + { + RTSGBUF SgBuf; + RTSgBufClone(&SgBuf, pSqEntry->u.pSgBuf); + rcReq = RTFileSgReadAt(pSqEntry->hFile, pSqEntry->off, &SgBuf, pSqEntry->cbReq, NULL); + } + break; + case RTIOQUEUEOP_WRITE: + if (!pSqEntry->fSg) + rcReq = RTFileWriteAt(pSqEntry->hFile, pSqEntry->off, pSqEntry->u.pvBuf, pSqEntry->cbReq, NULL); + else + { + RTSGBUF SgBuf; + RTSgBufClone(&SgBuf, pSqEntry->u.pSgBuf); + rcReq = RTFileSgWriteAt(pSqEntry->hFile, pSqEntry->off, &SgBuf, pSqEntry->cbReq, NULL); + } + break; + case RTIOQUEUEOP_SYNC: + rcReq = RTFileFlush(pSqEntry->hFile); + break; + default: + AssertMsgFailedReturnVoid(("Invalid I/O queue operation: %d\n", pSqEntry->enmOp)); + } + + /* Write the result back into the completion queue. */ + pCqEntry->rcReq = rcReq; + pCqEntry->pvUser = pSqEntry->pvUser; + pCqEntry->cbXfered = RT_SUCCESS(rcReq) ? pSqEntry->cbReq : 0; +} + + +/** + * The main I/O queue worker loop which processes the incoming I/O requests. + */ +static DECLCALLBACK(int) rtIoQueueStdFileProv_WorkerLoop(RTTHREAD hThrdSelf, void *pvUser) +{ + PRTIOQUEUEPROVINT pThis = (PRTIOQUEUEPROVINT)pvUser; + + /* Signal that we started up. */ + int rc = RTThreadUserSignal(hThrdSelf); + AssertRC(rc); + + while (!ASMAtomicReadBool(&pThis->fShutdown)) + { + /* Wait for some work. */ + ASMAtomicOrU32(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP); + uint32_t idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd); + uint32_t idxSqCons = ASMAtomicReadU32(&pThis->idxSqCons); + uint32_t idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons); + + if (idxSqCons == idxSqProd) + { + rc = RTSemEventWait(pThis->hSemEvtWorker, RT_INDEFINITE_WAIT); + AssertRC(rc); + + idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd); + idxSqCons = ASMAtomicReadU32(&pThis->idxSqCons); + idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons); + } + + ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT); + + /* Process all requests. */ + uint32_t cCqFree = 0; + if (idxCqCons > pThis->idxCqProd) + cCqFree = pThis->cCqEntries - (pThis->cCqEntries - idxCqCons) - pThis->idxCqProd; + else + cCqFree = pThis->cCqEntries - pThis->idxCqProd - idxCqCons; + do + { + while ( idxSqCons != idxSqProd + && cCqFree) + { + PCRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[idxSqCons]; + PRTIOQUEUECEVT pCqEntry = &pThis->paCqEntryBase[pThis->idxCqProd]; + + rtIoQueueStdFileProv_SqEntryProcess(pSqEntry, pCqEntry); + ASMWriteFence(); + + idxSqCons = (idxSqCons + 1) % pThis->cSqEntries; + cCqFree--; + pThis->idxCqProd = (pThis->idxCqProd + 1) % pThis->cCqEntries; + ASMAtomicWriteU32(&pThis->idxSqCons, idxSqCons); + ASMWriteFence(); + if (ASMAtomicReadU32(&pThis->fState) & RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP) + { + rc = RTSemEventSignal(pThis->hSemEvtWaitEvts); + AssertRC(rc); + } + } + + idxSqProd = ASMAtomicReadU32(&pThis->idxSqProd); + } while ( idxSqCons != idxSqProd + && cCqFree); + } + + return VINF_SUCCESS; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnIsSupported} */ +static DECLCALLBACK(bool) rtIoQueueStdFileProv_IsSupported(void) +{ + /* The common code/public API already checked for the proper handle type. */ + return true; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueInit} */ +static DECLCALLBACK(int) rtIoQueueStdFileProv_QueueInit(RTIOQUEUEPROV hIoQueueProv, uint32_t fFlags, + uint32_t cSqEntries, uint32_t cCqEntries) +{ + RT_NOREF(fFlags); + + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + int rc = VINF_SUCCESS; + + cSqEntries++; + cCqEntries++; + + pThis->cSqEntries = cSqEntries; + pThis->cCqEntries = cCqEntries; + pThis->idxSqProd = 0; + pThis->idxSqProdUncommit = 0; + pThis->idxSqCons = 0; + pThis->idxCqProd = 0; + pThis->idxCqCons = 0; + pThis->fShutdown = false; + pThis->fState = 0; + + pThis->paSqEntryBase = (PRTIOQUEUESSQENTRY)RTMemAllocZ(cSqEntries * sizeof(RTIOQUEUESSQENTRY)); + if (RT_LIKELY(pThis->paSqEntryBase)) + { + pThis->paCqEntryBase = (PRTIOQUEUECEVT)RTMemAllocZ(cCqEntries * sizeof(RTIOQUEUECEVT)); + if (RT_LIKELY(pThis->paSqEntryBase)) + { + rc = RTSemEventCreate(&pThis->hSemEvtWorker); + if (RT_SUCCESS(rc)) + { + rc = RTSemEventCreate(&pThis->hSemEvtWaitEvts); + if (RT_SUCCESS(rc)) + { + /* Spin up the worker thread. */ + rc = RTThreadCreate(&pThis->hThrdWork, rtIoQueueStdFileProv_WorkerLoop, pThis, 0, RTTHREADTYPE_IO, RTTHREADFLAGS_WAITABLE, + "IoQ-StdFile"); + if (RT_SUCCESS(rc)) + { + rc = RTThreadUserWait(pThis->hThrdWork, 10 * RT_MS_1SEC); + AssertRC(rc); + + return VINF_SUCCESS; + } + + RTSemEventDestroy(pThis->hSemEvtWaitEvts); + } + + RTSemEventDestroy(pThis->hSemEvtWorker); + } + + RTMemFree(pThis->paCqEntryBase); + } + else + rc = VERR_NO_MEMORY; + + RTMemFree(pThis->paSqEntryBase); + } + else + rc = VERR_NO_MEMORY; + + return rc; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnQueueDestroy} */ +static DECLCALLBACK(void) rtIoQueueStdFileProv_QueueDestroy(RTIOQUEUEPROV hIoQueueProv) +{ + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + + ASMAtomicXchgBool(&pThis->fShutdown, true); + RTSemEventSignal(pThis->hSemEvtWorker); + + int rc = RTThreadWait(pThis->hThrdWork, 60 * RT_MS_1SEC, NULL); + AssertRC(rc); + + RTSemEventDestroy(pThis->hSemEvtWaitEvts); + RTSemEventDestroy(pThis->hSemEvtWorker); + RTMemFree(pThis->paCqEntryBase); + RTMemFree(pThis->paSqEntryBase); + RT_BZERO(pThis, sizeof(*pThis)); +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleRegister} */ +static DECLCALLBACK(int) rtIoQueueStdFileProv_HandleRegister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle) +{ + RT_NOREF(hIoQueueProv, pHandle); + + /* Nothing to do here. */ + return VINF_SUCCESS; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnHandleDeregister} */ +static DECLCALLBACK(int) rtIoQueueStdFileProv_HandleDeregister(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle) +{ + RT_NOREF(hIoQueueProv, pHandle); + + /* Nothing to do here. */ + return VINF_SUCCESS; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnReqPrepare} */ +static DECLCALLBACK(int) rtIoQueueStdFileProv_ReqPrepare(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp, + uint64_t off, void *pvBuf, size_t cbBuf, uint32_t fReqFlags, + void *pvUser) +{ + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[pThis->idxSqProdUncommit]; + + pSqEntry->hFile = pHandle->u.hFile; + pSqEntry->enmOp = enmOp; + pSqEntry->off = off; + pSqEntry->fReqFlags = fReqFlags; + pSqEntry->cbReq = cbBuf; + pSqEntry->pvUser = pvUser; + pSqEntry->fSg = false; + pSqEntry->u.pvBuf = pvBuf; + + pThis->idxSqProdUncommit = (pThis->idxSqProdUncommit + 1) % pThis->cSqEntries; + return VINF_SUCCESS; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnReqPrepareSg} */ +static DECLCALLBACK(int) rtIoQueueStdFileProv_ReqPrepareSg(RTIOQUEUEPROV hIoQueueProv, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp, + uint64_t off, PCRTSGBUF pSgBuf, size_t cbSg, uint32_t fReqFlags, + void *pvUser) +{ + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + PRTIOQUEUESSQENTRY pSqEntry = &pThis->paSqEntryBase[pThis->idxSqProdUncommit]; + + pSqEntry->hFile = pHandle->u.hFile; + pSqEntry->enmOp = enmOp; + pSqEntry->off = off; + pSqEntry->fReqFlags = fReqFlags; + pSqEntry->cbReq = cbSg; + pSqEntry->pvUser = pvUser; + pSqEntry->fSg = true; + pSqEntry->u.pSgBuf = pSgBuf; + + pThis->idxSqProdUncommit = (pThis->idxSqProdUncommit + 1) % pThis->cSqEntries; + return VINF_SUCCESS; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnCommit} */ +static DECLCALLBACK(int) rtIoQueueStdFileProv_Commit(RTIOQUEUEPROV hIoQueueProv, uint32_t *pcReqsCommitted) +{ + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + + if (pThis->idxSqProd > pThis->idxSqProdUncommit) + *pcReqsCommitted = pThis->cSqEntries - pThis->idxSqProd + pThis->idxSqProdUncommit; + else + *pcReqsCommitted = pThis->idxSqProdUncommit - pThis->idxSqProd; + + ASMWriteFence(); + ASMAtomicWriteU32(&pThis->idxSqProd, pThis->idxSqProdUncommit); + return RTSemEventSignal(pThis->hSemEvtWorker); +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWait} */ +static DECLCALLBACK(int) rtIoQueueStdFileProv_EvtWait(RTIOQUEUEPROV hIoQueueProv, PRTIOQUEUECEVT paCEvt, uint32_t cCEvt, + uint32_t cMinWait, uint32_t *pcCEvt, uint32_t fFlags) +{ + RT_NOREF(fFlags); + + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + int rc = VINF_SUCCESS; + uint32_t idxCEvt = 0; + + while ( RT_SUCCESS(rc) + && cMinWait + && cCEvt) + { + ASMAtomicOrU32(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_NEED_WAKEUP); + uint32_t idxCqProd = ASMAtomicReadU32(&pThis->idxCqProd); + uint32_t idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons); + + if (idxCqCons == idxCqProd) + { + rc = RTSemEventWait(pThis->hSemEvtWaitEvts, RT_INDEFINITE_WAIT); + AssertRC(rc); + if (ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR_BIT)) + { + rc = VERR_INTERRUPTED; + ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT); + break; + } + + idxCqProd = ASMAtomicReadU32(&pThis->idxCqProd); + idxCqCons = ASMAtomicReadU32(&pThis->idxCqCons); + } + + ASMAtomicBitTestAndClear(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_WORKER_NEED_WAKEUP_BIT); + + /* Process all requests. */ + while ( idxCqCons != idxCqProd + && cCEvt) + { + PRTIOQUEUECEVT pCqEntry = &pThis->paCqEntryBase[idxCqCons]; + + paCEvt[idxCEvt].rcReq = pCqEntry->rcReq; + paCEvt[idxCEvt].pvUser = pCqEntry->pvUser; + paCEvt[idxCEvt].cbXfered = pCqEntry->cbXfered; + ASMReadFence(); + + idxCEvt++; + cCEvt--; + cMinWait--; + + idxCqCons = (idxCqCons + 1) % pThis->cCqEntries; + pThis->idxCqCons = (pThis->idxCqCons + 1) % pThis->cCqEntries; + ASMWriteFence(); + } + } + + *pcCEvt = idxCEvt; + return rc; +} + + +/** @interface_method_impl{RTIOQUEUEPROVVTABLE,pfnEvtWaitWakeup} */ +static DECLCALLBACK(int) rtIoQueueStdFileProv_EvtWaitWakeup(RTIOQUEUEPROV hIoQueueProv) +{ + PRTIOQUEUEPROVINT pThis = hIoQueueProv; + + ASMAtomicOrU32(&pThis->fState, RTIOQUEUE_STDFILE_PROV_STATE_F_EVTWAIT_INTR); + return RTSemEventSignal(pThis->hSemEvtWaitEvts); +} + + +/** + * Standard file I/O queue provider virtual method table. + */ +RT_DECL_DATA_CONST(RTIOQUEUEPROVVTABLE const) g_RTIoQueueStdFileProv = +{ + /** uVersion */ + RTIOQUEUEPROVVTABLE_VERSION, + /** pszId */ + "StdFile", + /** cbIoQueueProv */ + sizeof(RTIOQUEUEPROVINT), + /** enmHnd */ + RTHANDLETYPE_FILE, + /** fFlags */ + 0, + /** pfnIsSupported */ + rtIoQueueStdFileProv_IsSupported, + /** pfnQueueInit */ + rtIoQueueStdFileProv_QueueInit, + /** pfnQueueDestroy */ + rtIoQueueStdFileProv_QueueDestroy, + /** pfnHandleRegister */ + rtIoQueueStdFileProv_HandleRegister, + /** pfnHandleDeregister */ + rtIoQueueStdFileProv_HandleDeregister, + /** pfnReqPrepare */ + rtIoQueueStdFileProv_ReqPrepare, + /** pfnReqPrepareSg */ + rtIoQueueStdFileProv_ReqPrepareSg, + /** pfnCommit */ + rtIoQueueStdFileProv_Commit, + /** pfnEvtWait */ + rtIoQueueStdFileProv_EvtWait, + /** pfnEvtWaitWakeup */ + rtIoQueueStdFileProv_EvtWaitWakeup, + /** uEndMarker */ + RTIOQUEUEPROVVTABLE_VERSION +}; + diff --git a/src/VBox/Runtime/common/ioqueue/ioqueuebase.cpp b/src/VBox/Runtime/common/ioqueue/ioqueuebase.cpp new file mode 100644 index 00000000..28ecf1af --- /dev/null +++ b/src/VBox/Runtime/common/ioqueue/ioqueuebase.cpp @@ -0,0 +1,287 @@ +/* $Id: ioqueuebase.cpp $ */ +/** @file + * IPRT - I/O queue, Base/Public API. + */ + +/* + * Copyright (C) 2019-2020 Oracle Corporation + * + * This file is part of VirtualBox Open Source Edition (OSE), as + * available from http://www.virtualbox.org. This file is free software; + * you can redistribute it and/or modify it under the terms of the GNU + * General Public License (GPL) as published by the Free Software + * Foundation, in version 2 as it comes in the "COPYING" file of the + * VirtualBox OSE distribution. VirtualBox OSE is distributed in the + * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind. + * + * The contents of this file may alternatively be used under the terms + * of the Common Development and Distribution License Version 1.0 + * (CDDL) only, as it comes in the "COPYING.CDDL" file of the + * VirtualBox OSE distribution, in which case the provisions of the + * CDDL are applicable instead of those of the GPL. + * + * You may elect to license modified versions of this file under the + * terms and conditions of either the GPL or the CDDL or both. + */ + + +/********************************************************************************************************************************* +* Header Files * +*********************************************************************************************************************************/ +#define LOG_GROUP RTLOGGROUP_IOQUEUE +#include <iprt/ioqueue.h> + +#include <iprt/asm.h> +#include <iprt/err.h> +#include <iprt/log.h> +#include <iprt/mem.h> +#include <iprt/semaphore.h> +#include <iprt/string.h> + +#include "internal/ioqueue.h" + + +/********************************************************************************************************************************* +* Defined Constants And Macros * +*********************************************************************************************************************************/ + + + +/********************************************************************************************************************************* +* Structures and Typedefs * +*********************************************************************************************************************************/ + +/** + * Internal I/O queue instance data. + */ +typedef struct RTIOQUEUEINT +{ + /** Magic identifying the I/O queue structure. */ + uint32_t u32Magic; + /** Pointer to the provider vtable. */ + PCRTIOQUEUEPROVVTABLE pVTbl; + /** I/O queue provider instance handle. */ + RTIOQUEUEPROV hIoQueueProv; + /** Maximum number of submission queue entries - constant. */ + uint32_t cSqEntries; + /** Maximum number of completion queue entries - constant. */ + uint32_t cCqEntries; + /** Number of currently committed and not completed requests. */ + volatile uint32_t cReqsCommitted; + /** Number of prepared requests. */ + volatile uint32_t cReqsPrepared; + /** Start of the provider specific instance data - vvariable in size. */ + uint8_t abInst[1]; +} RTIOQUEUEINT; +/** Pointer to the internal I/O queue instance data. */ +typedef RTIOQUEUEINT *PRTIOQUEUEINT; + + +/********************************************************************************************************************************* +* Global Variables * +*********************************************************************************************************************************/ +/** Array of I/O queue providers we know about, order is important for each type. + * The best suited ones for each platform should come first. + */ +static PCRTIOQUEUEPROVVTABLE g_apIoQueueProviders[] = +{ +#if defined(RT_OS_LINUX) + &g_RTIoQueueLnxIoURingProv, +#endif +#ifndef RT_OS_OS2 + &g_RTIoQueueAioFileProv, +#endif + &g_RTIoQueueStdFileProv +}; + + +/********************************************************************************************************************************* +* Internal Functions * +*********************************************************************************************************************************/ + + +RTDECL(PCRTIOQUEUEPROVVTABLE) RTIoQueueProviderGetBestForHndType(RTHANDLETYPE enmHnd) +{ + /* Go through the array and pick the first supported provider for the given handle type. */ + for (unsigned i = 0; i < RT_ELEMENTS(g_apIoQueueProviders); i++) + { + PCRTIOQUEUEPROVVTABLE pIoQueueProv = g_apIoQueueProviders[i]; + if ( pIoQueueProv->enmHnd == enmHnd + && pIoQueueProv->pfnIsSupported()) + return pIoQueueProv; + } + + return NULL; +} + + +RTDECL(PCRTIOQUEUEPROVVTABLE) RTIoQueueProviderGetById(const char *pszId) +{ + for (unsigned i = 0; i < RT_ELEMENTS(g_apIoQueueProviders); i++) + { + PCRTIOQUEUEPROVVTABLE pIoQueueProv = g_apIoQueueProviders[i]; + if (!strcmp(pIoQueueProv->pszId, pszId)) + return pIoQueueProv; + } + + return NULL; +} + + +RTDECL(int) RTIoQueueCreate(PRTIOQUEUE phIoQueue, PCRTIOQUEUEPROVVTABLE pProvVTable, + uint32_t fFlags, uint32_t cSqEntries, uint32_t cCqEntries) +{ + AssertPtrReturn(phIoQueue, VERR_INVALID_POINTER); + AssertPtrReturn(pProvVTable, VERR_INVALID_POINTER); + AssertReturn(!fFlags, VERR_INVALID_PARAMETER); + AssertReturn(cSqEntries > 0, VERR_INVALID_PARAMETER); + AssertReturn(cCqEntries > 0, VERR_INVALID_PARAMETER); + + int rc = VINF_SUCCESS; + PRTIOQUEUEINT pThis = (PRTIOQUEUEINT)RTMemAllocZ(RT_UOFFSETOF_DYN(RTIOQUEUEINT, abInst[pProvVTable->cbIoQueueProv])); + if (RT_LIKELY(pThis)) + { + pThis->pVTbl = pProvVTable; + pThis->hIoQueueProv = (RTIOQUEUEPROV)&pThis->abInst[0]; + pThis->cSqEntries = cSqEntries; + pThis->cCqEntries = cCqEntries; + pThis->cReqsCommitted = 0; + pThis->cReqsPrepared = 0; + + rc = pThis->pVTbl->pfnQueueInit(pThis->hIoQueueProv, fFlags, cSqEntries, cCqEntries); + if (RT_SUCCESS(rc)) + { + *phIoQueue = pThis; + return VINF_SUCCESS; + } + + RTMemFree(pThis); + } + else + rc = VERR_NO_MEMORY; + + return rc; +} + + +RTDECL(int) RTIoQueueDestroy(RTIOQUEUE hIoQueue) +{ + PRTIOQUEUEINT pThis = hIoQueue; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(ASMAtomicReadU32(&pThis->cReqsCommitted) == 0, VERR_IOQUEUE_BUSY); + + pThis->pVTbl->pfnQueueDestroy(pThis->hIoQueueProv); + RTMemFree(pThis); + return VINF_SUCCESS; +} + + +RTDECL(int) RTIoQueueHandleRegister(RTIOQUEUE hIoQueue, PCRTHANDLE pHandle) +{ + PRTIOQUEUEINT pThis = hIoQueue; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + + /** @todo Efficiently check that handle wasn't registered previously. */ + return pThis->pVTbl->pfnHandleRegister(pThis->hIoQueueProv, pHandle); +} + + +RTDECL(int) RTIoQueueHandleDeregister(RTIOQUEUE hIoQueue, PCRTHANDLE pHandle) +{ + PRTIOQUEUEINT pThis = hIoQueue; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + + /** @todo Efficiently check that handle was registered previously. */ + return pThis->pVTbl->pfnHandleDeregister(pThis->hIoQueueProv, pHandle); +} + + +RTDECL(int) RTIoQueueRequestPrepare(RTIOQUEUE hIoQueue, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp, + uint64_t off, void *pvBuf, size_t cbBuf, uint32_t fReqFlags, + void *pvUser) +{ + PRTIOQUEUEINT pThis = hIoQueue; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pHandle->enmType == pThis->pVTbl->enmHnd, VERR_INVALID_HANDLE); + + /** @todo Efficiently check that handle was registered previously. */ + int rc = pThis->pVTbl->pfnReqPrepare(pThis->hIoQueueProv, pHandle, enmOp, off, pvBuf, cbBuf, + fReqFlags, pvUser); + if (RT_SUCCESS(rc)) + ASMAtomicIncU32(&pThis->cReqsPrepared); + + return rc; +} + + +RTDECL(int) RTIoQueueRequestPrepareSg(RTIOQUEUE hIoQueue, PCRTHANDLE pHandle, RTIOQUEUEOP enmOp, + uint64_t off, PCRTSGBUF pSgBuf, size_t cbSg, uint32_t fReqFlags, + void *pvUser) +{ + PRTIOQUEUEINT pThis = hIoQueue; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pHandle->enmType == pThis->pVTbl->enmHnd, VERR_INVALID_HANDLE); + + /** @todo Efficiently check that handle was registered previously. */ + int rc = pThis->pVTbl->pfnReqPrepareSg(pThis->hIoQueueProv, pHandle, enmOp, off, pSgBuf, cbSg, + fReqFlags, pvUser); + if (RT_SUCCESS(rc)) + ASMAtomicIncU32(&pThis->cReqsPrepared); + + return rc; +} + + +RTDECL(int) RTIoQueueCommit(RTIOQUEUE hIoQueue) +{ + PRTIOQUEUEINT pThis = hIoQueue; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(ASMAtomicReadU32(&pThis->cReqsPrepared) > 0, VERR_IOQUEUE_EMPTY); + + uint32_t cReqsPreparedOld = 0; + uint32_t cReqsCommitted = 0; + int rc = VINF_SUCCESS; + do + { + rc = pThis->pVTbl->pfnCommit(pThis->hIoQueueProv, &cReqsCommitted); + if (RT_SUCCESS(rc)) + { + ASMAtomicAddU32(&pThis->cReqsCommitted, cReqsCommitted); + cReqsPreparedOld = ASMAtomicSubU32(&pThis->cReqsPrepared, cReqsCommitted); + } + } while (RT_SUCCESS(rc) && cReqsPreparedOld - cReqsCommitted > 0); + + return rc; +} + + +RTDECL(int) RTIoQueueEvtWait(RTIOQUEUE hIoQueue, PRTIOQUEUECEVT paCEvt, uint32_t cCEvt, uint32_t cMinWait, + uint32_t *pcCEvt, uint32_t fFlags) +{ + PRTIOQUEUEINT pThis = hIoQueue; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertPtrReturn(paCEvt, VERR_INVALID_POINTER); + AssertReturn(cCEvt > 0, VERR_INVALID_PARAMETER); + AssertReturn(cMinWait > 0, VERR_INVALID_PARAMETER); + AssertPtrReturn(pcCEvt, VERR_INVALID_POINTER); + AssertReturn(!fFlags, VERR_INVALID_PARAMETER); + AssertReturn(ASMAtomicReadU32(&pThis->cReqsCommitted) > 0, VERR_IOQUEUE_EMPTY); + + *pcCEvt = 0; + int rc = pThis->pVTbl->pfnEvtWait(pThis->hIoQueueProv, paCEvt, cCEvt, cMinWait, pcCEvt, fFlags); + if ( (RT_SUCCESS(rc) || rc == VERR_INTERRUPTED) + && *pcCEvt > 0) + ASMAtomicSubU32(&pThis->cReqsCommitted, *pcCEvt); + + return rc; +} + + +RTDECL(int) RTIoQueueEvtWaitWakeup(RTIOQUEUE hIoQueue) +{ + PRTIOQUEUEINT pThis = hIoQueue; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + + return pThis->pVTbl->pfnEvtWaitWakeup(pThis->hIoQueueProv); +} + |