blob: 5a430554e59f4858198d0d31dfe0b301a9f4c894 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <stdio.h>
#include <signal.h>
#include <thread>
#include <seastar/core/app-template.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/alien.hh>
#include <seastar/core/thread.hh>
struct SeastarRunner {
seastar::app_template app;
seastar::file_desc begin_fd;
std::unique_ptr<seastar::readable_eventfd> on_end;
std::thread thread;
SeastarRunner() :
begin_fd{seastar::file_desc::eventfd(0, 0)} {}
~SeastarRunner() {}
void init(int argc, char **argv)
{
thread = std::thread([argc, argv, this] { reactor(argc, argv); });
eventfd_t result = 0;
if (int r = ::eventfd_read(begin_fd.get(), &result); r < 0) {
std::cerr << "unable to eventfd_read():" << errno << std::endl;
throw std::runtime_error("Cannot start seastar");
}
}
void stop()
{
run([this] {
on_end->write_side().signal(1);
return seastar::now();
});
thread.join();
}
void reactor(int argc, char **argv)
{
app.run(argc, argv, [this] {
on_end.reset(new seastar::readable_eventfd);
return seastar::now().then([this] {
::eventfd_write(begin_fd.get(), 1);
return seastar::now();
}).then([this] {
return on_end->wait().then([](size_t){});
}).handle_exception([](auto ep) {
std::cerr << "Error: " << ep << std::endl;
}).finally([this] {
on_end.reset();
});
});
}
template <typename Func>
void run(Func &&func) {
auto fut = seastar::alien::submit_to(0, std::forward<Func>(func));
fut.get();
}
};
|