summaryrefslogtreecommitdiffstats
path: root/src/arrow/cpp/src/plasma/fling.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/arrow/cpp/src/plasma/fling.cc')
-rw-r--r--src/arrow/cpp/src/plasma/fling.cc129
1 files changed, 129 insertions, 0 deletions
diff --git a/src/arrow/cpp/src/plasma/fling.cc b/src/arrow/cpp/src/plasma/fling.cc
new file mode 100644
index 000000000..f0960aab6
--- /dev/null
+++ b/src/arrow/cpp/src/plasma/fling.cc
@@ -0,0 +1,129 @@
+// Copyright 2013 Sharvil Nanavati
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "plasma/fling.h"
+
+#include <string.h>
+
+#include "arrow/util/logging.h"
+
+void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) {
+ iov->iov_base = buf;
+ iov->iov_len = 1;
+
+ msg->msg_iov = iov;
+ msg->msg_iovlen = 1;
+ msg->msg_control = buf;
+ msg->msg_controllen = static_cast<socklen_t>(buf_len);
+ msg->msg_name = NULL;
+ msg->msg_namelen = 0;
+}
+
+int send_fd(int conn, int fd) {
+ struct msghdr msg;
+ struct iovec iov;
+ char buf[CMSG_SPACE(sizeof(int))];
+ memset(&buf, 0, CMSG_SPACE(sizeof(int)));
+
+ init_msg(&msg, &iov, buf, sizeof(buf));
+
+ struct cmsghdr* header = CMSG_FIRSTHDR(&msg);
+ if (header == nullptr) {
+ return -1;
+ }
+ header->cmsg_level = SOL_SOCKET;
+ header->cmsg_type = SCM_RIGHTS;
+ header->cmsg_len = CMSG_LEN(sizeof(int));
+ memcpy(CMSG_DATA(header), reinterpret_cast<void*>(&fd), sizeof(int));
+
+ // Send file descriptor.
+ while (true) {
+ ssize_t r = sendmsg(conn, &msg, 0);
+ if (r < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+ continue;
+ } else if (errno == EMSGSIZE) {
+ ARROW_LOG(WARNING) << "Failed to send file descriptor"
+ << " (errno = EMSGSIZE), retrying.";
+ // If we failed to send the file descriptor, loop until we have sent it
+ // successfully. TODO(rkn): This is problematic for two reasons. First
+ // of all, sending the file descriptor should just succeed without any
+ // errors, but sometimes I see a "Message too long" error number.
+ // Second, looping like this allows a client to potentially block the
+ // plasma store event loop which should never happen.
+ continue;
+ } else {
+ ARROW_LOG(INFO) << "Error in send_fd (errno = " << errno << ")";
+ return static_cast<int>(r);
+ }
+ } else if (r == 0) {
+ ARROW_LOG(INFO) << "Encountered unexpected EOF";
+ return 0;
+ } else {
+ ARROW_CHECK(r > 0);
+ return static_cast<int>(r);
+ }
+ }
+}
+
+int recv_fd(int conn) {
+ struct msghdr msg;
+ struct iovec iov;
+ char buf[CMSG_SPACE(sizeof(int))];
+ init_msg(&msg, &iov, buf, sizeof(buf));
+
+ while (true) {
+ ssize_t r = recvmsg(conn, &msg, 0);
+ if (r == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+ continue;
+ } else {
+ ARROW_LOG(INFO) << "Error in recv_fd (errno = " << errno << ")";
+ return -1;
+ }
+ } else {
+ break;
+ }
+ }
+
+ int found_fd = -1;
+ int oh_noes = 0;
+ for (struct cmsghdr* header = CMSG_FIRSTHDR(&msg); header != NULL;
+ header = CMSG_NXTHDR(&msg, header))
+ if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) {
+ ssize_t count = (header->cmsg_len -
+ (CMSG_DATA(header) - reinterpret_cast<unsigned char*>(header))) /
+ sizeof(int);
+ for (int i = 0; i < count; ++i) {
+ int fd = (reinterpret_cast<int*>(CMSG_DATA(header)))[i];
+ if (found_fd == -1) {
+ found_fd = fd;
+ } else {
+ close(fd);
+ oh_noes = 1;
+ }
+ }
+ }
+
+ // The sender sent us more than one file descriptor. We've closed
+ // them all to prevent fd leaks but notify the caller that we got
+ // a bad message.
+ if (oh_noes) {
+ close(found_fd);
+ errno = EBADMSG;
+ return -1;
+ }
+
+ return found_fd;
+}