diff options
Diffstat (limited to '')
-rw-r--r-- | src/VBox/Runtime/r3/posix/fileaio-posix.cpp | 1062 |
1 files changed, 1062 insertions, 0 deletions
diff --git a/src/VBox/Runtime/r3/posix/fileaio-posix.cpp b/src/VBox/Runtime/r3/posix/fileaio-posix.cpp new file mode 100644 index 00000000..7152e3f6 --- /dev/null +++ b/src/VBox/Runtime/r3/posix/fileaio-posix.cpp @@ -0,0 +1,1062 @@ +/* $Id: fileaio-posix.cpp $ */ +/** @file + * IPRT - File async I/O, native implementation for POSIX compliant host platforms. + */ + +/* + * Copyright (C) 2006-2020 Oracle Corporation + * + * This file is part of VirtualBox Open Source Edition (OSE), as + * available from http://www.virtualbox.org. This file is free software; + * you can redistribute it and/or modify it under the terms of the GNU + * General Public License (GPL) as published by the Free Software + * Foundation, in version 2 as it comes in the "COPYING" file of the + * VirtualBox OSE distribution. VirtualBox OSE is distributed in the + * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind. + * + * The contents of this file may alternatively be used under the terms + * of the Common Development and Distribution License Version 1.0 + * (CDDL) only, as it comes in the "COPYING.CDDL" file of the + * VirtualBox OSE distribution, in which case the provisions of the + * CDDL are applicable instead of those of the GPL. + * + * You may elect to license modified versions of this file under the + * terms and conditions of either the GPL or the CDDL or both. + */ + + +/********************************************************************************************************************************* +* Header Files * +*********************************************************************************************************************************/ +#define LOG_GROUP RTLOGGROUP_DIR +#include <iprt/asm.h> +#include <iprt/file.h> +#include <iprt/mem.h> +#include <iprt/assert.h> +#include <iprt/string.h> +#include <iprt/err.h> +#include <iprt/log.h> +#include <iprt/thread.h> +#include <iprt/semaphore.h> +#include "internal/fileaio.h" + +#if defined(RT_OS_DARWIN) || defined(RT_OS_FREEBSD) +# include <sys/types.h> +# include <sys/sysctl.h> /* for sysctlbyname */ +#endif +#if defined(RT_OS_FREEBSD) +# include <fcntl.h> /* O_SYNC */ +#endif +#include <aio.h> +#include <errno.h> +#include <time.h> + +/* + * Linux does not define this value. + * Just define it with really big + * value. + */ +#ifndef AIO_LISTIO_MAX +# define AIO_LISTIO_MAX UINT32_MAX +#endif + +#if 0 /* Only used for debugging */ +# undef AIO_LISTIO_MAX +# define AIO_LISTIO_MAX 16 +#endif + +/** Invalid entry in the waiting array. */ +#define RTFILEAIOCTX_WAIT_ENTRY_INVALID (~0U) + +/** No-op replacement for rtFileAioCtxDump for non debug builds */ +#ifndef LOG_ENABLED +# define rtFileAioCtxDump(pCtxInt) do {} while (0) +#endif + + +/********************************************************************************************************************************* +* Structures and Typedefs * +*********************************************************************************************************************************/ +/** + * Async I/O request state. + */ +typedef struct RTFILEAIOREQINTERNAL +{ + /** The aio control block. FIRST ELEMENT! */ + struct aiocb AioCB; + /** Next element in the chain. */ + struct RTFILEAIOREQINTERNAL *pNext; + /** Previous element in the chain. */ + struct RTFILEAIOREQINTERNAL *pPrev; + /** Current state the request is in. */ + RTFILEAIOREQSTATE enmState; + /** Flag whether this is a flush request. */ + bool fFlush; + /** Flag indicating if the request was canceled. */ + volatile bool fCanceled; + /** Opaque user data. */ + void *pvUser; + /** Number of bytes actually transferred. */ + size_t cbTransfered; + /** Status code. */ + int Rc; + /** Completion context we are assigned to. */ + struct RTFILEAIOCTXINTERNAL *pCtxInt; + /** Entry in the waiting list the request is in. */ + unsigned iWaitingList; + /** Magic value (RTFILEAIOREQ_MAGIC). */ + uint32_t u32Magic; +} RTFILEAIOREQINTERNAL, *PRTFILEAIOREQINTERNAL; + +/** + * Async I/O completion context state. + */ +typedef struct RTFILEAIOCTXINTERNAL +{ + /** Current number of requests active on this context. */ + volatile int32_t cRequests; + /** Maximum number of requests this context can handle. */ + uint32_t cMaxRequests; + /** The ID of the thread which is currently waiting for requests. */ + volatile RTTHREAD hThreadWait; + /** Flag whether the thread was woken up. */ + volatile bool fWokenUp; + /** Flag whether the thread is currently waiting in the syscall. */ + volatile bool fWaiting; + /** Flags given during creation. */ + uint32_t fFlags; + /** Magic value (RTFILEAIOCTX_MAGIC). */ + uint32_t u32Magic; + /** Flag whether the thread was woken up due to a internal event. */ + volatile bool fWokenUpInternal; + /** List of new requests which needs to be inserted into apReqs by the + * waiting thread. */ + volatile PRTFILEAIOREQINTERNAL apReqsNewHead[5]; + /** Special entry for requests which are canceled. Because only one + * request can be canceled at a time and the thread canceling the request + * has to wait we need only one entry. */ + volatile PRTFILEAIOREQINTERNAL pReqToCancel; + /** Event semaphore the canceling thread is waiting for completion of + * the operation. */ + RTSEMEVENT SemEventCancel; + /** Head of submitted elements waiting to get into the array. */ + PRTFILEAIOREQINTERNAL pReqsWaitHead; + /** Tail of submitted elements waiting to get into the array. */ + PRTFILEAIOREQINTERNAL pReqsWaitTail; + /** Maximum number of elements in the waiting array. */ + unsigned cReqsWaitMax; + /** First free slot in the waiting list. */ + unsigned iFirstFree; + /** List of requests we are currently waiting on. + * Size depends on cMaxRequests and AIO_LISTIO_MAX. */ + volatile PRTFILEAIOREQINTERNAL apReqs[1]; +} RTFILEAIOCTXINTERNAL, *PRTFILEAIOCTXINTERNAL; + +/** + * Internal worker for waking up the waiting thread. + */ +static void rtFileAioCtxWakeup(PRTFILEAIOCTXINTERNAL pCtxInt) +{ + /* + * Read the thread handle before the status flag. + * If we read the handle after the flag we might + * end up with an invalid handle because the thread + * waiting in RTFileAioCtxWakeup() might get scheduled + * before we read the flag and returns. + * We can ensure that the handle is valid if fWaiting is true + * when reading the handle before the status flag. + */ + RTTHREAD hThread; + ASMAtomicReadHandle(&pCtxInt->hThreadWait, &hThread); + bool fWaiting = ASMAtomicReadBool(&pCtxInt->fWaiting); + if (fWaiting) + { + /* + * If a thread waits the handle must be valid. + * It is possible that the thread returns from + * aio_suspend() before the signal is send. + * This is no problem because we already set fWokenUp + * to true which will let the thread return VERR_INTERRUPTED + * and the next call to RTFileAioCtxWait() will not + * return VERR_INTERRUPTED because signals are not saved + * and will simply vanish if the destination thread can't + * receive it. + */ + Assert(hThread != NIL_RTTHREAD); + RTThreadPoke(hThread); + } +} + +/** + * Internal worker processing events and inserting new requests into the waiting list. + */ +static int rtFileAioCtxProcessEvents(PRTFILEAIOCTXINTERNAL pCtxInt) +{ + int rc = VINF_SUCCESS; + + /* Process new requests first. */ + bool fWokenUp = ASMAtomicXchgBool(&pCtxInt->fWokenUpInternal, false); + if (fWokenUp) + { + for (unsigned iSlot = 0; iSlot < RT_ELEMENTS(pCtxInt->apReqsNewHead); iSlot++) + { + PRTFILEAIOREQINTERNAL pReqHead = ASMAtomicXchgPtrT(&pCtxInt->apReqsNewHead[iSlot], NULL, PRTFILEAIOREQINTERNAL); + + while ( (pCtxInt->iFirstFree < pCtxInt->cReqsWaitMax) + && pReqHead) + { + RTFIELAIOREQ_ASSERT_STATE(pReqHead, SUBMITTED); + pCtxInt->apReqs[pCtxInt->iFirstFree] = pReqHead; + pReqHead->iWaitingList = pCtxInt->iFirstFree; + pReqHead = pReqHead->pNext; + + /* Clear pointer to next and previous element just for safety. */ + pCtxInt->apReqs[pCtxInt->iFirstFree]->pNext = NULL; + pCtxInt->apReqs[pCtxInt->iFirstFree]->pPrev = NULL; + pCtxInt->iFirstFree++; + + Assert( (pCtxInt->iFirstFree <= pCtxInt->cMaxRequests) + && (pCtxInt->iFirstFree <= pCtxInt->cReqsWaitMax)); + } + + /* Append the rest to the wait list. */ + if (pReqHead) + { + RTFIELAIOREQ_ASSERT_STATE(pReqHead, SUBMITTED); + if (!pCtxInt->pReqsWaitHead) + { + Assert(!pCtxInt->pReqsWaitTail); + pCtxInt->pReqsWaitHead = pReqHead; + pReqHead->pPrev = NULL; + } + else + { + AssertPtr(pCtxInt->pReqsWaitTail); + + pCtxInt->pReqsWaitTail->pNext = pReqHead; + pReqHead->pPrev = pCtxInt->pReqsWaitTail; + } + + /* Update tail. */ + while (pReqHead->pNext) + { + RTFIELAIOREQ_ASSERT_STATE(pReqHead->pNext, SUBMITTED); + pReqHead = pReqHead->pNext; + } + + pCtxInt->pReqsWaitTail = pReqHead; + pCtxInt->pReqsWaitTail->pNext = NULL; + } + } + + /* Check if a request needs to be canceled. */ + PRTFILEAIOREQINTERNAL pReqToCancel = ASMAtomicReadPtrT(&pCtxInt->pReqToCancel, PRTFILEAIOREQINTERNAL); + if (pReqToCancel) + { + /* The request can be in the array waiting for completion or still in the list because it is full. */ + if (pReqToCancel->iWaitingList != RTFILEAIOCTX_WAIT_ENTRY_INVALID) + { + /* Put it out of the waiting list. */ + pCtxInt->apReqs[pReqToCancel->iWaitingList] = pCtxInt->apReqs[--pCtxInt->iFirstFree]; + pCtxInt->apReqs[pReqToCancel->iWaitingList]->iWaitingList = pReqToCancel->iWaitingList; + } + else + { + /* Unlink from the waiting list. */ + PRTFILEAIOREQINTERNAL pPrev = pReqToCancel->pPrev; + PRTFILEAIOREQINTERNAL pNext = pReqToCancel->pNext; + + if (pNext) + pNext->pPrev = pPrev; + else + { + /* We canceled the tail. */ + pCtxInt->pReqsWaitTail = pPrev; + } + + if (pPrev) + pPrev->pNext = pNext; + else + { + /* We canceled the head. */ + pCtxInt->pReqsWaitHead = pNext; + } + } + + ASMAtomicDecS32(&pCtxInt->cRequests); + AssertMsg(pCtxInt->cRequests >= 0, ("Canceled request not which is not in this context\n")); + RTSemEventSignal(pCtxInt->SemEventCancel); + } + } + else + { + if (ASMAtomicXchgBool(&pCtxInt->fWokenUp, false)) + rc = VERR_INTERRUPTED; + } + + return rc; +} + +RTR3DECL(int) RTFileAioGetLimits(PRTFILEAIOLIMITS pAioLimits) +{ + int rcBSD = 0; + AssertPtrReturn(pAioLimits, VERR_INVALID_POINTER); + +#if defined(RT_OS_DARWIN) + int cReqsOutstandingMax = 0; + size_t cbParameter = sizeof(int); + + rcBSD = sysctlbyname("kern.aioprocmax", /* name */ + &cReqsOutstandingMax, /* Where to store the old value. */ + &cbParameter, /* Size of the memory pointed to. */ + NULL, /* Where the new value is located. */ + 0); /* Where the size of the new value is stored. */ + if (rcBSD == -1) + return RTErrConvertFromErrno(errno); + + pAioLimits->cReqsOutstandingMax = cReqsOutstandingMax; + pAioLimits->cbBufferAlignment = 0; +#elif defined(RT_OS_FREEBSD) + /* + * The AIO API is implemented in a kernel module which is not + * loaded by default. + * If it is loaded there are additional sysctl parameters. + */ + int cReqsOutstandingMax = 0; + size_t cbParameter = sizeof(int); + + rcBSD = sysctlbyname("vfs.aio.max_aio_per_proc", /* name */ + &cReqsOutstandingMax, /* Where to store the old value. */ + &cbParameter, /* Size of the memory pointed to. */ + NULL, /* Where the new value is located. */ + 0); /* Where the size of the new value is stored. */ + if (rcBSD == -1) + { + /* ENOENT means the value is unknown thus the module is not loaded. */ + if (errno == ENOENT) + return VERR_NOT_SUPPORTED; + else + return RTErrConvertFromErrno(errno); + } + + pAioLimits->cReqsOutstandingMax = cReqsOutstandingMax; + pAioLimits->cbBufferAlignment = 0; +#else + pAioLimits->cReqsOutstandingMax = RTFILEAIO_UNLIMITED_REQS; + pAioLimits->cbBufferAlignment = 0; +#endif + + return VINF_SUCCESS; +} + +RTR3DECL(int) RTFileAioReqCreate(PRTFILEAIOREQ phReq) +{ + AssertPtrReturn(phReq, VERR_INVALID_POINTER); + + PRTFILEAIOREQINTERNAL pReqInt = (PRTFILEAIOREQINTERNAL)RTMemAllocZ(sizeof(RTFILEAIOREQINTERNAL)); + if (RT_UNLIKELY(!pReqInt)) + return VERR_NO_MEMORY; + + pReqInt->pCtxInt = NULL; + pReqInt->u32Magic = RTFILEAIOREQ_MAGIC; + pReqInt->iWaitingList = RTFILEAIOCTX_WAIT_ENTRY_INVALID; + RTFILEAIOREQ_SET_STATE(pReqInt, COMPLETED); + + *phReq = (RTFILEAIOREQ)pReqInt; + + return VINF_SUCCESS; +} + + +RTDECL(int) RTFileAioReqDestroy(RTFILEAIOREQ hReq) +{ + /* + * Validate the handle and ignore nil. + */ + if (hReq == NIL_RTFILEAIOREQ) + return VINF_SUCCESS; + PRTFILEAIOREQINTERNAL pReqInt = hReq; + RTFILEAIOREQ_VALID_RETURN(pReqInt); + RTFILEAIOREQ_NOT_STATE_RETURN_RC(pReqInt, SUBMITTED, VERR_FILE_AIO_IN_PROGRESS); + + /* + * Trash the magic and free it. + */ + ASMAtomicUoWriteU32(&pReqInt->u32Magic, ~RTFILEAIOREQ_MAGIC); + RTMemFree(pReqInt); + return VINF_SUCCESS; +} + +/** + * Worker setting up the request. + */ +DECLINLINE(int) rtFileAioReqPrepareTransfer(RTFILEAIOREQ hReq, RTFILE hFile, + unsigned uTransferDirection, + RTFOFF off, void *pvBuf, size_t cbTransfer, + void *pvUser) +{ + /* + * Validate the input. + */ + PRTFILEAIOREQINTERNAL pReqInt = hReq; + RTFILEAIOREQ_VALID_RETURN(pReqInt); + RTFILEAIOREQ_NOT_STATE_RETURN_RC(pReqInt, SUBMITTED, VERR_FILE_AIO_IN_PROGRESS); + Assert(hFile != NIL_RTFILE); + AssertPtr(pvBuf); + Assert(off >= 0); + Assert(cbTransfer > 0); + + memset(&pReqInt->AioCB, 0, sizeof(struct aiocb)); + pReqInt->fFlush = false; + pReqInt->AioCB.aio_lio_opcode = uTransferDirection; + pReqInt->AioCB.aio_fildes = RTFileToNative(hFile); + pReqInt->AioCB.aio_offset = off; + pReqInt->AioCB.aio_nbytes = cbTransfer; + pReqInt->AioCB.aio_buf = pvBuf; + pReqInt->pvUser = pvUser; + pReqInt->pCtxInt = NULL; + pReqInt->Rc = VERR_FILE_AIO_IN_PROGRESS; + RTFILEAIOREQ_SET_STATE(pReqInt, PREPARED); + + return VINF_SUCCESS; +} + + +RTDECL(int) RTFileAioReqPrepareRead(RTFILEAIOREQ hReq, RTFILE hFile, RTFOFF off, + void *pvBuf, size_t cbRead, void *pvUser) +{ + return rtFileAioReqPrepareTransfer(hReq, hFile, LIO_READ, + off, pvBuf, cbRead, pvUser); +} + + +RTDECL(int) RTFileAioReqPrepareWrite(RTFILEAIOREQ hReq, RTFILE hFile, RTFOFF off, + void const *pvBuf, size_t cbWrite, void *pvUser) +{ + return rtFileAioReqPrepareTransfer(hReq, hFile, LIO_WRITE, + off, (void *)pvBuf, cbWrite, pvUser); +} + + +RTDECL(int) RTFileAioReqPrepareFlush(RTFILEAIOREQ hReq, RTFILE hFile, void *pvUser) +{ + PRTFILEAIOREQINTERNAL pReqInt = (PRTFILEAIOREQINTERNAL)hReq; + + RTFILEAIOREQ_VALID_RETURN(pReqInt); + RTFILEAIOREQ_NOT_STATE_RETURN_RC(pReqInt, SUBMITTED, VERR_FILE_AIO_IN_PROGRESS); + Assert(hFile != NIL_RTFILE); + + pReqInt->fFlush = true; + pReqInt->AioCB.aio_fildes = RTFileToNative(hFile); + pReqInt->AioCB.aio_offset = 0; + pReqInt->AioCB.aio_nbytes = 0; + pReqInt->AioCB.aio_buf = NULL; + pReqInt->pvUser = pvUser; + pReqInt->Rc = VERR_FILE_AIO_IN_PROGRESS; + RTFILEAIOREQ_SET_STATE(pReqInt, PREPARED); + + return VINF_SUCCESS; +} + + +RTDECL(void *) RTFileAioReqGetUser(RTFILEAIOREQ hReq) +{ + PRTFILEAIOREQINTERNAL pReqInt = hReq; + RTFILEAIOREQ_VALID_RETURN_RC(pReqInt, NULL); + + return pReqInt->pvUser; +} + + +RTDECL(int) RTFileAioReqCancel(RTFILEAIOREQ hReq) +{ + PRTFILEAIOREQINTERNAL pReqInt = hReq; + RTFILEAIOREQ_VALID_RETURN(pReqInt); + RTFILEAIOREQ_STATE_RETURN_RC(pReqInt, SUBMITTED, VERR_FILE_AIO_NOT_SUBMITTED); + + ASMAtomicXchgBool(&pReqInt->fCanceled, true); + + int rcPosix = aio_cancel(pReqInt->AioCB.aio_fildes, &pReqInt->AioCB); + + if (rcPosix == AIO_CANCELED) + { + PRTFILEAIOCTXINTERNAL pCtxInt = pReqInt->pCtxInt; + /* + * Notify the waiting thread that the request was canceled. + */ + AssertMsg(VALID_PTR(pCtxInt), + ("Invalid state. Request was canceled but wasn't submitted\n")); + + Assert(!pCtxInt->pReqToCancel); + ASMAtomicWritePtr(&pCtxInt->pReqToCancel, pReqInt); + rtFileAioCtxWakeup(pCtxInt); + + /* Wait for acknowledge. */ + int rc = RTSemEventWait(pCtxInt->SemEventCancel, RT_INDEFINITE_WAIT); + AssertRC(rc); + + ASMAtomicWriteNullPtr(&pCtxInt->pReqToCancel); + pReqInt->Rc = VERR_FILE_AIO_CANCELED; + RTFILEAIOREQ_SET_STATE(pReqInt, COMPLETED); + return VINF_SUCCESS; + } + else if (rcPosix == AIO_ALLDONE) + return VERR_FILE_AIO_COMPLETED; + else if (rcPosix == AIO_NOTCANCELED) + return VERR_FILE_AIO_IN_PROGRESS; + else + return RTErrConvertFromErrno(errno); +} + + +RTDECL(int) RTFileAioReqGetRC(RTFILEAIOREQ hReq, size_t *pcbTransfered) +{ + PRTFILEAIOREQINTERNAL pReqInt = hReq; + RTFILEAIOREQ_VALID_RETURN(pReqInt); + RTFILEAIOREQ_NOT_STATE_RETURN_RC(pReqInt, SUBMITTED, VERR_FILE_AIO_IN_PROGRESS); + RTFILEAIOREQ_NOT_STATE_RETURN_RC(pReqInt, PREPARED, VERR_FILE_AIO_NOT_SUBMITTED); + AssertPtrNull(pcbTransfered); + + if ( (RT_SUCCESS(pReqInt->Rc)) + && (pcbTransfered)) + *pcbTransfered = pReqInt->cbTransfered; + + return pReqInt->Rc; +} + + +RTDECL(int) RTFileAioCtxCreate(PRTFILEAIOCTX phAioCtx, uint32_t cAioReqsMax, + uint32_t fFlags) +{ + PRTFILEAIOCTXINTERNAL pCtxInt; + unsigned cReqsWaitMax; + + AssertPtrReturn(phAioCtx, VERR_INVALID_POINTER); + AssertReturn(!(fFlags & ~RTFILEAIOCTX_FLAGS_VALID_MASK), VERR_INVALID_PARAMETER); + + if (cAioReqsMax == RTFILEAIO_UNLIMITED_REQS) + return VERR_OUT_OF_RANGE; + + cReqsWaitMax = RT_MIN(cAioReqsMax, AIO_LISTIO_MAX); + + pCtxInt = (PRTFILEAIOCTXINTERNAL)RTMemAllocZ( sizeof(RTFILEAIOCTXINTERNAL) + + cReqsWaitMax * sizeof(PRTFILEAIOREQINTERNAL)); + if (RT_UNLIKELY(!pCtxInt)) + return VERR_NO_MEMORY; + + /* Create event semaphore. */ + int rc = RTSemEventCreate(&pCtxInt->SemEventCancel); + if (RT_FAILURE(rc)) + { + RTMemFree(pCtxInt); + return rc; + } + + pCtxInt->u32Magic = RTFILEAIOCTX_MAGIC; + pCtxInt->cMaxRequests = cAioReqsMax; + pCtxInt->cReqsWaitMax = cReqsWaitMax; + pCtxInt->fFlags = fFlags; + *phAioCtx = (RTFILEAIOCTX)pCtxInt; + + return VINF_SUCCESS; +} + + +RTDECL(int) RTFileAioCtxDestroy(RTFILEAIOCTX hAioCtx) +{ + PRTFILEAIOCTXINTERNAL pCtxInt = hAioCtx; + + AssertPtrReturn(pCtxInt, VERR_INVALID_HANDLE); + + if (RT_UNLIKELY(pCtxInt->cRequests)) + return VERR_FILE_AIO_BUSY; + + RTSemEventDestroy(pCtxInt->SemEventCancel); + RTMemFree(pCtxInt); + + return VINF_SUCCESS; +} + + +RTDECL(uint32_t) RTFileAioCtxGetMaxReqCount(RTFILEAIOCTX hAioCtx) +{ + PRTFILEAIOCTXINTERNAL pCtxInt = hAioCtx; + + if (hAioCtx == NIL_RTFILEAIOCTX) + return RTFILEAIO_UNLIMITED_REQS; + return pCtxInt->cMaxRequests; +} + +RTDECL(int) RTFileAioCtxAssociateWithFile(RTFILEAIOCTX hAioCtx, RTFILE hFile) +{ + NOREF(hAioCtx); NOREF(hFile); + return VINF_SUCCESS; +} + +#ifdef LOG_ENABLED +/** + * Dumps the state of a async I/O context. + */ +static void rtFileAioCtxDump(PRTFILEAIOCTXINTERNAL pCtxInt) +{ + LogFlow(("cRequests=%d\n", pCtxInt->cRequests)); + LogFlow(("cMaxRequests=%u\n", pCtxInt->cMaxRequests)); + LogFlow(("hThreadWait=%#p\n", pCtxInt->hThreadWait)); + LogFlow(("fWokenUp=%RTbool\n", pCtxInt->fWokenUp)); + LogFlow(("fWaiting=%RTbool\n", pCtxInt->fWaiting)); + LogFlow(("fWokenUpInternal=%RTbool\n", pCtxInt->fWokenUpInternal)); + for (unsigned i = 0; i < RT_ELEMENTS(pCtxInt->apReqsNewHead); i++) + LogFlow(("apReqsNewHead[%u]=%#p\n", i, pCtxInt->apReqsNewHead[i])); + LogFlow(("pReqToCancel=%#p\n", pCtxInt->pReqToCancel)); + LogFlow(("pReqsWaitHead=%#p\n", pCtxInt->pReqsWaitHead)); + LogFlow(("pReqsWaitTail=%#p\n", pCtxInt->pReqsWaitTail)); + LogFlow(("cReqsWaitMax=%u\n", pCtxInt->cReqsWaitMax)); + LogFlow(("iFirstFree=%u\n", pCtxInt->iFirstFree)); + for (unsigned i = 0; i < pCtxInt->cReqsWaitMax; i++) + LogFlow(("apReqs[%u]=%#p\n", i, pCtxInt->apReqs[i])); +} +#endif + +RTDECL(int) RTFileAioCtxSubmit(RTFILEAIOCTX hAioCtx, PRTFILEAIOREQ pahReqs, size_t cReqs) +{ + int rc = VINF_SUCCESS; + PRTFILEAIOCTXINTERNAL pCtxInt = hAioCtx; + + /* Parameter checks */ + AssertPtrReturn(pCtxInt, VERR_INVALID_HANDLE); + AssertReturn(cReqs != 0, VERR_INVALID_POINTER); + AssertPtrReturn(pahReqs, VERR_INVALID_PARAMETER); + + rtFileAioCtxDump(pCtxInt); + + /* Check that we don't exceed the limit */ + if (ASMAtomicUoReadS32(&pCtxInt->cRequests) + cReqs > pCtxInt->cMaxRequests) + return VERR_FILE_AIO_LIMIT_EXCEEDED; + + PRTFILEAIOREQINTERNAL pHead = NULL; + + do + { + int rcPosix = 0; + size_t cReqsSubmit = 0; + size_t i = 0; + PRTFILEAIOREQINTERNAL pReqInt; + + while ( (i < cReqs) + && (i < AIO_LISTIO_MAX)) + { + pReqInt = pahReqs[i]; + if (RTFILEAIOREQ_IS_NOT_VALID(pReqInt)) + { + /* Undo everything and stop submitting. */ + for (size_t iUndo = 0; iUndo < i; iUndo++) + { + pReqInt = pahReqs[iUndo]; + RTFILEAIOREQ_SET_STATE(pReqInt, PREPARED); + pReqInt->pCtxInt = NULL; + + /* Unlink from the list again. */ + PRTFILEAIOREQINTERNAL pNext, pPrev; + pNext = pReqInt->pNext; + pPrev = pReqInt->pPrev; + if (pNext) + pNext->pPrev = pPrev; + if (pPrev) + pPrev->pNext = pNext; + else + pHead = pNext; + } + rc = VERR_INVALID_HANDLE; + break; + } + + pReqInt->pCtxInt = pCtxInt; + + if (pReqInt->fFlush) + break; + + /* Link them together. */ + pReqInt->pNext = pHead; + if (pHead) + pHead->pPrev = pReqInt; + pReqInt->pPrev = NULL; + pHead = pReqInt; + RTFILEAIOREQ_SET_STATE(pReqInt, SUBMITTED); + + cReqsSubmit++; + i++; + } + + if (cReqsSubmit) + { + rcPosix = lio_listio(LIO_NOWAIT, (struct aiocb **)pahReqs, cReqsSubmit, NULL); + if (RT_UNLIKELY(rcPosix < 0)) + { + size_t cReqsSubmitted = cReqsSubmit; + + if (errno == EAGAIN) + rc = VERR_FILE_AIO_INSUFFICIENT_RESSOURCES; + else + rc = RTErrConvertFromErrno(errno); + + /* Check which ones were not submitted. */ + for (i = 0; i < cReqsSubmit; i++) + { + pReqInt = pahReqs[i]; + + rcPosix = aio_error(&pReqInt->AioCB); + + if ((rcPosix != EINPROGRESS) && (rcPosix != 0)) + { + cReqsSubmitted--; + +#if defined(RT_OS_DARWIN) || defined(RT_OS_FREEBSD) + if (errno == EINVAL) +#else + if (rcPosix == EINVAL) +#endif + { + /* Was not submitted. */ + RTFILEAIOREQ_SET_STATE(pReqInt, PREPARED); + } + else + { + /* An error occurred. */ + RTFILEAIOREQ_SET_STATE(pReqInt, COMPLETED); + + /* + * Looks like Apple and glibc interpret the standard in different ways. + * glibc returns the error code which would be in errno but Apple returns + * -1 and sets errno to the appropriate value + */ +#if defined(RT_OS_DARWIN) || defined(RT_OS_FREEBSD) + Assert(rcPosix == -1); + pReqInt->Rc = RTErrConvertFromErrno(errno); +#elif defined(RT_OS_LINUX) + pReqInt->Rc = RTErrConvertFromErrno(rcPosix); +#endif + pReqInt->cbTransfered = 0; + } + /* Unlink from the list. */ + PRTFILEAIOREQINTERNAL pNext, pPrev; + pNext = pReqInt->pNext; + pPrev = pReqInt->pPrev; + if (pNext) + pNext->pPrev = pPrev; + if (pPrev) + pPrev->pNext = pNext; + else + pHead = pNext; + + pReqInt->pNext = NULL; + pReqInt->pPrev = NULL; + } + } + ASMAtomicAddS32(&pCtxInt->cRequests, cReqsSubmitted); + AssertMsg(pCtxInt->cRequests >= 0, ("Adding requests resulted in overflow\n")); + break; + } + + ASMAtomicAddS32(&pCtxInt->cRequests, cReqsSubmit); + AssertMsg(pCtxInt->cRequests >= 0, ("Adding requests resulted in overflow\n")); + cReqs -= cReqsSubmit; + pahReqs += cReqsSubmit; + } + + /* + * Check if we have a flush request now. + * If not we hit the AIO_LISTIO_MAX limit + * and will continue submitting requests + * above. + */ + if (cReqs && RT_SUCCESS_NP(rc)) + { + pReqInt = pahReqs[0]; + + if (pReqInt->fFlush) + { + /* + * lio_listio does not work with flush requests so + * we have to use aio_fsync directly. + */ + rcPosix = aio_fsync(O_SYNC, &pReqInt->AioCB); + if (RT_UNLIKELY(rcPosix < 0)) + { + if (errno == EAGAIN) + { + rc = VERR_FILE_AIO_INSUFFICIENT_RESSOURCES; + RTFILEAIOREQ_SET_STATE(pReqInt, PREPARED); + } + else + { + rc = RTErrConvertFromErrno(errno); + RTFILEAIOREQ_SET_STATE(pReqInt, COMPLETED); + pReqInt->Rc = rc; + } + pReqInt->cbTransfered = 0; + break; + } + + /* Link them together. */ + pReqInt->pNext = pHead; + if (pHead) + pHead->pPrev = pReqInt; + pReqInt->pPrev = NULL; + pHead = pReqInt; + RTFILEAIOREQ_SET_STATE(pReqInt, SUBMITTED); + + ASMAtomicIncS32(&pCtxInt->cRequests); + AssertMsg(pCtxInt->cRequests >= 0, ("Adding requests resulted in overflow\n")); + cReqs--; + pahReqs++; + } + } + } while ( cReqs + && RT_SUCCESS_NP(rc)); + + if (pHead) + { + /* + * Forward successfully submitted requests to the thread waiting for requests. + * We search for a free slot first and if we don't find one + * we will grab the first one and append our list to the existing entries. + */ + unsigned iSlot = 0; + while ( (iSlot < RT_ELEMENTS(pCtxInt->apReqsNewHead)) + && !ASMAtomicCmpXchgPtr(&pCtxInt->apReqsNewHead[iSlot], pHead, NULL)) + iSlot++; + + if (iSlot == RT_ELEMENTS(pCtxInt->apReqsNewHead)) + { + /* Nothing found. */ + PRTFILEAIOREQINTERNAL pOldHead = ASMAtomicXchgPtrT(&pCtxInt->apReqsNewHead[0], NULL, PRTFILEAIOREQINTERNAL); + + /* Find the end of the current head and link the old list to the current. */ + PRTFILEAIOREQINTERNAL pTail = pHead; + while (pTail->pNext) + pTail = pTail->pNext; + + pTail->pNext = pOldHead; + + ASMAtomicWritePtr(&pCtxInt->apReqsNewHead[0], pHead); + } + + /* Set the internal wakeup flag and wakeup the thread if possible. */ + bool fWokenUp = ASMAtomicXchgBool(&pCtxInt->fWokenUpInternal, true); + if (!fWokenUp) + rtFileAioCtxWakeup(pCtxInt); + } + + rtFileAioCtxDump(pCtxInt); + + return rc; +} + + +RTDECL(int) RTFileAioCtxWait(RTFILEAIOCTX hAioCtx, size_t cMinReqs, RTMSINTERVAL cMillies, + PRTFILEAIOREQ pahReqs, size_t cReqs, uint32_t *pcReqs) +{ + int rc = VINF_SUCCESS; + int cRequestsCompleted = 0; + PRTFILEAIOCTXINTERNAL pCtxInt = (PRTFILEAIOCTXINTERNAL)hAioCtx; + struct timespec Timeout; + struct timespec *pTimeout = NULL; + uint64_t StartNanoTS = 0; + + LogFlowFunc(("hAioCtx=%#p cMinReqs=%zu cMillies=%u pahReqs=%#p cReqs=%zu pcbReqs=%#p\n", + hAioCtx, cMinReqs, cMillies, pahReqs, cReqs, pcReqs)); + + /* Check parameters. */ + AssertPtrReturn(pCtxInt, VERR_INVALID_HANDLE); + AssertPtrReturn(pcReqs, VERR_INVALID_POINTER); + AssertPtrReturn(pahReqs, VERR_INVALID_POINTER); + AssertReturn(cReqs != 0, VERR_INVALID_PARAMETER); + AssertReturn(cReqs >= cMinReqs, VERR_OUT_OF_RANGE); + + rtFileAioCtxDump(pCtxInt); + + int32_t cRequestsWaiting = ASMAtomicReadS32(&pCtxInt->cRequests); + + if ( RT_UNLIKELY(cRequestsWaiting <= 0) + && !(pCtxInt->fFlags & RTFILEAIOCTX_FLAGS_WAIT_WITHOUT_PENDING_REQUESTS)) + return VERR_FILE_AIO_NO_REQUEST; + + if (RT_UNLIKELY(cMinReqs > (uint32_t)cRequestsWaiting)) + return VERR_INVALID_PARAMETER; + + if (cMillies != RT_INDEFINITE_WAIT) + { + Timeout.tv_sec = cMillies / 1000; + Timeout.tv_nsec = (cMillies % 1000) * 1000000; + pTimeout = &Timeout; + StartNanoTS = RTTimeNanoTS(); + } + + /* Wait for at least one. */ + if (!cMinReqs) + cMinReqs = 1; + + /* For the wakeup call. */ + Assert(pCtxInt->hThreadWait == NIL_RTTHREAD); + ASMAtomicWriteHandle(&pCtxInt->hThreadWait, RTThreadSelf()); + + /* Update the waiting list once before we enter the loop. */ + rc = rtFileAioCtxProcessEvents(pCtxInt); + + while ( cMinReqs + && RT_SUCCESS_NP(rc)) + { +#ifdef RT_STRICT + if (RT_UNLIKELY(!pCtxInt->iFirstFree)) + { + for (unsigned i = 0; i < pCtxInt->cReqsWaitMax; i++) + RTAssertMsg2Weak("wait[%d] = %#p\n", i, pCtxInt->apReqs[i]); + + AssertMsgFailed(("No request to wait for. pReqsWaitHead=%#p pReqsWaitTail=%#p\n", + pCtxInt->pReqsWaitHead, pCtxInt->pReqsWaitTail)); + } +#endif + + LogFlow(("Waiting for %d requests to complete\n", pCtxInt->iFirstFree)); + rtFileAioCtxDump(pCtxInt); + + ASMAtomicXchgBool(&pCtxInt->fWaiting, true); + int rcPosix = aio_suspend((const struct aiocb * const *)pCtxInt->apReqs, + pCtxInt->iFirstFree, pTimeout); + ASMAtomicXchgBool(&pCtxInt->fWaiting, false); + if (rcPosix < 0) + { + LogFlow(("aio_suspend failed %d nent=%u\n", errno, pCtxInt->iFirstFree)); + /* Check that this is an external wakeup event. */ + if (errno == EINTR) + rc = rtFileAioCtxProcessEvents(pCtxInt); + else + rc = RTErrConvertFromErrno(errno); + } + else + { + /* Requests finished. */ + unsigned iReqCurr = 0; + unsigned cDone = 0; + + /* Remove completed requests from the waiting list. */ + while ( (iReqCurr < pCtxInt->iFirstFree) + && (cDone < cReqs)) + { + PRTFILEAIOREQINTERNAL pReq = pCtxInt->apReqs[iReqCurr]; + int rcReq = aio_error(&pReq->AioCB); + + if (rcReq != EINPROGRESS) + { + /* Completed store the return code. */ + if (rcReq == 0) + { + pReq->Rc = VINF_SUCCESS; + /* Call aio_return() to free resources. */ + pReq->cbTransfered = aio_return(&pReq->AioCB); + } + else + { +#if defined(RT_OS_DARWIN) || defined(RT_OS_FREEBSD) + pReq->Rc = RTErrConvertFromErrno(errno); +#else + pReq->Rc = RTErrConvertFromErrno(rcReq); +#endif + } + + /* Mark the request as finished. */ + RTFILEAIOREQ_SET_STATE(pReq, COMPLETED); + cDone++; + + /* If there are other entries waiting put the head into the now free entry. */ + if (pCtxInt->pReqsWaitHead) + { + PRTFILEAIOREQINTERNAL pReqInsert = pCtxInt->pReqsWaitHead; + + pCtxInt->pReqsWaitHead = pReqInsert->pNext; + if (!pCtxInt->pReqsWaitHead) + { + /* List is empty now. Clear tail too. */ + pCtxInt->pReqsWaitTail = NULL; + } + + pReqInsert->iWaitingList = pReq->iWaitingList; + pCtxInt->apReqs[pReqInsert->iWaitingList] = pReqInsert; + iReqCurr++; + } + else + { + /* + * Move the last entry into the current position to avoid holes + * but only if it is not the last element already. + */ + if (pReq->iWaitingList < pCtxInt->iFirstFree - 1) + { + pCtxInt->apReqs[pReq->iWaitingList] = pCtxInt->apReqs[--pCtxInt->iFirstFree]; + pCtxInt->apReqs[pReq->iWaitingList]->iWaitingList = pReq->iWaitingList; + } + else + pCtxInt->iFirstFree--; + + pCtxInt->apReqs[pCtxInt->iFirstFree] = NULL; + } + + /* Put the request into the completed list. */ + pahReqs[cRequestsCompleted++] = pReq; + pReq->iWaitingList = RTFILEAIOCTX_WAIT_ENTRY_INVALID; + } + else + iReqCurr++; + } + + AssertMsg((cDone <= cReqs), ("Overflow cReqs=%u cMinReqs=%u cDone=%u\n", + cReqs, cDone)); + cReqs -= cDone; + cMinReqs = RT_MAX(cMinReqs, cDone) - cDone; + ASMAtomicSubS32(&pCtxInt->cRequests, cDone); + + AssertMsg(pCtxInt->cRequests >= 0, ("Finished more requests than currently active\n")); + + if (!cMinReqs) + break; + + if (cMillies != RT_INDEFINITE_WAIT) + { + uint64_t TimeDiff; + + /* Recalculate the timeout. */ + TimeDiff = RTTimeSystemNanoTS() - StartNanoTS; + Timeout.tv_sec = Timeout.tv_sec - (TimeDiff / 1000000); + Timeout.tv_nsec = Timeout.tv_nsec - (TimeDiff % 1000000); + } + + /* Check for new elements. */ + rc = rtFileAioCtxProcessEvents(pCtxInt); + } + } + + *pcReqs = cRequestsCompleted; + Assert(pCtxInt->hThreadWait == RTThreadSelf()); + ASMAtomicWriteHandle(&pCtxInt->hThreadWait, NIL_RTTHREAD); + + rtFileAioCtxDump(pCtxInt); + + return rc; +} + + +RTDECL(int) RTFileAioCtxWakeup(RTFILEAIOCTX hAioCtx) +{ + PRTFILEAIOCTXINTERNAL pCtxInt = hAioCtx; + RTFILEAIOCTX_VALID_RETURN(pCtxInt); + + /** @todo r=bird: Define the protocol for how to resume work after calling + * this function. */ + + bool fWokenUp = ASMAtomicXchgBool(&pCtxInt->fWokenUp, true); + if (!fWokenUp) + rtFileAioCtxWakeup(pCtxInt); + + return VINF_SUCCESS; +} + |