summaryrefslogtreecommitdiffstats
path: root/src/test/crimson/seastar_runner.h
blob: 58d3f8119e669ba9342eea9323223a2d33ef9a88 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <stdio.h>
#include <signal.h>
#include <thread>

#include <seastar/core/app-template.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/alien.hh>
#include <seastar/core/thread.hh>

struct SeastarRunner {
  static constexpr eventfd_t APP_RUNNING = 1;
  static constexpr eventfd_t APP_NOT_RUN = 2;

  seastar::app_template app;
  seastar::file_desc begin_fd;
  std::unique_ptr<seastar::readable_eventfd> on_end;

  std::thread thread;

  bool begin_signaled = false;

  SeastarRunner() :
    begin_fd{seastar::file_desc::eventfd(0, 0)} {}

  ~SeastarRunner() {}

  bool is_running() const {
    return !!on_end;
  }

  int init(int argc, char **argv)
  {
    thread = std::thread([argc, argv, this] { reactor(argc, argv); });
    eventfd_t result;
    if (int r = ::eventfd_read(begin_fd.get(), &result); r < 0) {
      std::cerr << "unable to eventfd_read():" << errno << std::endl;
      return r;
    }
    assert(begin_signaled == true);
    if (result == APP_RUNNING) {
      assert(is_running());
      return 0;
    } else {
      assert(result == APP_NOT_RUN);
      assert(!is_running());
      return 1;
    }
  }
  
  void stop()
  {
    if (is_running()) {
      run([this] {
        on_end->write_side().signal(1);
        return seastar::now();
      });
    }
    thread.join();
  }

  void reactor(int argc, char **argv)
  {
    auto ret = app.run(argc, argv, [this] {
      on_end.reset(new seastar::readable_eventfd);
      return seastar::now().then([this] {
	begin_signaled = true;
	[[maybe_unused]] auto r = ::eventfd_write(begin_fd.get(), APP_RUNNING);
	assert(r == 0);
	return seastar::now();
      }).then([this] {
	return on_end->wait().then([](size_t){});
      }).handle_exception([](auto ep) {
	std::cerr << "Error: " << ep << std::endl;
      }).finally([this] {
	on_end.reset();
      });
    });
    if (ret != 0) {
      std::cerr << "Seastar app returns " << ret << std::endl;
    }
    if (!begin_signaled) {
      begin_signaled = true;
      ::eventfd_write(begin_fd.get(), APP_NOT_RUN);
    }
  }

  template <typename Func>
  void run(Func &&func) {
    assert(is_running());
    auto fut = seastar::alien::submit_to(app.alien(), 0,
					 std::forward<Func>(func));
    fut.get();
  }
};