summaryrefslogtreecommitdiffstats
path: root/src/test/crimson/seastar_runner.h
blob: 5a430554e59f4858198d0d31dfe0b301a9f4c894 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <signal.h>
#include <stdio.h>
#include <sys/eventfd.h>

#include <cerrno>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <thread>
#include <utility>

#include <seastar/core/app-template.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/alien.hh>
#include <seastar/core/thread.hh>

/// Runs a seastar application on a dedicated std::thread so that code living
/// outside the reactor can start it, submit work to it and shut it down.
///
/// Expected call sequence: init(), any number of run() calls, then stop().
/// The destructor does not join the thread; call stop() before destroying a
/// runner whose init() succeeded.
struct SeastarRunner {
  // Values the reactor thread writes to begin_fd to tell init() how startup
  // ended. They must be non-zero: reading an eventfd blocks while its counter
  // is zero.
  static constexpr eventfd_t APP_RUNNING = 1;
  static constexpr eventfd_t APP_NOT_RUN = 2;

  seastar::app_template app;
  // Written once by the reactor thread, read once by init().
  seastar::file_desc begin_fd;
  // Created when the application starts and released when it finishes;
  // stop() signals it to let the application return.
  std::unique_ptr<seastar::readable_eventfd> on_end;

  std::thread thread;

  // Set once begin_fd has been written. Only touched on the reactor thread.
  bool begin_signaled = false;
  // Set once init() has seen APP_RUNNING. Only touched by init() and stop().
  bool started = false;

  SeastarRunner() :
    begin_fd{seastar::file_desc::eventfd(0, 0)} {}

  ~SeastarRunner() {}

  /// Spawn the reactor thread and block until the application reports how
  /// its startup ended.
  ///
  /// @throws std::runtime_error if begin_fd cannot be read, or if app.run()
  ///         returned without ever running the startup lambda.
  void init(int argc, char **argv)
  {
    thread = std::thread([argc, argv, this] { reactor(argc, argv); });
    eventfd_t result = 0;
    if (int r = ::eventfd_read(begin_fd.get(), &result); r < 0) {
      // NOTE: the reactor thread is still joinable on this path.
      std::cerr << "unable to eventfd_read():" << errno << std::endl;
      throw std::runtime_error("Cannot start seastar");
    }
    if (result != APP_RUNNING) {
      // app.run() gave up before the application started, so reactor() is
      // about to return. Reap the thread here so the runner is left in a
      // state where stop() and destruction are harmless.
      thread.join();
      throw std::runtime_error("Cannot start seastar");
    }
    started = true;
  }

  /// Ask the application to finish and wait for the reactor thread to exit.
  /// Does nothing beyond joining if init() never saw the application start.
  void stop()
  {
    if (started) {
      run([this] {
	on_end->write_side().signal(1);
	return seastar::now();
      });
      started = false;
    }
    if (thread.joinable()) {
      thread.join();
    }
  }

  /// Body of the reactor thread: runs the seastar application until on_end
  /// is signalled, and always reports the startup outcome through begin_fd
  /// so that init() can never block forever.
  void reactor(int argc, char **argv)
  {
    auto ret = app.run(argc, argv, [this] {
      on_end.reset(new seastar::readable_eventfd);
      return seastar::now().then([this] {
	if (::eventfd_write(begin_fd.get(), APP_RUNNING) < 0) {
	  // Surfaces through handle_exception() below; init() is then woken
	  // by the APP_NOT_RUN fallback after app.run() returns.
	  throw std::runtime_error("unable to eventfd_write()");
	}
	begin_signaled = true;
	return seastar::now();
      }).then([this] {
	return on_end->wait().then([](size_t){});
      }).handle_exception([](auto ep) {
	std::cerr << "Error: " << ep << std::endl;
      }).finally([this] {
	on_end.reset();
      });
    });
    if (ret != 0) {
      std::cerr << "Seastar app returns " << ret << std::endl;
    }
    if (!begin_signaled) {
      // The startup lambda never ran (or failed before signalling); wake
      // init() so it can report the failure instead of hanging.
      begin_signaled = true;
      if (::eventfd_write(begin_fd.get(), APP_NOT_RUN) < 0) {
	std::cerr << "unable to eventfd_write():" << errno << std::endl;
      }
    }
  }

  /// Submit func to shard 0 through seastar::alien::submit_to and block the
  /// calling thread until the returned future resolves.
  /// NOTE(review): presumably must be called from a non-reactor thread while
  /// the application is running, with func returning a seastar future as
  /// stop() does; confirm against the seastar version in use.
  template <typename Func>
  void run(Func &&func) {
    auto fut = seastar::alien::submit_to(0, std::forward<Func>(func));
    fut.get();
  }
};