diff options
Diffstat (limited to 'ipc/chromium/src/base/message_pump_libevent.cc')
-rw-r--r-- | ipc/chromium/src/base/message_pump_libevent.cc | 432 |
1 files changed, 432 insertions, 0 deletions
diff --git a/ipc/chromium/src/base/message_pump_libevent.cc b/ipc/chromium/src/base/message_pump_libevent.cc new file mode 100644 index 0000000000..546c2d1e08 --- /dev/null +++ b/ipc/chromium/src/base/message_pump_libevent.cc @@ -0,0 +1,432 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +// Copyright (c) 2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/message_pump_libevent.h" + +#include <errno.h> +#include <fcntl.h> +#if defined(ANDROID) || defined(OS_POSIX) +# include <unistd.h> +#endif + +#include "eintr_wrapper.h" +#include "base/logging.h" +#include "base/scoped_nsautorelease_pool.h" +#include "base/time.h" +#include "nsDependentSubstring.h" +#include "event.h" +#include "mozilla/UniquePtr.h" +#include "GeckoProfiler.h" + +// This macro checks that the _EVENT_SIZEOF_* constants defined in +// ipc/chromiume/src/third_party/<platform>/event2/event-config.h are correct. +#if defined(_EVENT_SIZEOF_SHORT) +# define CHECK_EVENT_SIZEOF(TYPE, type) \ + static_assert(_EVENT_SIZEOF_##TYPE == sizeof(type), \ + "bad _EVENT_SIZEOF_" #TYPE); +#elif defined(EVENT__SIZEOF_SHORT) +# define CHECK_EVENT_SIZEOF(TYPE, type) \ + static_assert(EVENT__SIZEOF_##TYPE == sizeof(type), \ + "bad EVENT__SIZEOF_" #TYPE); +#else +# error Cannot find libevent type sizes +#endif + +CHECK_EVENT_SIZEOF(LONG, long); +CHECK_EVENT_SIZEOF(LONG_LONG, long long); +CHECK_EVENT_SIZEOF(OFF_T, ev_off_t); +CHECK_EVENT_SIZEOF(PTHREAD_T, pthread_t); +CHECK_EVENT_SIZEOF(SHORT, short); +CHECK_EVENT_SIZEOF(SIZE_T, size_t); +CHECK_EVENT_SIZEOF(VOID_P, void*); + +// Lifecycle of struct event +// Libevent uses two main data structures: +// struct event_base (of which there is one per message pump), and +// struct event (of which there is roughly one per socket). +// The socket's struct event is created in +// MessagePumpLibevent::WatchFileDescriptor(), +// is owned by the FileDescriptorWatcher, and is destroyed in +// StopWatchingFileDescriptor(). +// It is moved into and out of lists in struct event_base by +// the libevent functions event_add() and event_del(). +// +// TODO(dkegel): +// At the moment bad things happen if a FileDescriptorWatcher +// is active after its MessagePumpLibevent has been destroyed. +// See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop +// Not clear yet whether that situation occurs in practice, +// but if it does, we need to fix it. + +namespace base { + +// Return 0 on success +// Too small a function to bother putting in a library? +static int SetNonBlocking(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + if (flags == -1) flags = 0; + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} + +MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() + : is_persistent_(false), event_(NULL) {} + +MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { + if (event_) { + StopWatchingFileDescriptor(); + } +} + +void MessagePumpLibevent::FileDescriptorWatcher::Init(event* e, + bool is_persistent) { + DCHECK(e); + DCHECK(event_ == NULL); + + is_persistent_ = is_persistent; + event_ = e; +} + +event* MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { + struct event* e = event_; + event_ = NULL; + return e; +} + +bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { + event* e = ReleaseEvent(); + if (e == NULL) return true; + + // event_del() is a no-op if the event isn't active. + int rv = event_del(e); + delete e; + return (rv == 0); +} + +// Called if a byte is received on the wakeup pipe. +void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { + AUTO_PROFILER_LABEL("MessagePumpLibevent::OnWakeup", OTHER); + + base::MessagePumpLibevent* that = + static_cast<base::MessagePumpLibevent*>(context); + DCHECK(that->wakeup_pipe_out_ == socket); + + // Remove and discard the wakeup byte. + char buf; + int nread = HANDLE_EINTR(read(socket, &buf, 1)); + DCHECK_EQ(nread, 1); + // Tell libevent to break out of inner loop. + event_base_loopbreak(that->event_base_); +} + +MessagePumpLibevent::MessagePumpLibevent() + : keep_running_(true), + in_run_(false), + event_base_(event_base_new()), + wakeup_pipe_in_(-1), + wakeup_pipe_out_(-1) { + if (!Init()) NOTREACHED(); +} + +bool MessagePumpLibevent::Init() { + int fds[2]; + if (pipe(fds)) { + DLOG(ERROR) << "pipe() failed, errno: " << errno; + return false; + } + if (SetNonBlocking(fds[0])) { + DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno; + return false; + } + if (SetNonBlocking(fds[1])) { + DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno; + return false; + } + wakeup_pipe_out_ = fds[0]; + wakeup_pipe_in_ = fds[1]; + + wakeup_event_ = new event; + event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, OnWakeup, + this); + event_base_set(event_base_, wakeup_event_); + + if (event_add(wakeup_event_, 0)) return false; + return true; +} + +MessagePumpLibevent::~MessagePumpLibevent() { + DCHECK(wakeup_event_); + DCHECK(event_base_); + event_del(wakeup_event_); + delete wakeup_event_; + if (wakeup_pipe_in_ >= 0) close(wakeup_pipe_in_); + if (wakeup_pipe_out_ >= 0) close(wakeup_pipe_out_); + event_base_free(event_base_); +} + +bool MessagePumpLibevent::WatchFileDescriptor(int fd, bool persistent, + Mode mode, + FileDescriptorWatcher* controller, + Watcher* delegate) { + DCHECK(fd > 0); + DCHECK(controller); + DCHECK(delegate); + DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); + + int event_mask = persistent ? EV_PERSIST : 0; + if ((mode & WATCH_READ) != 0) { + event_mask |= EV_READ; + } + if ((mode & WATCH_WRITE) != 0) { + event_mask |= EV_WRITE; + } + + // |should_delete_event| is true if we're modifying an event that's currently + // active in |controller|. + // If we're modifying an existing event and there's an error then we need to + // tell libevent to clean it up via event_delete() before returning. + bool should_delete_event = true; + mozilla::UniquePtr<event> evt(controller->ReleaseEvent()); + if (evt.get() == NULL) { + should_delete_event = false; + // Ownership is transferred to the controller. + evt = mozilla::MakeUnique<event>(); + } else { + // It's illegal to use this function to listen on 2 separate fds with the + // same |controller|. + if (EVENT_FD(evt.get()) != fd) { + NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd; + return false; + } + + // Make sure we don't pick up any funky internal libevent masks. + int old_interest_mask = + evt.get()->ev_events & (EV_READ | EV_WRITE | EV_PERSIST); + + // Combine old/new event masks. + event_mask |= old_interest_mask; + + // Must disarm the event before we can reuse it. + event_del(evt.get()); + } + + // Set current interest mask and message pump for this event. + event_set(evt.get(), fd, event_mask, OnLibeventNotification, delegate); + + // Tell libevent which message pump this socket will belong to when we add it. + if (event_base_set(event_base_, evt.get()) != 0) { + if (should_delete_event) { + event_del(evt.get()); + } + return false; + } + + // Add this socket to the list of monitored sockets. + if (event_add(evt.get(), NULL) != 0) { + if (should_delete_event) { + event_del(evt.get()); + } + return false; + } + + // Transfer ownership of evt to controller. + controller->Init(evt.release(), persistent); + return true; +} + +void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, + void* context) { + AUTO_PROFILER_LABEL("MessagePumpLibevent::OnLibeventNotification", OTHER); + + Watcher* watcher = static_cast<Watcher*>(context); + + if (flags & EV_WRITE) { + watcher->OnFileCanWriteWithoutBlocking(fd); + } + if (flags & EV_READ) { + watcher->OnFileCanReadWithoutBlocking(fd); + } +} + +MessagePumpLibevent::SignalEvent::SignalEvent() : event_(NULL) {} + +MessagePumpLibevent::SignalEvent::~SignalEvent() { + if (event_) { + StopCatching(); + } +} + +void MessagePumpLibevent::SignalEvent::Init(event* e) { + DCHECK(e); + DCHECK(event_ == NULL); + event_ = e; +} + +bool MessagePumpLibevent::SignalEvent::StopCatching() { + // XXX/cjones: this code could be shared with + // FileDescriptorWatcher. ironic that libevent is "more" + // object-oriented than this C++ + event* e = ReleaseEvent(); + if (e == NULL) return true; + + // event_del() is a no-op if the event isn't active. + int rv = event_del(e); + delete e; + return (rv == 0); +} + +event* MessagePumpLibevent::SignalEvent::ReleaseEvent() { + event* e = event_; + event_ = NULL; + return e; +} + +bool MessagePumpLibevent::CatchSignal(int sig, SignalEvent* sigevent, + SignalWatcher* delegate) { + DCHECK(sig > 0); + DCHECK(sigevent); + DCHECK(delegate); + // TODO if we want to support re-using SignalEvents, this code needs + // to jump through the same hoops as WatchFileDescriptor(). Not + // needed at present + DCHECK(NULL == sigevent->event_); + + mozilla::UniquePtr<event> evt = mozilla::MakeUnique<event>(); + signal_set(evt.get(), sig, OnLibeventSignalNotification, delegate); + + if (event_base_set(event_base_, evt.get())) return false; + + if (signal_add(evt.get(), NULL)) return false; + + // Transfer ownership of evt to controller. + sigevent->Init(evt.release()); + return true; +} + +void MessagePumpLibevent::OnLibeventSignalNotification(int sig, short flags, + void* context) { + AUTO_PROFILER_LABEL("MessagePumpLibevent::OnLibeventSignalNotification", + OTHER); + + DCHECK(sig > 0); + DCHECK(EV_SIGNAL == flags); + DCHECK(context); + reinterpret_cast<SignalWatcher*>(context)->OnSignal(sig); +} + +// Reentrant! +void MessagePumpLibevent::Run(Delegate* delegate) { + DCHECK(keep_running_) << "Quit must have been called outside of Run!"; + + bool old_in_run = in_run_; + in_run_ = true; + + for (;;) { + ScopedNSAutoreleasePool autorelease_pool; + + bool did_work = delegate->DoWork(); + if (!keep_running_) break; + + did_work |= delegate->DoDelayedWork(&delayed_work_time_); + if (!keep_running_) break; + + if (did_work) continue; + + did_work = delegate->DoIdleWork(); + if (!keep_running_) break; + + if (did_work) continue; + + // EVLOOP_ONCE tells libevent to only block once, + // but to service all pending events when it wakes up. + AUTO_PROFILER_LABEL("MessagePumpLibevent::Run::Wait", IDLE); + if (delayed_work_time_.is_null()) { + event_base_loop(event_base_, EVLOOP_ONCE); + } else { + TimeDelta delay = delayed_work_time_ - TimeTicks::Now(); + if (delay > TimeDelta()) { + struct timeval poll_tv; + poll_tv.tv_sec = delay.InSeconds(); + poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond; + event_base_loopexit(event_base_, &poll_tv); + event_base_loop(event_base_, EVLOOP_ONCE); + } else { + // It looks like delayed_work_time_ indicates a time in the past, so we + // need to call DoDelayedWork now. + delayed_work_time_ = TimeTicks(); + } + } + } + + keep_running_ = true; + in_run_ = old_in_run; +} + +void MessagePumpLibevent::Quit() { + DCHECK(in_run_); + // Tell both libevent and Run that they should break out of their loops. + keep_running_ = false; + ScheduleWork(); +} + +void MessagePumpLibevent::ScheduleWork() { + // Tell libevent (in a threadsafe way) that it should break out of its loop. + char buf = 0; + int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1)); + DCHECK(nwrite == 1 || errno == EAGAIN) + << "[nwrite:" << nwrite << "] [errno:" << errno << "]"; +} + +void MessagePumpLibevent::ScheduleDelayedWork( + const TimeTicks& delayed_work_time) { + // We know that we can't be blocked on Wait right now since this method can + // only be called on the same thread as Run, so we only need to update our + // record of how long to sleep when we do sleep. + delayed_work_time_ = delayed_work_time; +} + +void LineWatcher::OnFileCanReadWithoutBlocking(int aFd) { + ssize_t length = 0; + + while (true) { + length = read(aFd, mReceiveBuffer.get(), mBufferSize - mReceivedIndex); + DCHECK(length <= ssize_t(mBufferSize - mReceivedIndex)); + if (length <= 0) { + if (length < 0) { + if (errno == EINTR) { + continue; // retry system call when interrupted + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; // no data available: return and re-poll + } + DLOG(ERROR) << "Can't read from fd, error " << errno; + } else { + DLOG(ERROR) << "End of file"; + } + // At this point, assume that we can't actually access + // the socket anymore, and indicate an error. + OnError(); + mReceivedIndex = 0; + return; + } + + while (length-- > 0) { + DCHECK(mReceivedIndex < mBufferSize); + if (mReceiveBuffer[mReceivedIndex] == mTerminator) { + nsDependentCSubstring message(mReceiveBuffer.get(), mReceivedIndex); + OnLineRead(aFd, message); + if (length > 0) { + DCHECK(mReceivedIndex < (mBufferSize - 1)); + memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], + length); + } + mReceivedIndex = 0; + } else { + mReceivedIndex++; + } + } + } +} +} // namespace base |