diff options
Diffstat (limited to 'src/VBox/Runtime/r3/os2/pipe-os2.cpp')
-rw-r--r-- | src/VBox/Runtime/r3/os2/pipe-os2.cpp | 1061 |
1 files changed, 1061 insertions, 0 deletions
diff --git a/src/VBox/Runtime/r3/os2/pipe-os2.cpp b/src/VBox/Runtime/r3/os2/pipe-os2.cpp new file mode 100644 index 00000000..b295afb0 --- /dev/null +++ b/src/VBox/Runtime/r3/os2/pipe-os2.cpp @@ -0,0 +1,1061 @@ +/* $Id: pipe-os2.cpp $ */ +/** @file + * IPRT - Anonymous Pipes, OS/2 Implementation. + */ + +/* + * Copyright (C) 2010-2019 Oracle Corporation + * + * This file is part of VirtualBox Open Source Edition (OSE), as + * available from http://www.virtualbox.org. This file is free software; + * you can redistribute it and/or modify it under the terms of the GNU + * General Public License (GPL) as published by the Free Software + * Foundation, in version 2 as it comes in the "COPYING" file of the + * VirtualBox OSE distribution. VirtualBox OSE is distributed in the + * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind. + * + * The contents of this file may alternatively be used under the terms + * of the Common Development and Distribution License Version 1.0 + * (CDDL) only, as it comes in the "COPYING.CDDL" file of the + * VirtualBox OSE distribution, in which case the provisions of the + * CDDL are applicable instead of those of the GPL. + * + * You may elect to license modified versions of this file under the + * terms and conditions of either the GPL or the CDDL or both. + */ + + +/********************************************************************************************************************************* +* Header Files * +*********************************************************************************************************************************/ +#define INCL_ERRORS +#define INCL_DOSSEMAPHORES +#include <os2.h> + +#include <iprt/pipe.h> +#include "internal/iprt.h" + +#include <iprt/asm.h> +#include <iprt/assert.h> +#include <iprt/critsect.h> +#include <iprt/err.h> +#include <iprt/mem.h> +#include <iprt/string.h> +#include <iprt/poll.h> +#include <iprt/process.h> +#include <iprt/thread.h> +#include <iprt/time.h> +#include "internal/pipe.h" +#include "internal/magics.h" + + +/********************************************************************************************************************************* +* Defined Constants And Macros * +*********************************************************************************************************************************/ +/** The pipe buffer size we prefer. */ +#define RTPIPE_OS2_SIZE _32K + + +/********************************************************************************************************************************* +* Structures and Typedefs * +*********************************************************************************************************************************/ +typedef struct RTPIPEINTERNAL +{ + /** Magic value (RTPIPE_MAGIC). */ + uint32_t u32Magic; + /** The pipe handle. */ + HPIPE hPipe; + /** Set if this is the read end, clear if it's the write end. */ + bool fRead; + /** Whether the pipe is in blocking or non-blocking mode. */ + bool fBlocking; + /** Set if the pipe is broken. */ + bool fBrokenPipe; + /** Usage counter. */ + uint32_t cUsers; + + /** The event semaphore associated with the pipe. */ + HEV hev; + /** The handle of the poll set currently polling on this pipe. + * We can only have one poller at the time (lazy bird). */ + RTPOLLSET hPollSet; + /** Critical section protecting the above members. + * (Taking the lazy/simple approach.) */ + RTCRITSECT CritSect; + +} RTPIPEINTERNAL; + + +/** + * Ensures that the pipe has a semaphore associated with it. + * + * @returns VBox status code. + * @param pThis The pipe. + */ +static int rtPipeOs2EnsureSem(RTPIPEINTERNAL *pThis) +{ + if (pThis->hev != NULLHANDLE) + return VINF_SUCCESS; + + HEV hev; + APIRET orc = DosCreateEventSem(NULL, &hev, DC_SEM_SHARED, FALSE); + if (orc == NO_ERROR) + { + orc = DosSetNPipeSem(pThis->hPipe, (HSEM)hev, 1); + if (orc == NO_ERROR) + { + pThis->hev = hev; + return VINF_SUCCESS; + } + + DosCloseEventSem(hev); + } + return RTErrConvertFromOS2(orc); +} + + +RTDECL(int) RTPipeCreate(PRTPIPE phPipeRead, PRTPIPE phPipeWrite, uint32_t fFlags) +{ + AssertPtrReturn(phPipeRead, VERR_INVALID_POINTER); + AssertPtrReturn(phPipeWrite, VERR_INVALID_POINTER); + AssertReturn(!(fFlags & ~RTPIPE_C_VALID_MASK), VERR_INVALID_PARAMETER); + + /* + * Try create and connect a pipe pair. + */ + APIRET orc; + HPIPE hPipeR; + HFILE hPipeW; + int rc; + for (;;) + { + static volatile uint32_t g_iNextPipe = 0; + char szName[128]; + RTStrPrintf(szName, sizeof(szName), "\\pipe\\iprt-pipe-%u-%u", RTProcSelf(), ASMAtomicIncU32(&g_iNextPipe)); + + /* + * Create the read end of the pipe. + */ + ULONG fPipeMode = 1 /*instance*/ | NP_TYPE_BYTE | NP_READMODE_BYTE | NP_NOWAIT; + ULONG fOpenMode = NP_ACCESS_DUPLEX | NP_WRITEBEHIND; + if (fFlags & RTPIPE_C_INHERIT_READ) + fOpenMode |= NP_INHERIT; + else + fOpenMode |= NP_NOINHERIT; + orc = DosCreateNPipe((PSZ)szName, &hPipeR, fOpenMode, fPipeMode, RTPIPE_OS2_SIZE, RTPIPE_OS2_SIZE, NP_DEFAULT_WAIT); + if (orc == NO_ERROR) + { + orc = DosConnectNPipe(hPipeR); + if (orc == ERROR_PIPE_NOT_CONNECTED || orc == NO_ERROR) + { + /* + * Connect to the pipe (the write end), attach sem below. + */ + ULONG ulAction = 0; + ULONG fOpenW = OPEN_ACTION_FAIL_IF_NEW | OPEN_ACTION_OPEN_IF_EXISTS; + ULONG fModeW = OPEN_ACCESS_WRITEONLY | OPEN_SHARE_DENYNONE | OPEN_FLAGS_FAIL_ON_ERROR; + if (!(fFlags & RTPIPE_C_INHERIT_WRITE)) + fModeW |= OPEN_FLAGS_NOINHERIT; + orc = DosOpen((PSZ)szName, &hPipeW, &ulAction, 0 /*cbFile*/, FILE_NORMAL, + fOpenW, fModeW, NULL /*peaop2*/); + if (orc == NO_ERROR) + break; + } + DosClose(hPipeR); + } + if ( orc != ERROR_PIPE_BUSY /* already exist - compatible */ + && orc != ERROR_ACCESS_DENIED /* already exist - incompatible (?) */) + return RTErrConvertFromOS2(orc); + /* else: try again with a new name */ + } + + /* + * Create the two handles. + */ + RTPIPEINTERNAL *pThisR = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL)); + if (pThisR) + { + RTPIPEINTERNAL *pThisW = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL)); + if (pThisW) + { + /* Crit sects. */ + rc = RTCritSectInit(&pThisR->CritSect); + if (RT_SUCCESS(rc)) + { + rc = RTCritSectInit(&pThisW->CritSect); + if (RT_SUCCESS(rc)) + { + /* Initialize the structures. */ + pThisR->u32Magic = RTPIPE_MAGIC; + pThisW->u32Magic = RTPIPE_MAGIC; + pThisR->hPipe = hPipeR; + pThisW->hPipe = hPipeW; + pThisR->hev = NULLHANDLE; + pThisW->hev = NULLHANDLE; + pThisR->fRead = true; + pThisW->fRead = false; + pThisR->fBlocking = false; + pThisW->fBlocking = true; + //pThisR->fBrokenPipe = false; + //pThisW->fBrokenPipe = false; + //pThisR->cUsers = 0; + //pThisW->cUsers = 0; + pThisR->hPollSet = NIL_RTPOLLSET; + pThisW->hPollSet = NIL_RTPOLLSET; + + *phPipeRead = pThisR; + *phPipeWrite = pThisW; + return VINF_SUCCESS; + } + + RTCritSectDelete(&pThisR->CritSect); + } + RTMemFree(pThisW); + } + else + rc = VERR_NO_MEMORY; + RTMemFree(pThisR); + } + else + rc = VERR_NO_MEMORY; + + /* Don't call DosDisConnectNPipe! */ + DosClose(hPipeW); + DosClose(hPipeR); + return rc; +} + + +RTDECL(int) RTPipeClose(RTPIPE hPipe) +{ + RTPIPEINTERNAL *pThis = hPipe; + if (pThis == NIL_RTPIPE) + return VINF_SUCCESS; + AssertPtrReturn(pThis, VERR_INVALID_PARAMETER); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); + + /* + * Do the cleanup. + */ + AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTPIPE_MAGIC, RTPIPE_MAGIC), VERR_INVALID_HANDLE); + RTCritSectEnter(&pThis->CritSect); + Assert(pThis->cUsers == 0); + + /* Don't call DosDisConnectNPipe! */ + DosClose(pThis->hPipe); + pThis->hPipe = (HPIPE)-1; + + if (pThis->hev != NULLHANDLE) + { + DosCloseEventSem(pThis->hev); + pThis->hev = NULLHANDLE; + } + + RTCritSectLeave(&pThis->CritSect); + RTCritSectDelete(&pThis->CritSect); + + RTMemFree(pThis); + + return VINF_SUCCESS; +} + + +RTDECL(int) RTPipeFromNative(PRTPIPE phPipe, RTHCINTPTR hNativePipe, uint32_t fFlags) +{ + AssertPtrReturn(phPipe, VERR_INVALID_POINTER); + AssertReturn(!(fFlags & ~RTPIPE_N_VALID_MASK), VERR_INVALID_PARAMETER); + AssertReturn(!!(fFlags & RTPIPE_N_READ) != !!(fFlags & RTPIPE_N_WRITE), VERR_INVALID_PARAMETER); + + /* + * Get and validate the pipe handle info. + */ + HPIPE hNative = (HPIPE)hNativePipe; + ULONG ulType = 0; + ULONG ulAttr = 0; + APIRET orc = DosQueryHType(hNative, &ulType, &ulAttr); + AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), RTErrConvertFromOS2(orc)); + AssertReturn((ulType & 0x7) == HANDTYPE_PIPE, VERR_INVALID_HANDLE); + +#if 0 + union + { + PIPEINFO PipeInfo; + uint8_t abPadding[sizeof(PIPEINFO) + 127]; + } Buf; + orc = DosQueryNPipeInfo(hNative, 1, &Buf, sizeof(Buf)); + if (orc != NO_ERROR) + { + /* Sorry, anonymous pips are not supported. */ + AssertMsgFailed(("%d\n", orc)); + return VERR_INVALID_HANDLE; + } + AssertReturn(Buf.PipeInfo.cbMaxInst == 1, VERR_INVALID_HANDLE); +#endif + + ULONG fPipeState = 0; + orc = DosQueryNPHState(hNative, &fPipeState); + if (orc != NO_ERROR) + { + /* Sorry, anonymous pips are not supported. */ + AssertMsgFailed(("%d\n", orc)); + return VERR_INVALID_HANDLE; + } + AssertReturn(!(fPipeState & NP_TYPE_MESSAGE), VERR_INVALID_HANDLE); + AssertReturn(!(fPipeState & NP_READMODE_MESSAGE), VERR_INVALID_HANDLE); + AssertReturn((fPipeState & 0xff) == 1, VERR_INVALID_HANDLE); + + ULONG fFileState = 0; + orc = DosQueryFHState(hNative, &fFileState); + AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), VERR_INVALID_HANDLE); + AssertMsgReturn( (fFileState & 0x3) == (fFlags & RTPIPE_N_READ ? OPEN_ACCESS_READONLY : OPEN_ACCESS_WRITEONLY) + || (fFileState & 0x3) == OPEN_ACCESS_READWRITE + , ("%#x\n", fFileState), VERR_INVALID_HANDLE); + + /* + * Looks kind of OK. Fix the inherit flag. + */ + orc = DosSetFHState(hNative, (fFileState & (OPEN_FLAGS_WRITE_THROUGH | OPEN_FLAGS_FAIL_ON_ERROR | OPEN_FLAGS_NO_CACHE)) + | (fFlags & RTPIPE_N_INHERIT ? 0 : OPEN_FLAGS_NOINHERIT)); + AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), RTErrConvertFromOS2(orc)); + + + /* + * Create a handle so we can try rtPipeQueryInfo on it + * and see if we need to duplicate it to make that call work. + */ + RTPIPEINTERNAL *pThis = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL)); + if (!pThis) + return VERR_NO_MEMORY; + int rc = RTCritSectInit(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + pThis->u32Magic = RTPIPE_MAGIC; + pThis->hPipe = hNative; + pThis->hev = NULLHANDLE; + pThis->fRead = !!(fFlags & RTPIPE_N_READ); + pThis->fBlocking = !(fPipeState & NP_NOWAIT); + //pThis->fBrokenPipe = false; + //pThis->cUsers = 0; + pThis->hPollSet = NIL_RTPOLLSET; + + *phPipe = pThis; + return VINF_SUCCESS; + + //RTCritSectDelete(&pThis->CritSect); + } + RTMemFree(pThis); + return rc; +} + +RTDECL(RTHCINTPTR) RTPipeToNative(RTPIPE hPipe) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, -1); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, -1); + + return (RTHCINTPTR)pThis->hPipe; +} + +/** + * Prepare blocking mode. + * + * @returns IPRT status code. + * @retval VERR_WRONG_ORDER if simultaneous non-blocking and blocking access is + * attempted. + * + * @param pThis The pipe handle. + * + * @remarks Caller owns the critical section. + */ +static int rtPipeTryBlocking(RTPIPEINTERNAL *pThis) +{ + if (!pThis->fBlocking) + { + if (pThis->cUsers != 0) + return VERR_WRONG_ORDER; + + APIRET orc = DosSetNPHState(pThis->hPipe, NP_WAIT | NP_READMODE_BYTE); + if (orc != NO_ERROR) + { + if (orc != ERROR_BROKEN_PIPE && orc != ERROR_PIPE_NOT_CONNECTED) + return RTErrConvertFromOS2(orc); + pThis->fBrokenPipe = true; + } + pThis->fBlocking = true; + } + + pThis->cUsers++; + return VINF_SUCCESS; +} + + +/** + * Prepare non-blocking mode. + * + * @returns IPRT status code. + * @retval VERR_WRONG_ORDER if simultaneous non-blocking and blocking access is + * attempted. + * + * @param pThis The pipe handle. + */ +static int rtPipeTryNonBlocking(RTPIPEINTERNAL *pThis) +{ + if (pThis->fBlocking) + { + if (pThis->cUsers != 0) + return VERR_WRONG_ORDER; + + APIRET orc = DosSetNPHState(pThis->hPipe, NP_NOWAIT | NP_READMODE_BYTE); + if (orc != NO_ERROR) + { + if (orc != ERROR_BROKEN_PIPE && orc != ERROR_PIPE_NOT_CONNECTED) + return RTErrConvertFromOS2(orc); + pThis->fBrokenPipe = true; + } + pThis->fBlocking = false; + } + + pThis->cUsers++; + return VINF_SUCCESS; +} + + +/** + * Checks if the read pipe has been broken. + * + * @returns true if broken, false if no. + * @param pThis The pipe handle (read). + */ +static bool rtPipeOs2IsBroken(RTPIPEINTERNAL *pThis) +{ + Assert(pThis->fRead); + +#if 0 + /* + * Query it via the semaphore. Not sure how fast this is... + */ + PIPESEMSTATE aStates[3]; RT_ZERO(aStates); + APIRET orc = DosQueryNPipeSemState(pThis->hev, &aStates[0], sizeof(aStates)); + if (orc == NO_ERROR) + { + if (aStates[0].fStatus == NPSS_CLOSE) + return true; + if (aStates[0].fStatus == NPSS_RDATA) + return false; + } + AssertMsgFailed(("%d / %d\n", orc, aStates[0].fStatus)); + + /* + * Fall back / alternative method. + */ +#endif + ULONG cbActual = 0; + ULONG ulState = 0; + AVAILDATA Avail = { 0, 0 }; + APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState); + if (orc != NO_ERROR) + { + if (orc != ERROR_PIPE_BUSY) + AssertMsgFailed(("%d\n", orc)); + return false; + } + + return ulState != NP_STATE_CONNECTED; +} + + +RTDECL(int) RTPipeRead(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); + AssertReturn(pThis->fRead, VERR_ACCESS_DENIED); + AssertPtr(pcbRead); + AssertPtr(pvBuf); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + rc = rtPipeTryNonBlocking(pThis); + if (RT_SUCCESS(rc)) + { + RTCritSectLeave(&pThis->CritSect); + + ULONG cbActual = 0; + APIRET orc = DosRead(pThis->hPipe, pvBuf, cbToRead, &cbActual); + if (orc == NO_ERROR) + { + if (cbActual || !cbToRead || !rtPipeOs2IsBroken(pThis)) + *pcbRead = cbActual; + else + rc = VERR_BROKEN_PIPE; + } + else if (orc == ERROR_NO_DATA) + { + *pcbRead = 0; + rc = VINF_TRY_AGAIN; + } + else + rc = RTErrConvertFromOS2(orc); + + RTCritSectEnter(&pThis->CritSect); + if (rc == VERR_BROKEN_PIPE) + pThis->fBrokenPipe = true; + pThis->cUsers--; + } + else + rc = VERR_WRONG_ORDER; + RTCritSectLeave(&pThis->CritSect); + } + return rc; +} + + +RTDECL(int) RTPipeReadBlocking(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); + AssertReturn(pThis->fRead, VERR_ACCESS_DENIED); + AssertPtr(pvBuf); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + rc = rtPipeTryBlocking(pThis); + if (RT_SUCCESS(rc)) + { + RTCritSectLeave(&pThis->CritSect); + + size_t cbTotalRead = 0; + while (cbToRead > 0) + { + ULONG cbActual = 0; + APIRET orc = DosRead(pThis->hPipe, pvBuf, cbToRead, &cbActual); + if (orc != NO_ERROR) + { + rc = RTErrConvertFromOS2(orc); + break; + } + if (!cbActual && rtPipeOs2IsBroken(pThis)) + { + rc = VERR_BROKEN_PIPE; + break; + } + + /* advance */ + pvBuf = (char *)pvBuf + cbActual; + cbTotalRead += cbActual; + cbToRead -= cbActual; + } + + if (pcbRead) + { + *pcbRead = cbTotalRead; + if ( RT_FAILURE(rc) + && cbTotalRead) + rc = VINF_SUCCESS; + } + + RTCritSectEnter(&pThis->CritSect); + if (rc == VERR_BROKEN_PIPE) + pThis->fBrokenPipe = true; + pThis->cUsers--; + } + else + rc = VERR_WRONG_ORDER; + RTCritSectLeave(&pThis->CritSect); + } + return rc; +} + + +/** + * Gets the available write buffer size of the pipe. + * + * @returns Number of bytes, 1 on failure. + * @param pThis The pipe handle. + */ +static ULONG rtPipeOs2GetSpace(RTPIPEINTERNAL *pThis) +{ + Assert(!pThis->fRead); + +#if 0 /* Not sure which is more efficient, neither are really optimal, I fear. */ + /* + * Query via semaphore state. + * This will walk the list of active named pipes... + */ + /** @todo Check how hev and hpipe are associated, if complicated, use the + * alternative method below. */ + PIPESEMSTATE aStates[3]; RT_ZERO(aStates); + APIRET orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates)); + if (orc == NO_ERROR) + { + if (aStates[0].fStatus == NPSS_WSPACE) + return aStates[0].usAvail; + if (aStates[1].fStatus == NPSS_WSPACE) + return aStates[1].usAvail; + return 0; + } + AssertMsgFailed(("%d / %d\n", orc, aStates[0].fStatus)); + +#else + /* + * Query via the pipe info. + * This will have to lookup and store the pipe name. + */ + union + { + PIPEINFO PipeInfo; + uint8_t abPadding[sizeof(PIPEINFO) + 127]; + } Buf; + APIRET orc = DosQueryNPipeInfo(pThis->hPipe, 1, &Buf, sizeof(Buf)); + if (orc == NO_ERROR) + return Buf.PipeInfo.cbOut; + AssertMsgFailed(("%d\n", orc)); +#endif + + return 1; +} + + +RTDECL(int) RTPipeWrite(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); + AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED); + AssertPtr(pcbWritten); + AssertPtr(pvBuf); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + rc = rtPipeTryNonBlocking(pThis); + if (RT_SUCCESS(rc)) + { + if (cbToWrite > 0) + { + ULONG cbActual = 0; + APIRET orc = DosWrite(pThis->hPipe, pvBuf, cbToWrite, &cbActual); + if (orc == NO_ERROR && cbActual == 0) + { + /* Retry with the request adjusted to the available buffer space. */ + ULONG cbAvail = rtPipeOs2GetSpace(pThis); + orc = DosWrite(pThis->hPipe, pvBuf, RT_MIN(cbAvail, cbToWrite), &cbActual); + } + + if (orc == NO_ERROR) + { + *pcbWritten = cbActual; + if (cbActual == 0) + rc = VINF_TRY_AGAIN; + } + else + { + rc = RTErrConvertFromOS2(orc); + if (rc == VERR_PIPE_NOT_CONNECTED) + rc = VERR_BROKEN_PIPE; + } + } + else + *pcbWritten = 0; + + if (rc == VERR_BROKEN_PIPE) + pThis->fBrokenPipe = true; + pThis->cUsers--; + } + else + rc = VERR_WRONG_ORDER; + RTCritSectLeave(&pThis->CritSect); + } + return rc; +} + + +RTDECL(int) RTPipeWriteBlocking(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); + AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED); + AssertPtr(pvBuf); + AssertPtrNull(pcbWritten); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + rc = rtPipeTryBlocking(pThis); + if (RT_SUCCESS(rc)) + { + RTCritSectLeave(&pThis->CritSect); + + size_t cbTotalWritten = 0; + while (cbToWrite > 0) + { + ULONG cbActual = 0; + APIRET orc = DosWrite(pThis->hPipe, pvBuf, cbToWrite, &cbActual); + if (orc != NO_ERROR) + { + rc = RTErrConvertFromOS2(orc); + if (rc == VERR_PIPE_NOT_CONNECTED) + rc = VERR_BROKEN_PIPE; + break; + } + pvBuf = (char const *)pvBuf + cbActual; + cbToWrite -= cbActual; + cbTotalWritten += cbActual; + } + + if (pcbWritten) + { + *pcbWritten = cbTotalWritten; + if ( RT_FAILURE(rc) + && cbTotalWritten) + rc = VINF_SUCCESS; + } + + RTCritSectEnter(&pThis->CritSect); + if (rc == VERR_BROKEN_PIPE) + pThis->fBrokenPipe = true; + pThis->cUsers--; + } + else + rc = VERR_WRONG_ORDER; + RTCritSectLeave(&pThis->CritSect); + } + return rc; +} + + +RTDECL(int) RTPipeFlush(RTPIPE hPipe) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); + AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED); + + APIRET orc = DosResetBuffer(pThis->hPipe); + if (orc != NO_ERROR) + { + int rc = RTErrConvertFromOS2(orc); + if (rc == VERR_BROKEN_PIPE) + { + RTCritSectEnter(&pThis->CritSect); + pThis->fBrokenPipe = true; + RTCritSectLeave(&pThis->CritSect); + } + return rc; + } + return VINF_SUCCESS; +} + + +RTDECL(int) RTPipeSelectOne(RTPIPE hPipe, RTMSINTERVAL cMillies) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); + + uint64_t const StartMsTS = RTTimeMilliTS(); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_FAILURE(rc)) + return rc; + + rc = rtPipeOs2EnsureSem(pThis); + if (RT_SUCCESS(rc) && cMillies > 0) + { + /* Stop polling attempts if we might block. */ + if (pThis->hPollSet == NIL_RTPOLLSET) + pThis->hPollSet = (RTPOLLSET)(uintptr_t)0xbeef0042; + else + rc = VERR_WRONG_ORDER; + } + if (RT_SUCCESS(rc)) + { + for (unsigned iLoop = 0;; iLoop++) + { + /* + * Check the handle state. + */ + APIRET orc; + if (cMillies > 0) + { + ULONG ulIgnore; + orc = DosResetEventSem(pThis->hev, &ulIgnore); + AssertMsg(orc == NO_ERROR || orc == ERROR_ALREADY_RESET, ("%d\n", orc)); + } + + PIPESEMSTATE aStates[4]; RT_ZERO(aStates); + orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates)); + if (orc != NO_ERROR) + { + rc = RTErrConvertFromOS2(orc); + break; + } + int i = 0; + if (pThis->fRead) + while (aStates[i].fStatus == NPSS_WSPACE) + i++; + else + while (aStates[i].fStatus == NPSS_RDATA) + i++; + if (aStates[i].fStatus == NPSS_CLOSE) + break; + Assert(aStates[i].fStatus == NPSS_WSPACE || aStates[i].fStatus == NPSS_RDATA || aStates[i].fStatus == NPSS_EOI); + if ( aStates[i].fStatus != NPSS_EOI + && aStates[i].usAvail > 0) + break; + + /* + * Check for timeout. + */ + ULONG cMsMaxWait = SEM_INDEFINITE_WAIT; + if (cMillies != RT_INDEFINITE_WAIT) + { + uint64_t cElapsed = RTTimeMilliTS() - StartMsTS; + if (cElapsed >= cMillies) + { + rc = VERR_TIMEOUT; + break; + } + cMsMaxWait = cMillies - (uint32_t)cElapsed; + } + + /* + * Wait. + */ + RTCritSectLeave(&pThis->CritSect); + orc = DosWaitEventSem(pThis->hev, cMsMaxWait); + RTCritSectEnter(&pThis->CritSect); + if (orc != NO_ERROR && orc != ERROR_TIMEOUT && orc != ERROR_SEM_TIMEOUT ) + { + rc = RTErrConvertFromOS2(orc); + break; + } + } + + if (rc == VERR_BROKEN_PIPE) + pThis->fBrokenPipe = true; + if (cMillies > 0) + pThis->hPollSet = NIL_RTPOLLSET; + } + + RTCritSectLeave(&pThis->CritSect); + return rc; +} + + +RTDECL(int) RTPipeQueryReadable(RTPIPE hPipe, size_t *pcbReadable) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); + AssertReturn(pThis->fRead, VERR_PIPE_NOT_READ); + AssertPtrReturn(pcbReadable, VERR_INVALID_POINTER); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_FAILURE(rc)) + return rc; + + ULONG cbActual = 0; + ULONG ulState = 0; + AVAILDATA Avail = { 0, 0 }; + APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState); + if (orc == NO_ERROR) + { + if (Avail.cbpipe > 0 || ulState == NP_STATE_CONNECTED) + *pcbReadable = Avail.cbpipe; + else + rc = VERR_PIPE_NOT_CONNECTED; /*??*/ + } + else + rc = RTErrConvertFromOS2(orc); + + RTCritSectLeave(&pThis->CritSect); + return rc; +} + + +RTDECL(int) RTPipeQueryInfo(RTPIPE hPipe, PRTFSOBJINFO pObjInfo, RTFSOBJATTRADD enmAddAttr) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, 0); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, 0); + + int rc = RTCritSectEnter(&pThis->CritSect); + AssertRCReturn(rc, 0); + + rtPipeFakeQueryInfo(pObjInfo, enmAddAttr, pThis->fRead); + + if (pThis->fRead) + { + ULONG cbActual = 0; + ULONG ulState = 0; + AVAILDATA Avail = { 0, 0 }; + APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState); + if (orc == NO_ERROR && (Avail.cbpipe > 0 || ulState == NP_STATE_CONNECTED)) + pObjInfo->cbObject = Avail.cbpipe; + } + else + pObjInfo->cbObject = rtPipeOs2GetSpace(pThis); + pObjInfo->cbAllocated = RTPIPE_OS2_SIZE; /** @todo this isn't necessarily true if we didn't create it... but, whatever */ + + RTCritSectLeave(&pThis->CritSect); + return VINF_SUCCESS; +} + + +int rtPipePollGetHandle(RTPIPE hPipe, uint32_t fEvents, PRTHCINTPTR phNative) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE); + + AssertReturn(!(fEvents & RTPOLL_EVT_READ) || pThis->fRead, VERR_INVALID_PARAMETER); + AssertReturn(!(fEvents & RTPOLL_EVT_WRITE) || !pThis->fRead, VERR_INVALID_PARAMETER); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + rc = rtPipeOs2EnsureSem(pThis); + if (RT_SUCCESS(rc)) + *phNative = (RTHCINTPTR)pThis->hev; + RTCritSectLeave(&pThis->CritSect); + } + return rc; +} + + +/** + * Checks for pending events. + * + * @returns Event mask or 0. + * @param pThis The pipe handle. + * @param fEvents The desired events. + * @param fResetEvtSem Whether to reset the event semaphore. + */ +static uint32_t rtPipePollCheck(RTPIPEINTERNAL *pThis, uint32_t fEvents, bool fResetEvtSem) +{ + /* + * Reset the event semaphore if we're gonna wait. + */ + APIRET orc; + ULONG ulIgnore; + if (fResetEvtSem) + { + orc = DosResetEventSem(pThis->hev, &ulIgnore); + AssertMsg(orc == NO_ERROR || orc == ERROR_ALREADY_RESET, ("%d\n", orc)); + } + + /* + * Check for events. + */ + uint32_t fRetEvents = 0; + if (pThis->fBrokenPipe) + fRetEvents |= RTPOLL_EVT_ERROR; + else if (pThis->fRead) + { + ULONG cbActual = 0; + ULONG ulState = 0; + AVAILDATA Avail = { 0, 0 }; + orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState); + if (orc != NO_ERROR) + { + fRetEvents |= RTPOLL_EVT_ERROR; + if (orc == ERROR_BROKEN_PIPE || orc == ERROR_PIPE_NOT_CONNECTED) + pThis->fBrokenPipe = true; + } + else if (Avail.cbpipe > 0) + fRetEvents |= RTPOLL_EVT_READ; + else if (ulState != NP_STATE_CONNECTED) + { + fRetEvents |= RTPOLL_EVT_ERROR; + pThis->fBrokenPipe = true; + } + } + else + { + PIPESEMSTATE aStates[4]; RT_ZERO(aStates); + orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates)); + if (orc == NO_ERROR) + { + int i = 0; + while (aStates[i].fStatus == NPSS_RDATA) + i++; + if (aStates[i].fStatus == NPSS_CLOSE) + { + fRetEvents |= RTPOLL_EVT_ERROR; + pThis->fBrokenPipe = true; + } + else if ( aStates[i].fStatus == NPSS_WSPACE + && aStates[i].usAvail > 0) + fRetEvents |= RTPOLL_EVT_WRITE; + } + else + { + fRetEvents |= RTPOLL_EVT_ERROR; + if (orc == ERROR_BROKEN_PIPE || orc == ERROR_PIPE_NOT_CONNECTED) + pThis->fBrokenPipe = true; + } + } + + return fRetEvents & (fEvents | RTPOLL_EVT_ERROR); +} + + +uint32_t rtPipePollStart(RTPIPE hPipe, RTPOLLSET hPollSet, uint32_t fEvents, bool fFinalEntry, bool fNoWait) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, UINT32_MAX); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, UINT32_MAX); + + int rc = RTCritSectEnter(&pThis->CritSect); + AssertRCReturn(rc, UINT32_MAX); + + /* Check that this is the only current use of this pipe. */ + uint32_t fRetEvents; + if ( pThis->cUsers == 0 + || pThis->hPollSet == NIL_RTPOLLSET) + { + fRetEvents = rtPipePollCheck(pThis, fEvents, fNoWait); + if (!fRetEvents && !fNoWait) + { + /* Mark the set busy while waiting. */ + pThis->cUsers++; + pThis->hPollSet = hPollSet; + } + } + else + { + AssertFailed(); + fRetEvents = UINT32_MAX; + } + + RTCritSectLeave(&pThis->CritSect); + return fRetEvents; +} + + +uint32_t rtPipePollDone(RTPIPE hPipe, uint32_t fEvents, bool fFinalEntry, bool fHarvestEvents) +{ + RTPIPEINTERNAL *pThis = hPipe; + AssertPtrReturn(pThis, 0); + AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, 0); + + int rc = RTCritSectEnter(&pThis->CritSect); + AssertRCReturn(rc, 0); + + Assert(pThis->cUsers > 0); + + /* harvest events. */ + uint32_t fRetEvents = rtPipePollCheck(pThis, fEvents, false); + + /* update counters. */ + pThis->cUsers--; + pThis->hPollSet = NIL_RTPOLLSET; + + RTCritSectLeave(&pThis->CritSect); + return fRetEvents; +} |