diff options
Diffstat (limited to '')
-rw-r--r-- | src/seastar/include/seastar/core/posix.hh | 492 |
1 files changed, 492 insertions, 0 deletions
diff --git a/src/seastar/include/seastar/core/posix.hh b/src/seastar/include/seastar/core/posix.hh new file mode 100644 index 000000000..f8dece37c --- /dev/null +++ b/src/seastar/include/seastar/core/posix.hh @@ -0,0 +1,492 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2016 ScyllaDB + */ + +#pragma once + +#include <seastar/core/sstring.hh> +#include "abort_on_ebadf.hh" +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <assert.h> +#include <utility> +#include <fcntl.h> +#include <sys/ioctl.h> +#include <sys/eventfd.h> +#include <sys/timerfd.h> +#include <sys/socket.h> +#include <sys/epoll.h> +#include <sys/mman.h> +#include <signal.h> +#include <system_error> +#include <pthread.h> +#include <signal.h> +#include <memory> +#include <chrono> +#include <sys/uio.h> + +#include <seastar/net/socket_defs.hh> +#include <seastar/util/std-compat.hh> + +namespace seastar { + +/// \file +/// \defgroup posix-support POSIX Support +/// +/// Mostly-internal APIs to provide C++ glue for the underlying POSIX platform; +/// but can be used by the application when they don't block. +/// +/// \addtogroup posix-support +/// @{ + +inline void throw_system_error_on(bool condition, const char* what_arg = ""); + +template <typename T> +inline void throw_kernel_error(T r); + +struct mmap_deleter { + size_t _size; + void operator()(void* ptr) const; +}; + +using mmap_area = std::unique_ptr<char[], mmap_deleter>; + +mmap_area mmap_anonymous(void* addr, size_t length, int prot, int flags); + +class file_desc { + int _fd; +public: + file_desc() = delete; + file_desc(const file_desc&) = delete; + file_desc(file_desc&& x) noexcept : _fd(x._fd) { x._fd = -1; } + ~file_desc() { if (_fd != -1) { ::close(_fd); } } + void operator=(const file_desc&) = delete; + file_desc& operator=(file_desc&& x) { + if (this != &x) { + std::swap(_fd, x._fd); + if (x._fd != -1) { + x.close(); + } + } + return *this; + } + void close() { + assert(_fd != -1); + auto r = ::close(_fd); + throw_system_error_on(r == -1, "close"); + _fd = -1; + } + int get() const { return _fd; } + + static file_desc from_fd(int fd) { + return file_desc(fd); + } + + static file_desc open(sstring name, int flags, mode_t mode = 0) { + int fd = ::open(name.c_str(), flags, mode); + throw_system_error_on(fd == -1, "open"); + return file_desc(fd); + } + static file_desc socket(int family, int type, int protocol = 0) { + int fd = ::socket(family, type, protocol); + throw_system_error_on(fd == -1, "socket"); + return file_desc(fd); + } + static file_desc eventfd(unsigned initval, int flags) { + int fd = ::eventfd(initval, flags); + throw_system_error_on(fd == -1, "eventfd"); + return file_desc(fd); + } + static file_desc epoll_create(int flags = 0) { + int fd = ::epoll_create1(flags); + throw_system_error_on(fd == -1, "epoll_create1"); + return file_desc(fd); + } + static file_desc timerfd_create(int clockid, int flags) { + int fd = ::timerfd_create(clockid, flags); + throw_system_error_on(fd == -1, "timerfd_create"); + return file_desc(fd); + } + static file_desc temporary(sstring directory); + file_desc dup() const { + int fd = ::dup(get()); + throw_system_error_on(fd == -1, "dup"); + return file_desc(fd); + } + file_desc accept(socket_address& sa, int flags = 0) { + auto ret = ::accept4(_fd, &sa.as_posix_sockaddr(), &sa.addr_length, flags); + throw_system_error_on(ret == -1, "accept4"); + return file_desc(ret); + } + static file_desc inotify_init(int flags); + // return nullopt if no connection is availbale to be accepted + std::optional<file_desc> try_accept(socket_address& sa, int flags = 0) { + auto ret = ::accept4(_fd, &sa.as_posix_sockaddr(), &sa.addr_length, flags); + if (ret == -1 && errno == EAGAIN) { + return {}; + } + throw_system_error_on(ret == -1, "accept4"); + return file_desc(ret); + } + void shutdown(int how) { + auto ret = ::shutdown(_fd, how); + if (ret == -1 && errno != ENOTCONN) { + throw_system_error_on(ret == -1, "shutdown"); + } + } + void truncate(size_t size) { + auto ret = ::ftruncate(_fd, size); + throw_system_error_on(ret, "ftruncate"); + } + int ioctl(int request) { + return ioctl(request, 0); + } + int ioctl(int request, int value) { + int r = ::ioctl(_fd, request, value); + throw_system_error_on(r == -1, "ioctl"); + return r; + } + int ioctl(int request, unsigned int value) { + int r = ::ioctl(_fd, request, value); + throw_system_error_on(r == -1, "ioctl"); + return r; + } + template <class X> + int ioctl(int request, X& data) { + int r = ::ioctl(_fd, request, &data); + throw_system_error_on(r == -1, "ioctl"); + return r; + } + template <class X> + int ioctl(int request, X&& data) { + int r = ::ioctl(_fd, request, &data); + throw_system_error_on(r == -1, "ioctl"); + return r; + } + template <class X> + int setsockopt(int level, int optname, X&& data) { + int r = ::setsockopt(_fd, level, optname, &data, sizeof(data)); + throw_system_error_on(r == -1, "setsockopt"); + return r; + } + int setsockopt(int level, int optname, const char* data) { + int r = ::setsockopt(_fd, level, optname, data, strlen(data) + 1); + throw_system_error_on(r == -1, "setsockopt"); + return r; + } + int setsockopt(int level, int optname, const void* data, socklen_t len) { + int r = ::setsockopt(_fd, level, optname, data, len); + throw_system_error_on(r == -1, "setsockopt"); + return r; + } + template <typename Data> + Data getsockopt(int level, int optname) { + Data data; + socklen_t len = sizeof(data); + memset(&data, 0, len); + int r = ::getsockopt(_fd, level, optname, &data, &len); + throw_system_error_on(r == -1, "getsockopt"); + return data; + } + int getsockopt(int level, int optname, char* data, socklen_t len) { + int r = ::getsockopt(_fd, level, optname, data, &len); + throw_system_error_on(r == -1, "getsockopt"); + return r; + } + size_t size() { + struct stat buf; + auto r = ::fstat(_fd, &buf); + throw_system_error_on(r == -1, "fstat"); + return buf.st_size; + } + std::optional<size_t> read(void* buffer, size_t len) { + auto r = ::read(_fd, buffer, len); + if (r == -1 && errno == EAGAIN) { + return {}; + } + throw_system_error_on(r == -1, "read"); + return { size_t(r) }; + } + std::optional<ssize_t> recv(void* buffer, size_t len, int flags) { + auto r = ::recv(_fd, buffer, len, flags); + if (r == -1 && errno == EAGAIN) { + return {}; + } + throw_system_error_on(r == -1, "recv"); + return { ssize_t(r) }; + } + std::optional<size_t> recvmsg(msghdr* mh, int flags) { + auto r = ::recvmsg(_fd, mh, flags); + if (r == -1 && errno == EAGAIN) { + return {}; + } + throw_system_error_on(r == -1, "recvmsg"); + return { size_t(r) }; + } + std::optional<size_t> send(const void* buffer, size_t len, int flags) { + auto r = ::send(_fd, buffer, len, flags); + if (r == -1 && errno == EAGAIN) { + return {}; + } + throw_system_error_on(r == -1, "send"); + return { size_t(r) }; + } + std::optional<size_t> sendto(socket_address& addr, const void* buf, size_t len, int flags) { + auto r = ::sendto(_fd, buf, len, flags, &addr.u.sa, addr.length()); + if (r == -1 && errno == EAGAIN) { + return {}; + } + throw_system_error_on(r == -1, "sendto"); + return { size_t(r) }; + } + std::optional<size_t> sendmsg(const msghdr* msg, int flags) { + auto r = ::sendmsg(_fd, msg, flags); + if (r == -1 && errno == EAGAIN) { + return {}; + } + throw_system_error_on(r == -1, "sendmsg"); + return { size_t(r) }; + } + void bind(sockaddr& sa, socklen_t sl) { + auto r = ::bind(_fd, &sa, sl); + throw_system_error_on(r == -1, "bind"); + } + void connect(sockaddr& sa, socklen_t sl) { + auto r = ::connect(_fd, &sa, sl); + if (r == -1 && errno == EINPROGRESS) { + return; + } + throw_system_error_on(r == -1, "connect"); + } + socket_address get_address() { + socket_address addr; + auto r = ::getsockname(_fd, &addr.u.sa, &addr.addr_length); + throw_system_error_on(r == -1, "getsockname"); + return addr; + } + void listen(int backlog) { + auto fd = ::listen(_fd, backlog); + throw_system_error_on(fd == -1, "listen"); + } + std::optional<size_t> write(const void* buf, size_t len) { + auto r = ::write(_fd, buf, len); + if (r == -1 && errno == EAGAIN) { + return {}; + } + throw_system_error_on(r == -1, "write"); + return { size_t(r) }; + } + std::optional<size_t> writev(const iovec *iov, int iovcnt) { + auto r = ::writev(_fd, iov, iovcnt); + if (r == -1 && errno == EAGAIN) { + return {}; + } + throw_system_error_on(r == -1, "writev"); + return { size_t(r) }; + } + size_t pread(void* buf, size_t len, off_t off) { + auto r = ::pread(_fd, buf, len, off); + throw_system_error_on(r == -1, "pread"); + return size_t(r); + } + void timerfd_settime(int flags, const itimerspec& its) { + auto r = ::timerfd_settime(_fd, flags, &its, NULL); + throw_system_error_on(r == -1, "timerfd_settime"); + } + + mmap_area map(size_t size, unsigned prot, unsigned flags, size_t offset, + void* addr = nullptr) { + void *x = mmap(addr, size, prot, flags, _fd, offset); + throw_system_error_on(x == MAP_FAILED, "mmap"); + return mmap_area(static_cast<char*>(x), mmap_deleter{size}); + } + + mmap_area map_shared_rw(size_t size, size_t offset) { + return map(size, PROT_READ | PROT_WRITE, MAP_SHARED, offset); + } + + mmap_area map_shared_ro(size_t size, size_t offset) { + return map(size, PROT_READ, MAP_SHARED, offset); + } + + mmap_area map_private_rw(size_t size, size_t offset) { + return map(size, PROT_READ | PROT_WRITE, MAP_PRIVATE, offset); + } + + mmap_area map_private_ro(size_t size, size_t offset) { + return map(size, PROT_READ, MAP_PRIVATE, offset); + } + +private: + file_desc(int fd) : _fd(fd) {} + }; + + +namespace posix { + +/// Converts a duration value to a `timespec` +/// +/// \param d a duration value to convert to the POSIX `timespec` format +/// \return `d` as a `timespec` value +template <typename Rep, typename Period> +struct timespec +to_timespec(std::chrono::duration<Rep, Period> d) { + auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(d).count(); + struct timespec ts {}; + ts.tv_sec = ns / 1000000000; + ts.tv_nsec = ns % 1000000000; + return ts; +} + +/// Converts a relative start time and an interval to an `itimerspec` +/// +/// \param base First expiration of the timer, relative to the current time +/// \param interval period for re-arming the timer +/// \return `base` and `interval` converted to an `itimerspec` +template <typename Rep1, typename Period1, typename Rep2, typename Period2> +struct itimerspec +to_relative_itimerspec(std::chrono::duration<Rep1, Period1> base, std::chrono::duration<Rep2, Period2> interval) { + struct itimerspec its {}; + its.it_interval = to_timespec(interval); + its.it_value = to_timespec(base); + return its; +} + + +/// Converts a time_point and a duration to an `itimerspec` +/// +/// \param base base time for the timer; must use the same clock as the timer +/// \param interval period for re-arming the timer +/// \return `base` and `interval` converted to an `itimerspec` +template <typename Clock, class Duration, class Rep, class Period> +struct itimerspec +to_absolute_itimerspec(std::chrono::time_point<Clock, Duration> base, std::chrono::duration<Rep, Period> interval) { + return to_relative_itimerspec(base.time_since_epoch(), interval); +} + +} + +class posix_thread { +public: + class attr; +private: + // must allocate, since this class is moveable + std::unique_ptr<std::function<void ()>> _func; + pthread_t _pthread; + bool _valid = true; + mmap_area _stack; +private: + static void* start_routine(void* arg) noexcept; +public: + posix_thread(std::function<void ()> func); + posix_thread(attr a, std::function<void ()> func); + posix_thread(posix_thread&& x); + ~posix_thread(); + void join(); +public: + class attr { + public: + struct stack_size { size_t size = 0; }; + attr() = default; + template <typename... A> + attr(A... a) { + set(std::forward<A>(a)...); + } + void set() {} + template <typename A, typename... Rest> + void set(A a, Rest... rest) { + set(std::forward<A>(a)); + set(std::forward<Rest>(rest)...); + } + void set(stack_size ss) { _stack_size = ss; } + private: + stack_size _stack_size; + friend class posix_thread; + }; +}; + + +inline +void throw_system_error_on(bool condition, const char* what_arg) { + if (condition) { + if ((errno == EBADF || errno == ENOTSOCK) && is_abort_on_ebadf_enabled()) { + abort(); + } + throw std::system_error(errno, std::system_category(), what_arg); + } +} + +template <typename T> +inline +void throw_kernel_error(T r) { + static_assert(std::is_signed<T>::value, "kernel error variables must be signed"); + if (r < 0) { + auto ec = -r; + if ((ec == EBADF || ec == ENOTSOCK) && is_abort_on_ebadf_enabled()) { + abort(); + } + throw std::system_error(-r, std::system_category()); + } +} + +template <typename T> +inline +void throw_pthread_error(T r) { + if (r != 0) { + throw std::system_error(r, std::system_category()); + } +} + +inline +sigset_t make_sigset_mask(int signo) { + sigset_t set; + sigemptyset(&set); + sigaddset(&set, signo); + return set; +} + +inline +sigset_t make_full_sigset_mask() { + sigset_t set; + sigfillset(&set); + return set; +} + +inline +sigset_t make_empty_sigset_mask() { + sigset_t set; + sigemptyset(&set); + return set; +} + +inline +void pin_this_thread(unsigned cpu_id) { + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpu_id, &cs); + auto r = pthread_setaffinity_np(pthread_self(), sizeof(cs), &cs); + assert(r == 0); + (void)r; +} + +/// @} + +} |