diff options
Diffstat (limited to '')
-rw-r--r-- | src/VBox/Runtime/r3/posix/localipc-posix.cpp | 1070 |
1 files changed, 1070 insertions, 0 deletions
diff --git a/src/VBox/Runtime/r3/posix/localipc-posix.cpp b/src/VBox/Runtime/r3/posix/localipc-posix.cpp new file mode 100644 index 00000000..1187b538 --- /dev/null +++ b/src/VBox/Runtime/r3/posix/localipc-posix.cpp @@ -0,0 +1,1070 @@ +/* $Id: localipc-posix.cpp $ */ +/** @file + * IPRT - Local IPC Server & Client, Posix. + */ + +/* + * Copyright (C) 2006-2020 Oracle Corporation + * + * This file is part of VirtualBox Open Source Edition (OSE), as + * available from http://www.virtualbox.org. This file is free software; + * you can redistribute it and/or modify it under the terms of the GNU + * General Public License (GPL) as published by the Free Software + * Foundation, in version 2 as it comes in the "COPYING" file of the + * VirtualBox OSE distribution. VirtualBox OSE is distributed in the + * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind. + * + * The contents of this file may alternatively be used under the terms + * of the Common Development and Distribution License Version 1.0 + * (CDDL) only, as it comes in the "COPYING.CDDL" file of the + * VirtualBox OSE distribution, in which case the provisions of the + * CDDL are applicable instead of those of the GPL. + * + * You may elect to license modified versions of this file under the + * terms and conditions of either the GPL or the CDDL or both. + */ + + +/********************************************************************************************************************************* +* Header Files * +*********************************************************************************************************************************/ +#define LOG_GROUP RTLOGGROUP_LOCALIPC +#include "internal/iprt.h" +#include <iprt/localipc.h> + +#include <iprt/asm.h> +#include <iprt/assert.h> +#include <iprt/ctype.h> +#include <iprt/critsect.h> +#include <iprt/err.h> +#include <iprt/mem.h> +#include <iprt/log.h> +#include <iprt/poll.h> +#include <iprt/socket.h> +#include <iprt/string.h> +#include <iprt/time.h> + +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#ifndef RT_OS_OS2 +# include <sys/poll.h> +#endif +#include <errno.h> +#include <fcntl.h> +#include <signal.h> +#include <unistd.h> + +#include "internal/magics.h" +#include "internal/path.h" +#include "internal/socket.h" + + +/********************************************************************************************************************************* +* Structures and Typedefs * +*********************************************************************************************************************************/ +/** + * Local IPC service instance, POSIX. + */ +typedef struct RTLOCALIPCSERVERINT +{ + /** The magic (RTLOCALIPCSERVER_MAGIC). */ + uint32_t u32Magic; + /** The creation flags. */ + uint32_t fFlags; + /** Critical section protecting the structure. */ + RTCRITSECT CritSect; + /** The number of references to the instance. */ + uint32_t volatile cRefs; + /** Indicates that there is a pending cancel request. */ + bool volatile fCancelled; + /** The server socket. */ + RTSOCKET hSocket; + /** Thread currently listening for clients. */ + RTTHREAD hListenThread; + /** The name we bound the server to (native charset encoding). */ + struct sockaddr_un Name; +} RTLOCALIPCSERVERINT; +/** Pointer to a local IPC server instance (POSIX). */ +typedef RTLOCALIPCSERVERINT *PRTLOCALIPCSERVERINT; + + +/** + * Local IPC session instance, POSIX. + */ +typedef struct RTLOCALIPCSESSIONINT +{ + /** The magic (RTLOCALIPCSESSION_MAGIC). */ + uint32_t u32Magic; + /** Critical section protecting the structure. */ + RTCRITSECT CritSect; + /** The number of references to the instance. */ + uint32_t volatile cRefs; + /** Indicates that there is a pending cancel request. */ + bool volatile fCancelled; + /** Set if this is the server side, clear if the client. */ + bool fServerSide; + /** The client socket. */ + RTSOCKET hSocket; + /** Thread currently doing read related activites. */ + RTTHREAD hWriteThread; + /** Thread currently doing write related activies. */ + RTTHREAD hReadThread; +} RTLOCALIPCSESSIONINT; +/** Pointer to a local IPC session instance (Windows). */ +typedef RTLOCALIPCSESSIONINT *PRTLOCALIPCSESSIONINT; + + +/** Local IPC name prefix for portable names. */ +#define RTLOCALIPC_POSIX_NAME_PREFIX "/tmp/.iprt-localipc-" + + +/** + * Validates the user specified name. + * + * @returns IPRT status code. + * @param pszName The name to validate. + * @param fNative Whether it's a native name or a portable name. + */ +static int rtLocalIpcPosixValidateName(const char *pszName, bool fNative) +{ + AssertPtrReturn(pszName, VERR_INVALID_POINTER); + AssertReturn(*pszName, VERR_INVALID_NAME); + + if (!fNative) + { + for (;;) + { + char ch = *pszName++; + if (!ch) + break; + AssertReturn(!RT_C_IS_CNTRL(ch), VERR_INVALID_NAME); + AssertReturn((unsigned)ch < 0x80, VERR_INVALID_NAME); + AssertReturn(ch != '\\', VERR_INVALID_NAME); + AssertReturn(ch != '/', VERR_INVALID_NAME); + } + } + else + { + int rc = RTStrValidateEncoding(pszName); + AssertRCReturn(rc, rc); + } + + return VINF_SUCCESS; +} + + +/** + * Constructs a local (unix) domain socket name. + * + * @returns IPRT status code. + * @param pAddr The address structure to construct the name in. + * @param pcbAddr Where to return the address size. + * @param pszName The user specified name (valid). + * @param fNative Whether it's a native name or a portable name. + */ +static int rtLocalIpcPosixConstructName(struct sockaddr_un *pAddr, uint8_t *pcbAddr, const char *pszName, bool fNative) +{ + const char *pszNativeName; + int rc = rtPathToNative(&pszNativeName, pszName, NULL /*pszBasePath not support*/); + if (RT_SUCCESS(rc)) + { + size_t cchNativeName = strlen(pszNativeName); + size_t cbFull = !fNative ? cchNativeName + sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) : cchNativeName + 1; + if (cbFull <= sizeof(pAddr->sun_path)) + { + RT_ZERO(*pAddr); +#ifdef RT_OS_OS2 /* Size must be exactly right on OS/2. */ + *pcbAddr = sizeof(*pAddr); +#else + *pcbAddr = RT_UOFFSETOF(struct sockaddr_un, sun_path) + (uint8_t)cbFull; +#endif +#ifdef HAVE_SUN_LEN_MEMBER + pAddr->sun_len = *pcbAddr; +#endif + pAddr->sun_family = AF_LOCAL; + + if (!fNative) + { + memcpy(pAddr->sun_path, RTLOCALIPC_POSIX_NAME_PREFIX, sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) - 1); + memcpy(&pAddr->sun_path[sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) - 1], pszNativeName, cchNativeName + 1); + } + else + memcpy(pAddr->sun_path, pszNativeName, cchNativeName + 1); + } + else + rc = VERR_FILENAME_TOO_LONG; + rtPathFreeNative(pszNativeName, pszName); + } + return rc; +} + + + +RTDECL(int) RTLocalIpcServerCreate(PRTLOCALIPCSERVER phServer, const char *pszName, uint32_t fFlags) +{ + /* + * Parameter validation. + */ + AssertPtrReturn(phServer, VERR_INVALID_POINTER); + *phServer = NIL_RTLOCALIPCSERVER; + AssertReturn(!(fFlags & ~RTLOCALIPC_FLAGS_VALID_MASK), VERR_INVALID_FLAGS); + int rc = rtLocalIpcPosixValidateName(pszName, RT_BOOL(fFlags & RTLOCALIPC_FLAGS_NATIVE_NAME)); + if (RT_SUCCESS(rc)) + { + /* + * Allocate memory for the instance and initialize it. + */ + PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)RTMemAllocZ(sizeof(*pThis)); + if (pThis) + { + pThis->u32Magic = RTLOCALIPCSERVER_MAGIC; + pThis->fFlags = fFlags; + pThis->cRefs = 1; + pThis->fCancelled = false; + pThis->hListenThread = NIL_RTTHREAD; + rc = RTCritSectInit(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + /* + * Create the local (unix) socket and bind to it. + */ + rc = rtSocketCreate(&pThis->hSocket, AF_LOCAL, SOCK_STREAM, 0 /*iProtocol*/); + if (RT_SUCCESS(rc)) + { + RTSocketSetInheritance(pThis->hSocket, false /*fInheritable*/); + signal(SIGPIPE, SIG_IGN); /* Required on solaris, at least. */ + + uint8_t cbAddr; + rc = rtLocalIpcPosixConstructName(&pThis->Name, &cbAddr, pszName, + RT_BOOL(fFlags & RTLOCALIPC_FLAGS_NATIVE_NAME)); + if (RT_SUCCESS(rc)) + { + rc = rtSocketBindRawAddr(pThis->hSocket, &pThis->Name, cbAddr); + if (rc == VERR_NET_ADDRESS_IN_USE) + { + unlink(pThis->Name.sun_path); + rc = rtSocketBindRawAddr(pThis->hSocket, &pThis->Name, cbAddr); + } + if (RT_SUCCESS(rc)) + { + rc = rtSocketListen(pThis->hSocket, 16); + if (RT_SUCCESS(rc)) + { + LogFlow(("RTLocalIpcServerCreate: Created %p (%s)\n", pThis, pThis->Name.sun_path)); + *phServer = pThis; + return VINF_SUCCESS; + } + unlink(pThis->Name.sun_path); + } + } + RTSocketRelease(pThis->hSocket); + } + RTCritSectDelete(&pThis->CritSect); + } + RTMemFree(pThis); + } + else + rc = VERR_NO_MEMORY; + } + Log(("RTLocalIpcServerCreate: failed, rc=%Rrc\n", rc)); + return rc; +} + + +/** + * Retains a reference to the server instance. + * + * @returns + * @param pThis The server instance. + */ +DECLINLINE(void) rtLocalIpcServerRetain(PRTLOCALIPCSERVERINT pThis) +{ + uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs); + Assert(cRefs < UINT32_MAX / 2 && cRefs); RT_NOREF_PV(cRefs); +} + + +/** + * Server instance destructor. + * + * @returns VINF_OBJECT_DESTROYED + * @param pThis The server instance. + */ +static int rtLocalIpcServerDtor(PRTLOCALIPCSERVERINT pThis) +{ + pThis->u32Magic = ~RTLOCALIPCSERVER_MAGIC; + if (RTSocketRelease(pThis->hSocket) == 0) + Log(("rtLocalIpcServerDtor: Released socket\n")); + else + Log(("rtLocalIpcServerDtor: Socket still has references (impossible?)\n")); + RTCritSectDelete(&pThis->CritSect); + unlink(pThis->Name.sun_path); + RTMemFree(pThis); + return VINF_OBJECT_DESTROYED; +} + + +/** + * Releases a reference to the server instance. + * + * @returns VINF_SUCCESS if only release, VINF_OBJECT_DESTROYED if destroyed. + * @param pThis The server instance. + */ +DECLINLINE(int) rtLocalIpcServerRelease(PRTLOCALIPCSERVERINT pThis) +{ + uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs); + Assert(cRefs < UINT32_MAX / 2); + if (!cRefs) + return rtLocalIpcServerDtor(pThis); + return VINF_SUCCESS; +} + + +/** + * The core of RTLocalIpcServerCancel, used by both the destroy and cancel APIs. + * + * @returns IPRT status code + * @param pThis The server instance. + */ +static int rtLocalIpcServerCancel(PRTLOCALIPCSERVERINT pThis) +{ + RTCritSectEnter(&pThis->CritSect); + pThis->fCancelled = true; + Log(("rtLocalIpcServerCancel:\n")); + if (pThis->hListenThread != NIL_RTTHREAD) + RTThreadPoke(pThis->hListenThread); + RTCritSectLeave(&pThis->CritSect); + return VINF_SUCCESS; +} + + + +RTDECL(int) RTLocalIpcServerDestroy(RTLOCALIPCSERVER hServer) +{ + /* + * Validate input. + */ + if (hServer == NIL_RTLOCALIPCSERVER) + return VINF_SUCCESS; + PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE); + + /* + * Invalidate the server, releasing the caller's reference to the instance + * data and making sure any other thread in the listen API will wake up. + */ + AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTLOCALIPCSERVER_MAGIC, RTLOCALIPCSERVER_MAGIC), VERR_WRONG_ORDER); + + rtLocalIpcServerCancel(pThis); + return rtLocalIpcServerRelease(pThis); +} + + +RTDECL(int) RTLocalIpcServerCancel(RTLOCALIPCSERVER hServer) +{ + /* + * Validate input. + */ + PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE); + + /* + * Do the job. + */ + rtLocalIpcServerRetain(pThis); + rtLocalIpcServerCancel(pThis); + rtLocalIpcServerRelease(pThis); + return VINF_SUCCESS; +} + + +RTDECL(int) RTLocalIpcServerListen(RTLOCALIPCSERVER hServer, PRTLOCALIPCSESSION phClientSession) +{ + /* + * Validate input. + */ + PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE); + + /* + * Begin listening. + */ + rtLocalIpcServerRetain(pThis); + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + if (pThis->hListenThread == NIL_RTTHREAD) + { + pThis->hListenThread = RTThreadSelf(); + + /* + * The listening retry loop. + */ + for (;;) + { + if (!pThis->fCancelled) + { + rc = RTCritSectLeave(&pThis->CritSect); + AssertRCBreak(rc); + + struct sockaddr_un Addr; + size_t cbAddr = sizeof(Addr); + RTSOCKET hClient; + Log(("RTLocalIpcServerListen: Calling rtSocketAccept...\n")); + rc = rtSocketAccept(pThis->hSocket, &hClient, (struct sockaddr *)&Addr, &cbAddr); + Log(("RTLocalIpcServerListen: rtSocketAccept returns %Rrc.\n", rc)); + + int rc2 = RTCritSectEnter(&pThis->CritSect); + AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc); + + if (RT_SUCCESS(rc)) + { + /* + * Create a client session. + */ + PRTLOCALIPCSESSIONINT pSession = (PRTLOCALIPCSESSIONINT)RTMemAllocZ(sizeof(*pSession)); + if (pSession) + { + pSession->u32Magic = RTLOCALIPCSESSION_MAGIC; + pSession->cRefs = 1; + pSession->fCancelled = false; + pSession->fServerSide = true; + pSession->hSocket = hClient; + pSession->hReadThread = NIL_RTTHREAD; + pSession->hWriteThread = NIL_RTTHREAD; + rc = RTCritSectInit(&pSession->CritSect); + if (RT_SUCCESS(rc)) + { + Log(("RTLocalIpcServerListen: Returning new client session: %p\n", pSession)); + *phClientSession = pSession; + break; + } + + RTMemFree(pSession); + } + else + rc = VERR_NO_MEMORY; + } + else if ( rc != VERR_INTERRUPTED + && rc != VERR_TRY_AGAIN) + break; + } + else + { + rc = VERR_CANCELLED; + break; + } + } + + pThis->hListenThread = NIL_RTTHREAD; + } + else + { + AssertFailed(); + rc = VERR_RESOURCE_BUSY; + } + int rc2 = RTCritSectLeave(&pThis->CritSect); + AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc); + } + rtLocalIpcServerRelease(pThis); + + Log(("RTLocalIpcServerListen: returns %Rrc\n", rc)); + return rc; +} + + +RTDECL(int) RTLocalIpcSessionConnect(PRTLOCALIPCSESSION phSession, const char *pszName, uint32_t fFlags) +{ + /* + * Parameter validation. + */ + AssertPtrReturn(phSession, VERR_INVALID_POINTER); + *phSession = NIL_RTLOCALIPCSESSION; + + AssertReturn(!(fFlags & ~RTLOCALIPC_C_FLAGS_VALID_MASK), VERR_INVALID_FLAGS); + + int rc = rtLocalIpcPosixValidateName(pszName, RT_BOOL(fFlags & RTLOCALIPC_C_FLAGS_NATIVE_NAME)); + if (RT_SUCCESS(rc)) + { + /* + * Allocate memory for the instance and initialize it. + */ + PRTLOCALIPCSESSIONINT pThis = (PRTLOCALIPCSESSIONINT)RTMemAllocZ(sizeof(*pThis)); + if (pThis) + { + pThis->u32Magic = RTLOCALIPCSESSION_MAGIC; + pThis->cRefs = 1; + pThis->fCancelled = false; + pThis->fServerSide = false; + pThis->hSocket = NIL_RTSOCKET; + pThis->hReadThread = NIL_RTTHREAD; + pThis->hWriteThread = NIL_RTTHREAD; + rc = RTCritSectInit(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + /* + * Create the local (unix) socket and try connect to the server. + */ + rc = rtSocketCreate(&pThis->hSocket, AF_LOCAL, SOCK_STREAM, 0 /*iProtocol*/); + if (RT_SUCCESS(rc)) + { + RTSocketSetInheritance(pThis->hSocket, false /*fInheritable*/); + signal(SIGPIPE, SIG_IGN); /* Required on solaris, at least. */ + + struct sockaddr_un Addr; + uint8_t cbAddr; + rc = rtLocalIpcPosixConstructName(&Addr, &cbAddr, pszName, RT_BOOL(fFlags & RTLOCALIPC_C_FLAGS_NATIVE_NAME)); + if (RT_SUCCESS(rc)) + { + rc = rtSocketConnectRaw(pThis->hSocket, &Addr, cbAddr); + if (RT_SUCCESS(rc)) + { + *phSession = pThis; + Log(("RTLocalIpcSessionConnect: Returns new session %p\n", pThis)); + return VINF_SUCCESS; + } + } + RTSocketRelease(pThis->hSocket); + } + RTCritSectDelete(&pThis->CritSect); + } + RTMemFree(pThis); + } + else + rc = VERR_NO_MEMORY; + } + Log(("RTLocalIpcSessionConnect: returns %Rrc\n", rc)); + return rc; +} + + +/** + * Retains a reference to the session instance. + * + * @param pThis The server instance. + */ +DECLINLINE(void) rtLocalIpcSessionRetain(PRTLOCALIPCSESSIONINT pThis) +{ + uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs); + Assert(cRefs < UINT32_MAX / 2 && cRefs); RT_NOREF_PV(cRefs); +} + + +RTDECL(uint32_t) RTLocalIpcSessionRetain(RTLOCALIPCSESSION hSession) +{ + PRTLOCALIPCSESSIONINT pThis = (PRTLOCALIPCSESSIONINT)hSession; + AssertPtrReturn(pThis, UINT32_MAX); + AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, UINT32_MAX); + + uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs); + Assert(cRefs < UINT32_MAX / 2 && cRefs); + return cRefs; +} + + +/** + * Session instance destructor. + * + * @returns VINF_OBJECT_DESTROYED + * @param pThis The server instance. + */ +static int rtLocalIpcSessionDtor(PRTLOCALIPCSESSIONINT pThis) +{ + pThis->u32Magic = ~RTLOCALIPCSESSION_MAGIC; + if (RTSocketRelease(pThis->hSocket) == 0) + Log(("rtLocalIpcSessionDtor: Released socket\n")); + else + Log(("rtLocalIpcSessionDtor: Socket still has references (impossible?)\n")); + RTCritSectDelete(&pThis->CritSect); + RTMemFree(pThis); + return VINF_OBJECT_DESTROYED; +} + + +/** + * Releases a reference to the session instance. + * + * @returns VINF_SUCCESS or VINF_OBJECT_DESTROYED as appropriate. + * @param pThis The session instance. + */ +DECLINLINE(int) rtLocalIpcSessionRelease(PRTLOCALIPCSESSIONINT pThis) +{ + uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs); + Assert(cRefs < UINT32_MAX / 2); + if (!cRefs) + return rtLocalIpcSessionDtor(pThis); + Log(("rtLocalIpcSessionRelease: %u refs left\n", cRefs)); + return VINF_SUCCESS; +} + + +RTDECL(uint32_t) RTLocalIpcSessionRelease(RTLOCALIPCSESSION hSession) +{ + if (hSession == NIL_RTLOCALIPCSESSION) + return 0; + + PRTLOCALIPCSESSIONINT pThis = (PRTLOCALIPCSESSIONINT)hSession; + AssertPtrReturn(pThis, UINT32_MAX); + AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, UINT32_MAX); + + uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs); + Assert(cRefs < UINT32_MAX / 2); + if (cRefs) + Log(("RTLocalIpcSessionRelease: %u refs left\n", cRefs)); + else + rtLocalIpcSessionDtor(pThis); + return cRefs; +} + + +/** + * The core of RTLocalIpcSessionCancel, used by both the destroy and cancel APIs. + * + * @returns IPRT status code + * @param pThis The session instance. + */ +static int rtLocalIpcSessionCancel(PRTLOCALIPCSESSIONINT pThis) +{ + RTCritSectEnter(&pThis->CritSect); + pThis->fCancelled = true; + Log(("rtLocalIpcSessionCancel:\n")); + if (pThis->hReadThread != NIL_RTTHREAD) + RTThreadPoke(pThis->hReadThread); + if (pThis->hWriteThread != NIL_RTTHREAD) + RTThreadPoke(pThis->hWriteThread); + RTCritSectLeave(&pThis->CritSect); + return VINF_SUCCESS; +} + + +RTDECL(int) RTLocalIpcSessionClose(RTLOCALIPCSESSION hSession) +{ + /* + * Validate input. + */ + if (hSession == NIL_RTLOCALIPCSESSION) + return VINF_SUCCESS; + PRTLOCALIPCSESSIONINT pThis = hSession; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); + + /* + * Invalidate the session, releasing the caller's reference to the instance + * data and making sure any other thread in the listen API will wake up. + */ + Log(("RTLocalIpcSessionClose:\n")); + + rtLocalIpcSessionCancel(pThis); + return rtLocalIpcSessionRelease(pThis); +} + + +RTDECL(int) RTLocalIpcSessionCancel(RTLOCALIPCSESSION hSession) +{ + /* + * Validate input. + */ + PRTLOCALIPCSESSIONINT pThis = hSession; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); + + /* + * Do the job. + */ + rtLocalIpcSessionRetain(pThis); + rtLocalIpcSessionCancel(pThis); + rtLocalIpcSessionRelease(pThis); + return VINF_SUCCESS; +} + + +/** + * Checks if the socket has has a HUP condition after reading zero bytes. + * + * @returns true if HUP, false if no. + * @param pThis The IPC session handle. + */ +static bool rtLocalIpcPosixHasHup(PRTLOCALIPCSESSIONINT pThis) +{ + int fdNative = RTSocketToNative(pThis->hSocket); + +#if !defined(RT_OS_OS2) && !defined(RT_OS_SOLARIS) + struct pollfd PollFd; + RT_ZERO(PollFd); + PollFd.fd = fdNative; + PollFd.events = POLLHUP | POLLERR; + if (poll(&PollFd, 1, 0) <= 0) + return false; + if (!(PollFd.revents & (POLLHUP | POLLERR))) + return false; +#else /* RT_OS_OS2 || RT_OS_SOLARIS */ + /* + * OS/2: No native poll, do zero byte send to check for EPIPE. + * Solaris: We don't get POLLHUP. + */ + uint8_t bDummy; + ssize_t rcSend = send(fdNative, &bDummy, 0, 0); + if (rcSend >= 0 || (errno != EPIPE && errno != ECONNRESET)) + return false; +#endif /* RT_OS_OS2 || RT_OS_SOLARIS */ + + /* + * We've established EPIPE. Now make sure there aren't any last bytes to + * read that came in between the recv made by the caller and the disconnect. + */ + uint8_t bPeek; + ssize_t rcRecv = recv(fdNative, &bPeek, 1, MSG_DONTWAIT | MSG_PEEK); + return rcRecv <= 0; +} + + +RTDECL(int) RTLocalIpcSessionRead(RTLOCALIPCSESSION hSession, void *pvBuf, size_t cbToRead, size_t *pcbRead) +{ + /* + * Validate input. + */ + PRTLOCALIPCSESSIONINT pThis = hSession; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); + + /* + * Do the job. + */ + rtLocalIpcSessionRetain(pThis); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + if (pThis->hReadThread == NIL_RTTHREAD) + { + pThis->hReadThread = RTThreadSelf(); + + for (;;) + { + if (!pThis->fCancelled) + { + rc = RTCritSectLeave(&pThis->CritSect); + AssertRCBreak(rc); + + rc = RTSocketRead(pThis->hSocket, pvBuf, cbToRead, pcbRead); + + /* Detect broken pipe. */ + if (rc == VINF_SUCCESS) + { + if (!pcbRead || *pcbRead) + { /* likely */ } + else if (rtLocalIpcPosixHasHup(pThis)) + rc = VERR_BROKEN_PIPE; + } + else if (rc == VERR_NET_CONNECTION_RESET_BY_PEER || rc == VERR_NET_SHUTDOWN) + rc = VERR_BROKEN_PIPE; + + int rc2 = RTCritSectEnter(&pThis->CritSect); + AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc); + + if ( rc == VERR_INTERRUPTED + || rc == VERR_TRY_AGAIN) + continue; + } + else + rc = VERR_CANCELLED; + break; + } + + pThis->hReadThread = NIL_RTTHREAD; + } + int rc2 = RTCritSectLeave(&pThis->CritSect); + AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc); + } + + rtLocalIpcSessionRelease(pThis); + return rc; +} + + +RTDECL(int) RTLocalIpcSessionReadNB(RTLOCALIPCSESSION hSession, void *pvBuf, size_t cbToRead, size_t *pcbRead) +{ + /* + * Validate input. + */ + PRTLOCALIPCSESSIONINT pThis = hSession; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); + + /* + * Do the job. + */ + rtLocalIpcSessionRetain(pThis); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + if (pThis->hReadThread == NIL_RTTHREAD) + { + pThis->hReadThread = RTThreadSelf(); /* not really required, but whatever. */ + + for (;;) + { + if (!pThis->fCancelled) + { + rc = RTSocketReadNB(pThis->hSocket, pvBuf, cbToRead, pcbRead); + + /* Detect broken pipe. */ + if (rc == VINF_SUCCESS) + { + if (!pcbRead || *pcbRead) + { /* likely */ } + else if (rtLocalIpcPosixHasHup(pThis)) + rc = VERR_BROKEN_PIPE; + } + else if (rc == VERR_NET_CONNECTION_RESET_BY_PEER || rc == VERR_NET_SHUTDOWN) + rc = VERR_BROKEN_PIPE; + + if (rc == VERR_INTERRUPTED) + continue; + } + else + rc = VERR_CANCELLED; + break; + } + + pThis->hReadThread = NIL_RTTHREAD; + } + int rc2 = RTCritSectLeave(&pThis->CritSect); + AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc); + } + + rtLocalIpcSessionRelease(pThis); + return rc; +} + + +RTDECL(int) RTLocalIpcSessionWrite(RTLOCALIPCSESSION hSession, const void *pvBuf, size_t cbToWrite) +{ + /* + * Validate input. + */ + PRTLOCALIPCSESSIONINT pThis = hSession; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); + + /* + * Do the job. + */ + rtLocalIpcSessionRetain(pThis); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + if (pThis->hWriteThread == NIL_RTTHREAD) + { + pThis->hWriteThread = RTThreadSelf(); + + for (;;) + { + if (!pThis->fCancelled) + { + rc = RTCritSectLeave(&pThis->CritSect); + AssertRCBreak(rc); + + rc = RTSocketWrite(pThis->hSocket, pvBuf, cbToWrite); + + int rc2 = RTCritSectEnter(&pThis->CritSect); + AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc); + + if ( rc == VERR_INTERRUPTED + || rc == VERR_TRY_AGAIN) + continue; + } + else + rc = VERR_CANCELLED; + break; + } + + pThis->hWriteThread = NIL_RTTHREAD; + } + int rc2 = RTCritSectLeave(&pThis->CritSect); + AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc); + } + + rtLocalIpcSessionRelease(pThis); + return rc; +} + + +RTDECL(int) RTLocalIpcSessionFlush(RTLOCALIPCSESSION hSession) +{ + /* + * Validate input. + */ + PRTLOCALIPCSESSIONINT pThis = hSession; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); + + /* + * This is a no-op because apparently write doesn't return until the + * result is read. At least that's what the reply to a 2003-04-08 LKML + * posting title "fsync() on unix domain sockets?" indicates. + * + * For conformity, make sure there isn't any active writes concurrent to this call. + */ + rtLocalIpcSessionRetain(pThis); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + if (pThis->hWriteThread == NIL_RTTHREAD) + rc = RTCritSectLeave(&pThis->CritSect); + else + { + rc = RTCritSectLeave(&pThis->CritSect); + if (RT_SUCCESS(rc)) + rc = VERR_RESOURCE_BUSY; + } + } + + rtLocalIpcSessionRelease(pThis); + return rc; +} + + +RTDECL(int) RTLocalIpcSessionWaitForData(RTLOCALIPCSESSION hSession, uint32_t cMillies) +{ + /* + * Validate input. + */ + PRTLOCALIPCSESSIONINT pThis = hSession; + AssertPtrReturn(pThis, VERR_INVALID_HANDLE); + AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE); + + /* + * Do the job. + */ + rtLocalIpcSessionRetain(pThis); + + int rc = RTCritSectEnter(&pThis->CritSect); + if (RT_SUCCESS(rc)) + { + if (pThis->hReadThread == NIL_RTTHREAD) + { + pThis->hReadThread = RTThreadSelf(); + uint64_t const msStart = RTTimeMilliTS(); + RTMSINTERVAL const cMsOriginalTimeout = cMillies; + + for (;;) + { + if (!pThis->fCancelled) + { + rc = RTCritSectLeave(&pThis->CritSect); + AssertRCBreak(rc); + + uint32_t fEvents = 0; +#ifdef RT_OS_OS2 + /* This doesn't give us any error condition on hangup, so use HUP check. */ + Log(("RTLocalIpcSessionWaitForData: Calling RTSocketSelectOneEx...\n")); + rc = RTSocketSelectOneEx(pThis->hSocket, RTPOLL_EVT_READ | RTPOLL_EVT_ERROR, &fEvents, cMillies); + Log(("RTLocalIpcSessionWaitForData: RTSocketSelectOneEx returns %Rrc, fEvents=%#x\n", rc, fEvents)); + if (RT_SUCCESS(rc) && fEvents == RTPOLL_EVT_READ && rtLocalIpcPosixHasHup(pThis)) + rc = VERR_BROKEN_PIPE; +#else +/** @todo RTSocketPoll? */ + /* POLLHUP will be set on hangup. */ + struct pollfd PollFd; + RT_ZERO(PollFd); + PollFd.fd = RTSocketToNative(pThis->hSocket); + PollFd.events = POLLHUP | POLLERR | POLLIN; + Log(("RTLocalIpcSessionWaitForData: Calling poll...\n")); + int cFds = poll(&PollFd, 1, cMillies == RT_INDEFINITE_WAIT ? -1 : cMillies); + if (cFds >= 1) + { + /* Linux & Darwin sets both POLLIN and POLLHUP when the pipe is + broken and but no more data to read. Google hints at NetBSD + returning more sane values (POLLIN till no more data, then + POLLHUP). Solairs OTOH, doesn't ever seem to return POLLHUP. */ + fEvents = RTPOLL_EVT_READ; + if ( (PollFd.revents & (POLLHUP | POLLERR)) + && !(PollFd.revents & POLLIN)) + fEvents = RTPOLL_EVT_ERROR; +# if defined(RT_OS_SOLARIS) + else if (PollFd.revents & POLLIN) +# else + else if ((PollFd.revents & (POLLIN | POLLHUP)) == (POLLIN | POLLHUP)) +# endif + { + /* Check if there is actually data available. */ + uint8_t bPeek; + ssize_t rcRecv = recv(PollFd.fd, &bPeek, 1, MSG_DONTWAIT | MSG_PEEK); + if (rcRecv <= 0) + fEvents = RTPOLL_EVT_ERROR; + } + rc = VINF_SUCCESS; + } + else if (rc == 0) + rc = VERR_TIMEOUT; + else + rc = RTErrConvertFromErrno(errno); + Log(("RTLocalIpcSessionWaitForData: poll returns %u (rc=%d), revents=%#x\n", cFds, rc, PollFd.revents)); +#endif + + int rc2 = RTCritSectEnter(&pThis->CritSect); + AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc); + + if (RT_SUCCESS(rc)) + { + if (pThis->fCancelled) + rc = VERR_CANCELLED; + else if (fEvents & RTPOLL_EVT_ERROR) + rc = VERR_BROKEN_PIPE; + } + else if ( rc == VERR_INTERRUPTED + || rc == VERR_TRY_AGAIN) + { + /* Recalc cMillies. */ + if (cMsOriginalTimeout != RT_INDEFINITE_WAIT) + { + uint64_t cMsElapsed = RTTimeMilliTS() - msStart; + cMillies = cMsElapsed >= cMsOriginalTimeout ? 0 : cMsOriginalTimeout - (RTMSINTERVAL)cMsElapsed; + } + continue; + } + } + else + rc = VERR_CANCELLED; + break; + } + + pThis->hReadThread = NIL_RTTHREAD; + } + int rc2 = RTCritSectLeave(&pThis->CritSect); + AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc); + } + + rtLocalIpcSessionRelease(pThis); + return rc; +} + + +RTDECL(int) RTLocalIpcSessionQueryProcess(RTLOCALIPCSESSION hSession, PRTPROCESS pProcess) +{ + RT_NOREF_PV(hSession); RT_NOREF_PV(pProcess); + return VERR_NOT_SUPPORTED; +} + + +RTDECL(int) RTLocalIpcSessionQueryUserId(RTLOCALIPCSESSION hSession, PRTUID pUid) +{ + RT_NOREF_PV(hSession); RT_NOREF_PV(pUid); + return VERR_NOT_SUPPORTED; +} + + +RTDECL(int) RTLocalIpcSessionQueryGroupId(RTLOCALIPCSESSION hSession, PRTGID pGid) +{ + RT_NOREF_PV(hSession); RT_NOREF_PV(pGid); + return VERR_NOT_SUPPORTED; +} + |